package com.higgs.app.imkitsrc.websocket;

import android.support.annotation.NonNull;
import com.higgs.app.imkitsrc.util.LogHelper;
import com.higgs.app.imkitsrc.websocket.core.RxMoreObservables;
import com.higgs.app.imkitsrc.websocket.core.object.event.RxObjectEvent;
import com.higgs.app.imkitsrc.websocket.core.object.event.RxObjectEventConn;
import com.higgs.app.imkitsrc.websocket.core.object.event.RxObjectEventConnected;
import com.higgs.app.imkitsrc.websocket.core.object.event.RxObjectEventDisconnected;
import com.higgs.app.imkitsrc.websocket.core.object.event.RxObjectEventMessage;
import com.higgs.app.imkitsrc.websocket.model.out.BaseDataMessage;
import com.higgs.app.imkitsrc.websocket.model.out.PingMessage;
import com.higgs.app.imkitsrc.websocket.model.out.RegisteredMessage;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;

/* loaded from: classes.dex */
public class Socket {
    public static final Logger LOGGER = Logger.getLogger("Rx");
    private final BehaviorSubject<RxObjectEventConn> connectedAndRegistered;
    private final Observable<Object> connection;
    private final Observable<RxObjectEvent> events;
    private PingMessage pingMessage;

    @Nonnull
    private final Scheduler scheduler;
    private final Class<? extends BaseDataMessage> target;
    private final Object lock = new Object();
    private int counter = 0;

    /* loaded from: classes3.dex */
    private static class FilterRegisterMessage implements Func1<RxObjectEventMessage, Boolean> {
        private FilterRegisterMessage() {
        }

        @Override // rx.functions.Func1
        public Boolean call(RxObjectEventMessage rxObjectEventMessage) {
            return Boolean.valueOf(rxObjectEventMessage.message() instanceof RegisteredMessage);
        }
    }

    /* loaded from: classes3.dex */
    private class FlatMapToRegisterMessage implements Func1<RxObjectEventConnected, Observable<Object>> {
        RegisteredMessage registeredMessage;

        public FlatMapToRegisterMessage(RegisteredMessage registeredMessage) {
            this.registeredMessage = registeredMessage;
        }

        @Override // rx.functions.Func1
        public Observable<Object> call(RxObjectEventConnected rxObjectEventConnected) {
            LogHelper.getInstance().e("send message for registering....");
            return Observable.just(this.registeredMessage).compose(RxMoreObservables.sendMessage(rxObjectEventConnected));
        }
    }

    public Socket(@Nonnull SocketConnection socketConnection, @Nonnull Scheduler scheduler, RegisteredMessage registeredMessage, PingMessage pingMessage, Class<? extends BaseDataMessage> cls) {
        this.scheduler = scheduler;
        PublishSubject create = PublishSubject.create();
        this.connection = socketConnection.connection().lift(new OperatorDoOnNext(create)).lift(MoreObservables.ignoreNext()).compose(MoreObservables.behaviorRefCount());
        this.events = create;
        Observable filter = create.compose(MoreObservables.filterAndMap(RxObjectEventMessage.class)).filter(new FilterRegisterMessage());
        Observable<R> compose = create.compose(MoreObservables.filterAndMap(RxObjectEventDisconnected.class));
        this.pingMessage = pingMessage;
        this.connectedAndRegistered = BehaviorSubject.create((RxObjectEventConn) null);
        compose.map(new Func1<RxObjectEventDisconnected, RxObjectEventConn>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.1
            @Override // rx.functions.Func1
            public RxObjectEventConn call(RxObjectEventDisconnected rxObjectEventDisconnected) {
                LogHelper.getInstance().e("disconnectedMessage", rxObjectEventDisconnected.toString());
                return null;
            }
        }).mergeWith(filter).subscribe(this.connectedAndRegistered);
        create.compose(MoreObservables.filterAndMap(RxObjectEventConnected.class)).lift(LoggingObservables.loggingLift(LOGGER, "ConnectedEvent")).flatMap(new FlatMapToRegisterMessage(registeredMessage)).lift(LoggingObservables.loggingOnlyErrorLift(LOGGER, "SendRegisterEvent")).onErrorReturn(MoreObservables.throwableToIgnoreError()).subscribe();
        LOGGER.setLevel(Level.ALL);
        RxMoreObservables.logger.setLevel(Level.ALL);
        create.subscribe(LoggingObservables.logging(LOGGER, "Events"));
        this.connectedAndRegistered.subscribe(LoggingObservables.logging(LOGGER, "ConnectedAndRegistered"));
        this.target = cls;
    }

    static /* synthetic */ Observable.Transformer access$200() {
        return isConnected();
    }

    @NonNull
    private static Observable.Transformer<RxObjectEvent, RxObjectEventConn> isConnected() {
        return new Observable.Transformer<RxObjectEvent, RxObjectEventConn>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.10
            @Override // rx.functions.Func1
            public Observable<RxObjectEventConn> call(Observable<RxObjectEvent> observable) {
                return observable.filter(new Func1<RxObjectEvent, Boolean>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.10.2
                    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
                    @Override // rx.functions.Func1
                    public Boolean call(RxObjectEvent rxObjectEvent) {
                        return Boolean.valueOf(rxObjectEvent instanceof RxObjectEventConn);
                    }
                }).map(new Func1<RxObjectEvent, RxObjectEventConn>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.10.1
                    @Override // rx.functions.Func1
                    public RxObjectEventConn call(RxObjectEvent rxObjectEvent) {
                        return (RxObjectEventConn) rxObjectEvent;
                    }
                });
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NonNull
    public Observable<Object> requestData(final RxObjectEventConn rxObjectEventConn, final Func1<String, Observable<Object>> func1) {
        return nextId().flatMap(new Func1<String, Observable<Object>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.7
            @Override // rx.functions.Func1
            public Observable<Object> call(String str) {
                return ((Observable) func1.call(str)).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NonNull
    public Observable<Object> requestData(final boolean z, final RxObjectEventConn rxObjectEventConn, final Func1<String, Object> func1) {
        return nextId().flatMap(new Func1<String, Observable<Object>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.9
            @Override // rx.functions.Func1
            public Observable<Object> call(String str) {
                final Object call = func1.call(str);
                Observable<Object> compose = Observable.defer(new Func0<Observable<Object>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.9.1
                    @Override // rx.functions.Func0, java.util.concurrent.Callable
                    public Observable<Object> call() {
                        return Observable.just(call);
                    }
                }).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
                return !z ? compose : Observable.combineLatest(Socket.this.events.doOnNext(new Action1<RxObjectEvent>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.9.2
                    @Override // rx.functions.Action1
                    public void call(RxObjectEvent rxObjectEvent) {
                        LogHelper.getInstance().e("waitEvent:" + rxObjectEvent);
                    }
                }).compose(MoreObservables.filterAndMap(RxObjectEventMessage.class)).compose(RxObjectEventMessage.filterAndMap(Socket.this.target)).first().timeout(7L, TimeUnit.SECONDS, Socket.this.scheduler), compose, new Func2<BaseDataMessage, Object, Object>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.9.3
                    @Override // rx.functions.Func2
                    public Object call(BaseDataMessage baseDataMessage, Object obj) {
                        return baseDataMessage;
                    }
                });
            }
        });
    }

    public Observable<RxObjectEventConn> connectedAndRegistered() {
        return this.connectedAndRegistered;
    }

    public Observable<Object> connection() {
        return this.connection;
    }

    public Observable<RxObjectEvent> events() {
        return this.events;
    }

    @Nonnull
    public Observable<String> nextId() {
        return Observable.create(new Observable.OnSubscribe<String>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.5
            @Override // rx.functions.Action1
            public void call(Subscriber<? super String> subscriber) {
                int i;
                synchronized (Socket.this.lock) {
                    i = Socket.this.counter;
                    Socket.this.counter++;
                }
                subscriber.onNext(String.valueOf(i));
                subscriber.onCompleted();
            }
        });
    }

    @Nonnull
    public Observable<Object> sendMessageOnceWhenConnected(final Func1<String, Observable<Object>> func1) {
        return this.connectedAndRegistered.compose(isConnected()).first().flatMap(new Func1<RxObjectEventConn, Observable<Object>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.6
            @Override // rx.functions.Func1
            public Observable<Object> call(RxObjectEventConn rxObjectEventConn) {
                return Socket.this.requestData(rxObjectEventConn, func1);
            }
        });
    }

    @NonNull
    public Observable<Object> sendMessageOnceWhenConnectedV2(final boolean z, final Func1<String, Object> func1) {
        return this.connectedAndRegistered.compose(isConnected()).first().flatMap(new Func1<RxObjectEventConn, Observable<Object>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.8
            @Override // rx.functions.Func1
            public Observable<Object> call(RxObjectEventConn rxObjectEventConn) {
                return Socket.this.requestData(z, rxObjectEventConn, func1);
            }
        });
    }

    public void sendPingEvery5seconds() {
        Observable.interval(5L, TimeUnit.SECONDS, this.scheduler).flatMap(new Func1<Long, Observable<?>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.4
            @Override // rx.functions.Func1
            public Observable<?> call(Long l) {
                return Socket.this.connectedAndRegistered.compose(Socket.access$200()).first().flatMap(new Func1<RxObjectEventConn, Observable<?>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.4.1
                    @Override // rx.functions.Func1
                    public Observable<?> call(RxObjectEventConn rxObjectEventConn) {
                        return Observable.just(Socket.this.pingMessage).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
                    }
                });
            }
        }).subscribe();
    }

    public Subscription sendPingInterval(int i) {
        return Observable.combineLatest(Observable.interval(i, TimeUnit.SECONDS, this.scheduler), this.connectedAndRegistered, new Func2<Long, RxObjectEventConn, RxObjectEventConn>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.3
            @Override // rx.functions.Func2
            public RxObjectEventConn call(Long l, RxObjectEventConn rxObjectEventConn) {
                return rxObjectEventConn;
            }
        }).compose(isConnected()).flatMap(new Func1<RxObjectEventConn, Observable<?>>() { // from class: com.higgs.app.imkitsrc.websocket.Socket.2
            @Override // rx.functions.Func1
            public Observable<?> call(RxObjectEventConn rxObjectEventConn) {
                return Observable.just(Socket.this.pingMessage).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
            }
        }).subscribe();
    }
}
