package com.dingtaxi.common.protocol.stream;

import android.content.Context;
import android.support.v7.widget.helper.ItemTouchHelper;
import com.dingtaxi.common.AppState;
import com.dingtaxi.common.api.CustomEvent;
import com.dingtaxi.common.protocol.BaseProtocol;
import com.dingtaxi.common.utils.LogUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.greenrobot.event.EventBus;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Observable;
import java.util.Observer;
import reactive.CloseStream;
import reactive.Error;
import reactive.Event;
import reactive.Frame;
import reactive.OpenStream;
import reactive.ResponseListener;
import reactive.StreamEOF;
import reactive.StreamNext;
import reactive.StreamNextF;
import reactive.StreamStarted;

/* loaded from: classes.dex */
public class StreamProtocol extends BaseProtocol {
    // Streams indexed by their key; openStream() stores a stream here right before opening it.
    private static HashMap<String, Stream> streams = new HashMap<>();
    // Streams indexed by the frameid of their open frame; the event handlers use it to route incoming frames.
    private static HashMap<Integer, Stream> streamsById = new HashMap<>();
    // Shared Jackson mapper used by Stream._onNext to decode payloads (configured in the static initializer).
    private static final ObjectMapper mapper = new ObjectMapper();

    /* loaded from: classes.dex */
    public static abstract class Stream<X> extends StreamBehavior<X> {
        /** State value: closed. Initial state; also set by _onEOF once a closing stream reaches EOF. */
        public static final int CLOSED = 2;
        /** State value: closing. Set by close() until the matching EOF is processed. */
        public static final int CLOSING = 3;
        /** State value: open. Set by _onOpen when the started frame matches the current frame. */
        public static final int OPEN = 1;
        // Running totals of items requested through next() / nextF(); reset by _onOpen.
        private int _asked;
        private int _askedf;
        // Running totals of items received in _onNext (the "f" counter is used when its flag is true);
        // reset by _onOpen.
        private int _received;
        private int _receivedf;
        // Frame posted by the last open(); null before the first open() and again after _onEOF.
        private Frame frame;
        private final String key;
        private final String path;
        private final TypeReference<List<X>> tref;
        protected LogUtil log = LogUtil.tagOf(getClass());
        // Number of items requested by each next()/nextF() call that ask() issues.
        protected int batchSize = 50;
        // ask() stops issuing next() once this many items have been requested.
        // Written as the plain literal: the decompiler had substituted the unrelated UI constant
        // ItemTouchHelper.Callback.DEFAULT_DRAG_ANIMATION_DURATION, whose value is 200.
        protected int preload = 200;
        private int state = CLOSED;
        // Incremented by every open(); openStream() compares it to drop outdated open requests.
        public int openid = 1;
        // Bus used to post frames; null until open() supplies one and after disable().
        protected EventBus eb = null;

        /**
         * Creates a stream that starts out closed.
         *
         * @param str           key of the stream; used as the name in the open/close/next events
         *                      and as the index in the protocol's key map
         * @param str2          path returned by {@link #getPath()}, appended to "stream/" to build the route
         * @param typeReference Jackson type reference used to decode received batches
         */
        public Stream(String str, String str2, TypeReference<List<X>> typeReference) {
            this.tref = typeReference;
            this.path = str2;
            this.key = str;
        }

        /**
         * Builds the route used for every frame of this stream.
         *
         * @return "stream/" followed by {@link #getPath()}
         */
        private String _getPath() {
            final StringBuilder route = new StringBuilder("stream/");
            route.append(getPath());
            return route.toString();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Handles an end-of-stream frame.
         *
         * The frame is only honoured while this stream is CLOSING and the frame id matches the
         * current open frame. In that case {@link #onEOF()} runs, the stream becomes CLOSED, the
         * current frame is dropped and observers are notified.
         *
         * @param frame the received end-of-stream frame
         */
        public void _onEOF(Frame frame) {
            // this.frame is null before the first open() and after a previous EOF, so it must not be
            // dereferenced unconditionally just to build the log line.
            this.log.d("%s EOF (%s <> %s)", this, frame.frameid, this.frame == null ? null : this.frame.frameid);
            if (this.state == CLOSING && this.frame != null && frame.frameid.equals(this.frame.frameid)) {
                onEOF();
                this.state = CLOSED;
                this.frame = null;
                setChanged();
                notifyObservers();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Decodes a batch of items and forwards it to {@link #onNext(List, boolean)}.
         *
         * The frame is ignored unless this stream is OPEN and the frame id matches the current
         * open frame. A payload that cannot be decoded is logged and dropped.
         *
         * @param frame the received data frame
         * @param str   JSON payload to decode with {@link #getClazz()}
         * @param z     selects which received-counter is increased and is passed on to onNext
         */
        public void _onNext(Frame frame, String str, boolean z) {
            final boolean accepted = this.state == OPEN
                    && this.frame != null
                    && frame.frameid.equals(this.frame.frameid);
            if (!accepted) {
                return;
            }
            try {
                final List<X> batch = StreamProtocol.mapper.readValue(str, getClazz());
                final int count = batch.size();
                if (z) {
                    this._receivedf = this._receivedf + count;
                } else {
                    this._received = this._received + count;
                }
                onNext(batch, z);
            } catch (IOException e) {
                this.log.e(e, "Unable to read stream next data");
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Handles a stream-started frame.
         *
         * Nothing happens when the stream is already OPEN, when no open frame is pending, or when
         * the frame id does not match the pending frame. Otherwise the stream becomes OPEN,
         * observers are notified, the request/receive counters are reset and {@link #onOpen()} runs.
         *
         * @param frame the received stream-started frame
         */
        public void _onOpen(Frame frame) {
            this.log.d("%s OPEN", this);
            if (this.state != OPEN && this.frame != null && frame.frameid.equals(this.frame.frameid)) {
                this.state = OPEN;
                // Observers are notified before the counters are reset, as in the original ordering.
                setChanged();
                notifyObservers();
                this._asked = 0;
                this._askedf = 0;
                this._received = 0;
                this._receivedf = 0;
                onOpen();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Starts a new open attempt: bumps {@code openid}, builds the open frame for this stream's
         * route, remembers the bus and posts the frame on it when a bus is available.
         *
         * @param openStream open event; its name is set to this stream's key
         * @param eventBus   bus to post frames on, may be null (then nothing is posted)
         */
        public void open(OpenStream openStream, EventBus eventBus) {
            this.openid += 1;
            final Frame openFrame = new Frame(openStream.setName(this.key), _getPath());
            this.frame = openFrame;
            this.eb = eventBus;
            if (eventBus == null) {
                return;
            }
            eventBus.post(openFrame);
        }

        /**
         * Requests more items when the number of outstanding items drops below half a batch.
         *
         * {@link #next(int)} is only issued while fewer than {@code preload} items have been
         * requested in total; {@link #nextF(int)} has no such cap.
         *
         * @return this stream
         */
        public Stream ask() {
            final int outstanding = this._asked - this._received;
            if (this._asked < this.preload && outstanding < this.batchSize / 2) {
                next(this.batchSize);
            }
            final int outstandingF = this._askedf - this._receivedf;
            if (outstandingF < this.batchSize / 2) {
                nextF(this.batchSize);
            }
            return this;
        }

        /**
         * Moves the stream to CLOSING, notifies observers and posts a close frame when a bus is set.
         *
         * @return the close frame (returned even when no bus was available to post it)
         */
        public Frame close() {
            this.state = CLOSING;
            setChanged();
            notifyObservers();
            final Frame closeFrame = new Frame(new CloseStream(this.key), _getPath());
            final EventBus bus = this.eb;
            if (bus != null) {
                bus.post(closeFrame);
            }
            return closeFrame;
        }

        /**
         * Detaches this stream from its event bus, so close(), next() and nextF() stop posting frames.
         */
        public void disable() {
            // Logged before the bus is cleared, so toString() still reports "enabled" in this line.
            this.log.d("Stream Disabled %s", this);
            this.eb = null;
        }

        /**
         * @return the type reference given to the constructor; _onNext decodes payloads with it
         */
        public TypeReference<List<X>> getClazz() {
            return this.tref;
        }

        /**
         * @return the path given to the constructor; it is appended to "stream/" to form the frame route
         */
        public String getPath() {
            return this.path;
        }

        /**
         * @return true when the stream is in the OPEN state and still holds its open frame
         */
        public boolean isOpen() {
            if (this.state != OPEN) {
                return false;
            }
            return this.frame != null;
        }

        /**
         * Records a request for {@code i} more items and posts a StreamNext frame when a bus is set.
         * The request counter is increased even when no bus is available.
         *
         * @param i number of items to request
         * @return this stream
         */
        public Stream next(int i) {
            this.log.d("Ask for more next %s", Integer.valueOf(i));
            this._asked = this._asked + i;
            final EventBus bus = this.eb;
            if (bus == null) {
                return this;
            }
            final StreamNext request = new StreamNext(this.key, Integer.valueOf(i));
            bus.post(new Frame(request, _getPath()));
            return this;
        }

        /**
         * Records a request for {@code i} more "following" items and posts a StreamNextF frame when
         * a bus is set. The request counter is increased even when no bus is available.
         *
         * @param i number of items to request
         * @return this stream
         */
        public Stream nextF(int i) {
            this.log.d("Ask for more following %s", Integer.valueOf(i));
            this._askedf = this._askedf + i;
            final EventBus bus = this.eb;
            if (bus == null) {
                return this;
            }
            final StreamNextF request = new StreamNextF(this.key, Integer.valueOf(i));
            bus.post(new Frame(request, _getPath()));
            return this;
        }

        /**
         * Called by _onEOF when a closing stream receives the end-of-stream for its current frame,
         * just before the state switches to CLOSED.
         */
        @Override // com.dingtaxi.common.protocol.stream.StreamBehavior
        public abstract void onEOF();

        /**
         * Called by _onNext with a decoded batch.
         *
         * @param list the decoded items
         * @param z    the flag _onNext received; the protocol passes true for routes ending in "/h"
         */
        @Override // com.dingtaxi.common.protocol.stream.StreamBehavior
        public abstract void onNext(List<X> list, boolean z);

        /**
         * Called by _onOpen after the state switched to OPEN and the counters were reset.
         */
        @Override // com.dingtaxi.common.protocol.stream.StreamBehavior
        public abstract void onOpen();

        /**
         * Describes the stream for log output as
         * {@code SimpleName@frameid(key,state,enabled|disabled, openid=N)}; the frame id is -1 while
         * no frame is held.
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append(getClass().getSimpleName());
            text.append("@");
            text.append(this.frame == null ? -1 : this.frame.frameid.intValue());
            text.append("(");
            text.append(this.key);
            text.append(",");
            text.append(this.state);
            text.append(",");
            text.append(this.eb == null ? "disabled" : "enabled");
            text.append(", openid=");
            text.append(this.openid);
            text.append(")");
            return text.toString();
        }

        /**
         * No-op: the supplied behavior is ignored.
         * NOTE(review): the body is empty in this source; whether that is intentional or lost in
         * decompilation cannot be told from here.
         *
         * @param streamBehavior unused
         */
        public void with(StreamBehavior<X> streamBehavior) {
        }
    }

    // Lets the shared mapper decode a single JSON value where a list is expected.
    static {
        mapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
    }

    /**
     * Creates the protocol; the context is only handed to the BaseProtocol constructor.
     *
     * @param context context forwarded to the superclass
     */
    public StreamProtocol(Context context) {
        super(context);
    }

    /**
     * Opens a stream without an explicit open id.
     *
     * @param stream     stream to open
     * @param openStream open event to send for it
     */
    public static void openStream(Stream stream, OpenStream openStream) {
        // 0 makes the three-argument overload derive the expected open id from the streams themselves.
        openStream(stream, openStream, 0);
    }

    /**
     * Opens {@code stream}, first closing whichever stream currently blocks it.
     *
     * The blocking stream is the one registered under the same key when it is a different, open
     * stream; otherwise it is {@code stream} itself. If the blocking stream's open id is lower than
     * the expected one, the request is dropped. If the blocking stream is open, it is closed and the
     * open is retried both from the close response/error and once that stream is observed CLOSED.
     * Otherwise {@code stream} is registered by key, opened on the application bus and registered
     * by frame id.
     *
     * @param stream     stream to open
     * @param openStream open event to send for it
     * @param i          expected open id; values &lt;= 0 mean "use the highest open id of the two streams"
     */
    public static void openStream(final Stream stream, final OpenStream openStream, int i) {
        stream.log.d("Open stream %s", stream);
        final Stream registered = streams.get(stream.key);
        // Only a different stream that is still open under the same key has to be dealt with first.
        final Stream current =
                (registered != null && registered != stream && registered.isOpen()) ? registered : stream;
        final int max = i > 0 ? i : Math.max(current.openid, stream.openid);
        if (current.openid < max) {
            stream.log.d("PS id %s is smaller than %s", current, Integer.valueOf(max));
            return;
        }
        if (current.isOpen()) {
            stream.log.d("PS stream %s is open", current);
            // The decompiled source referred to the captured streams as "Stream.this", which is not
            // valid in a static method; the captured locals are used explicitly instead.
            // NOTE(review): mapping inferred from the decompiled output - the response listener
            // captured the stream to open, the observer captured the stream being closed.
            ResponseListener retryOnCloseReply = new ResponseListener() {
                @Override // reactive.ResponseListener
                public void onError(Error error, Frame frame) {
                    StreamProtocol.openStream(stream, openStream, max);
                }

                @Override // reactive.ResponseListener
                public void onResponse(Event event, Frame frame) {
                    StreamProtocol.openStream(stream, openStream, max);
                }
            };
            current.close().withResponseListener(retryOnCloseReply);
            current.addObserver(new Observer() {
                @Override // java.util.Observer
                public void update(Observable observable, Object obj) {
                    current.log.d("Observer notified for state %s", current);
                    if (current.state == Stream.CLOSED) {
                        // One-shot observer: remove it as soon as the closed state is seen.
                        current.deleteObserver(this);
                        if (current.openid == max) {
                            StreamProtocol.openStream(stream, openStream, stream.openid);
                        }
                    }
                }
            });
            return;
        }
        stream.log.d("PS stream %s is closed, open stream %s", current, stream);
        streams.put(stream.key, stream);
        stream.open(openStream, AppState.bus());
        // open() has just created the frame, so its id can now index the stream for incoming events.
        streamsById.put(stream.frame.frameid, stream);
    }

    /**
     * Disables the protocol, then disables and forgets every stream registered by frame id and by key.
     */
    @Override // com.dingtaxi.common.protocol.BaseProtocol
    public void disable() {
        super.disable();
        for (Stream byFrameId : streamsById.values()) {
            byFrameId.disable();
        }
        streamsById.clear();
        for (Stream byKey : streams.values()) {
            byKey.disable();
        }
        streams.clear();
    }

    /**
     * Routes a custom event on a "stream/" route to the stream registered for its frame id.
     * The payload handed to the stream is the frame data without its first character, and the
     * flag is true when the route ends with "/h".
     *
     * @param customEvent event carrying the data frame
     */
    public void onEventBackgroundThread(CustomEvent customEvent) {
        final Frame frame = customEvent.getFrame();
        if (!frame.route.startsWith("stream/")) {
            return;
        }
        final Stream target = streamsById.get(frame.frameid);
        if (target == null) {
            return;
        }
        final String payload = frame.data.substring(1);
        final boolean flag = frame.route.endsWith("/h");
        target._onNext(frame, payload, flag);
    }

    /**
     * Routes an end-of-stream event on a "stream/" route to the stream registered for its frame id.
     *
     * @param streamEOF event carrying the end-of-stream frame
     */
    public void onEventBackgroundThread(StreamEOF streamEOF) {
        final Frame frame = streamEOF.getFrame();
        if (!frame.route.startsWith("stream/")) {
            return;
        }
        final Stream target = streamsById.get(frame.frameid);
        if (target != null) {
            target._onEOF(frame);
        }
    }

    /**
     * Routes a stream-started event on a "stream/" route to the stream registered for its frame id.
     * Any exception thrown while the stream handles the event is logged and not rethrown.
     *
     * @param streamStarted event carrying the stream-started frame
     */
    public void onEventBackgroundThread(StreamStarted streamStarted) {
        this.log.d("Stream started");
        final Frame frame = streamStarted.getFrame();
        if (!frame.route.startsWith("stream/")) {
            return;
        }
        final Stream target = streamsById.get(frame.frameid);
        this.log.d("Found stream by fid %s : %s", frame.frameid, target);
        if (target == null) {
            return;
        }
        try {
            target._onOpen(frame);
        } catch (Exception e) {
            this.log.e(e);
        }
    }
}
