package com.pingan.education.classroom.base.data.topic.core;

import android.text.TextUtils;
import com.blankj.utilcode.util.FileUtils;
import com.blankj.utilcode.util.ObjectUtils;
import com.blankj.utilcode.util.StringUtils;
import com.pingan.education.classroom.base.data.api.DeleteFileInPC;
import com.pingan.education.classroom.base.data.api.UploadFileToPC;
import com.pingan.education.classroom.base.data.entity.EndPoint;
import com.pingan.education.classroom.base.data.entity.FileEntity;
import com.pingan.education.classroom.base.data.topic.ListFileInDir;
import com.pingan.education.classroom.base.data.topic.NotifyStateOnline;
import com.pingan.education.classroom.base.data.topic.core.data.AckJSON;
import com.pingan.education.classroom.base.data.topic.core.data.AckTopic;
import com.pingan.education.classroom.base.data.topic.core.data.Payload;
import com.pingan.education.classroom.base.data.topic.core.data.PubJSON;
import com.pingan.education.classroom.base.data.topic.core.data.SubscribeToken;
import com.pingan.education.classroom.base.data.topic.core.data.Topic;
import com.pingan.education.classroom.base.data.topic.core.data.TopicCharacter;
import com.pingan.education.classroom.base.data.topic.core.data.TopicConfig;
import com.pingan.education.classroom.base.util.MQTTUtils;
import com.pingan.education.core.CoreConfig;
import com.pingan.education.core.http.api.listener.OnProgressListener;
import com.pingan.education.core.http.core.exception.StatusCodeException;
import com.pingan.education.core.log.ELog;
import com.pingan.education.core.utils.RxTimerUtils;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import retrofit2.Response;

/* loaded from: classes3.dex */
public class MQTT {
    private static final int WAIT_PC_CONNECT_TIME = 500;
    private static final int WAIT_PC_CONNECT_TIMES = 10;
    private static Semaphore sSemaphore;
    private MqttAndroidClient mClient;
    private String mCurrentTeacherId;
    private PublishSubject<Boolean> mPCOnlinePublish;
    private Disposable mTimerDisposable;
    private static final String TAG = MQTT.class.getSimpleName();
    private static MQTT sInstance = new MQTT();
    private static Executor sPublishExecutor = Executors.newCachedThreadPool();
    private static CompositeDisposable sCompositeDisposable = new CompositeDisposable();
    private Option mOption = new Option();
    private ConcurrentHashMap<String, SubscribeToken> mTokenMap = new ConcurrentHashMap<>();
    private ArrayList<EndPoint> mPCOnlineList = new ArrayList<>();
    private ArrayList<EndPoint> mStudentOnlineList = new ArrayList<>();
    private ArrayList<EndPoint> mTeacherOnlineList = new ArrayList<>();
    private HashMap<String, OnSelfOnlineStateListener> mSelfListeners = new HashMap<>();
    private HashMap<String, OnOnlineStateListener> mPCListeners = new HashMap<>();
    private HashMap<String, OnOnlineStateListener> mStudentListeners = new HashMap<>();
    private HashMap<String, OnOnlineStateListener> mTeacherListeners = new HashMap<>();
    private ConcurrentHashMap<String, Integer> mRetainedTopicMap = new ConcurrentHashMap<>();

    /* loaded from: classes3.dex */
    public static class Option {
        private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
        private static final int DEFAULT_HEARTBEAT_INTERVAL = 30;
        private static final int DEFAULT_PUBLISH_TIMEOUT = 10;
        private String brokerIp;
        private String brokerUri;
        private String character;
        private String clientId;
        private int heartBeatInterval = 30;
        private int connectTimeout = 5000;
        private int pubTimeout = 10;

        public Option setBrokerIp(String str) {
            this.brokerIp = str;
            this.brokerUri = MQTTUtils.getBrokerTcp(str);
            return this;
        }

        public Option setCharacter(String str, String str2) {
            this.character = str;
            this.clientId = MQTTUtils.getClientId(str, str2);
            return this;
        }

        public Option setConnectTimeout(int i) {
            this.connectTimeout = i;
            return this;
        }

        public Option setHeartBeatInterval(int i) {
            this.heartBeatInterval = i;
            return this;
        }

        public Option setPubTimeout(int i) {
            this.pubTimeout = i;
            return this;
        }
    }

    private MQTT() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Boolean> afterConnected(Observable<Boolean> observable, final boolean z) {
        return observable.zipWith(this.mPCOnlinePublish, new BiFunction<Boolean, Boolean, Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.7
            @Override // io.reactivex.functions.BiFunction
            public Boolean apply(Boolean bool, Boolean bool2) throws Exception {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            }
        }).flatMap(new Function<Boolean, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.6
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Boolean bool) throws Exception {
                if (bool.booleanValue()) {
                    return MQTT.this.publishTopicAppendId(MQTT.this.mOption.character, TopicCharacter.ALL, new NotifyStateOnline(true));
                }
                throw new MQTTException("connect failed!");
            }
        }).timeout(this.mOption.connectTimeout, TimeUnit.MILLISECONDS).doOnNext(new Consumer<Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.5
            @Override // io.reactivex.functions.Consumer
            public void accept(Boolean bool) throws Exception {
                MQTT.this.handleSelfOnlineState(true, z);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void clearOnlineList() {
        this.mPCOnlineList.clear();
        this.mTeacherOnlineList.clear();
        this.mStudentOnlineList.clear();
    }

    private Observable<Boolean> connectToBroker() {
        return Observable.create(new ObservableOnSubscribe<Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.8
            @Override // io.reactivex.ObservableOnSubscribe
            public void subscribe(ObservableEmitter<Boolean> observableEmitter) throws Exception {
                MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                mqttConnectOptions.setAutomaticReconnect(true);
                mqttConnectOptions.setCleanSession(true);
                mqttConnectOptions.setKeepAliveInterval(MQTT.this.mOption.heartBeatInterval);
                mqttConnectOptions.setConnectionTimeout(MQTT.this.mOption.connectTimeout);
                mqttConnectOptions.setMqttVersion(4);
                NotifyStateOnline notifyStateOnline = new NotifyStateOnline(false);
                TopicConfig topicConfig = (TopicConfig) NotifyStateOnline.class.getAnnotation(TopicConfig.class);
                notifyStateOnline.setPubClientId(MQTT.this.mOption.clientId);
                mqttConnectOptions.setWill(MQTTUtils.formatTopicName(MQTT.this.mOption.character, TopicCharacter.ALL, String.format("%s/%s", topicConfig.name(), MQTT.this.mOption.clientId), false), notifyStateOnline.getPubRawData(), notifyStateOnline.getQoS(), true);
                try {
                    MQTT.this.mClient.connect(mqttConnectOptions).waitForCompletion(MQTT.this.mOption.connectTimeout);
                    ELog.i(MQTT.TAG, "connect onSuccess");
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onNext(true);
                    observableEmitter.onComplete();
                } catch (Throwable th) {
                    ELog.e(MQTT.TAG, "connect onFailure:" + th + " | ip:" + MQTT.this.mOption.brokerIp);
                    if (!(th instanceof MqttException)) {
                        if (observableEmitter.isDisposed()) {
                            return;
                        }
                        observableEmitter.onError(new MQTTException(th.getMessage()));
                        return;
                    }
                    int reasonCode = ((MqttException) th).getReasonCode();
                    if (reasonCode == 32100) {
                        if (observableEmitter.isDisposed()) {
                            return;
                        }
                        observableEmitter.onNext(true);
                        observableEmitter.onComplete();
                        return;
                    }
                    if (reasonCode != 32110) {
                        if (observableEmitter.isDisposed()) {
                            return;
                        }
                        observableEmitter.onError(new MQTTException("connect failed!"));
                        return;
                    }
                    try {
                        Thread.currentThread();
                        Thread.sleep(500L);
                        if (observableEmitter.isDisposed()) {
                            return;
                        }
                        observableEmitter.onNext(true);
                        observableEmitter.onComplete();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> void execute(Observable<T> observable) {
        sCompositeDisposable.add((Disposable) observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableObserver<T>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.31
            @Override // io.reactivex.Observer
            public void onComplete() {
            }

            @Override // io.reactivex.Observer
            public void onError(Throwable th) {
            }

            @Override // io.reactivex.Observer
            public void onNext(T t) {
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public <T> void execute(Observable<T> observable, Consumer<T> consumer) {
        sCompositeDisposable.add(observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(consumer, new Consumer<Throwable>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.32
            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) throws Exception {
            }
        }));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T> void execute(Observable<T> observable, Consumer<T> consumer, Consumer<Throwable> consumer2) {
        sCompositeDisposable.add(observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(consumer, consumer2));
    }

    public static MQTT get() {
        return sInstance;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleMessage(String str, MqttMessage mqttMessage) {
        SubscribeToken subscribeToken = null;
        Iterator<String> it = this.mTokenMap.keySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String next = it.next();
            if (MqttTopic.isMatched(next, str)) {
                subscribeToken = this.mTokenMap.get(next);
                break;
            }
        }
        if (subscribeToken == null || isEmptyPayload(mqttMessage.getPayload())) {
            return;
        }
        if (subscribeToken.callBack != null) {
            subscribeToken.callBack.onReceived(MQTTUtils.toPayload(mqttMessage.getPayload(), subscribeToken.topicClass, subscribeToken.isConvertToAck));
        }
        if (subscribeToken.autoUnsubscribe) {
            execute(unsubscribeTopic(str));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleOnlineState(boolean z, String str, EndPoint endPoint) {
        ELog.d(TAG, "isOnline " + z + "id " + str);
        if (endPoint.isPC()) {
            if (!z) {
                this.mPCOnlineList.remove(endPoint);
            } else if (!this.mPCOnlineList.contains(endPoint)) {
                this.mPCOnlineList.add(endPoint);
            }
            if (this.mOption.clientId.equals(str)) {
                return;
            }
            for (OnOnlineStateListener onOnlineStateListener : this.mPCListeners.values()) {
                if (z) {
                    onOnlineStateListener.onConnect(endPoint);
                } else {
                    onOnlineStateListener.onDisconnect(endPoint);
                }
            }
            return;
        }
        if (endPoint.isTeacher()) {
            if (!z) {
                this.mTeacherOnlineList.remove(endPoint);
            } else if (!this.mTeacherOnlineList.contains(endPoint)) {
                this.mTeacherOnlineList.add(endPoint);
            }
            if (this.mOption.clientId.equals(str)) {
                return;
            }
            for (OnOnlineStateListener onOnlineStateListener2 : this.mTeacherListeners.values()) {
                if (z) {
                    onOnlineStateListener2.onConnect(endPoint);
                } else {
                    onOnlineStateListener2.onDisconnect(endPoint);
                }
            }
            return;
        }
        if (endPoint.isStudent()) {
            if (!z) {
                this.mStudentOnlineList.remove(endPoint);
            } else if (!this.mStudentOnlineList.contains(endPoint)) {
                this.mStudentOnlineList.add(endPoint);
            }
            if (this.mOption.clientId.equals(str)) {
                return;
            }
            for (OnOnlineStateListener onOnlineStateListener3 : this.mStudentListeners.values()) {
                if (z) {
                    onOnlineStateListener3.onConnect(endPoint);
                } else {
                    onOnlineStateListener3.onDisconnect(endPoint);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleSelfOnlineState(boolean z, boolean z2) {
        for (OnSelfOnlineStateListener onSelfOnlineStateListener : this.mSelfListeners.values()) {
            if (z) {
                onSelfOnlineStateListener.onConnect(new EndPoint(this.mOption.character, this.mOption.clientId), z2);
            } else {
                onSelfOnlineStateListener.onDisconnect(new EndPoint(this.mOption.character, this.mOption.clientId));
            }
        }
    }

    private boolean isEmptyPayload(byte[] bArr) {
        return bArr == null || bArr.length == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Boolean> publishTopic(final String str, final byte[] bArr, final int i, final boolean z) {
        return Observable.create(new ObservableOnSubscribe<Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.18
            @Override // io.reactivex.ObservableOnSubscribe
            public void subscribe(ObservableEmitter<Boolean> observableEmitter) throws Exception {
                if (!MQTT.this.isConnected()) {
                    ELog.e(MQTT.TAG, "publish topic : publish failed need connect first");
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onError(new MQTTException("you need connect to broker first!"));
                    return;
                }
                if (TextUtils.isEmpty(str)) {
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onError(new MQTTException("invalid sub or topic"));
                    return;
                }
                boolean z2 = false;
                try {
                    try {
                        if (MQTT.sSemaphore != null) {
                            z2 = MQTT.sSemaphore.tryAcquire(MQTT.this.mOption.pubTimeout, TimeUnit.SECONDS);
                        }
                    } catch (Throwable th) {
                        ELog.e(MQTT.TAG, String.format("public topic : publish failed:%s", th));
                        if (!observableEmitter.isDisposed()) {
                            observableEmitter.onError(new MQTTException(th.getMessage()));
                        }
                        if (0 == 0 || MQTT.sSemaphore == null) {
                            return;
                        }
                    }
                    if (!z2) {
                        if (!observableEmitter.isDisposed()) {
                            observableEmitter.onError(new MQTTException("acquire semaphore failed!"));
                        }
                        if (!z2 || MQTT.sSemaphore == null) {
                            return;
                        }
                        MQTT.sSemaphore.release();
                        return;
                    }
                    ELog.d(MQTT.TAG, String.format("publish topic : topicName = %s | qos = %s | retained = %s ", str, Integer.valueOf(i), Boolean.valueOf(z)));
                    MQTT.this.mClient.publish(str, bArr, i, z).waitForCompletion(MQTT.this.mOption.pubTimeout * 1000);
                    ELog.d(MQTT.TAG, "public topic : publish success!");
                    if (z) {
                        if (ObjectUtils.isEmpty(bArr)) {
                            MQTT.this.mRetainedTopicMap.remove(str);
                        } else if (!MQTT.this.mRetainedTopicMap.containsKey(str)) {
                            MQTT.this.mRetainedTopicMap.put(str, Integer.valueOf(i));
                        }
                    }
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onNext(true);
                        observableEmitter.onComplete();
                    }
                    if (!z2 || MQTT.sSemaphore == null) {
                        return;
                    }
                    MQTT.sSemaphore.release();
                } catch (Throwable th2) {
                    if (0 != 0 && MQTT.sSemaphore != null) {
                        MQTT.sSemaphore.release();
                    }
                    throw th2;
                }
            }
        }).subscribeOn(Schedulers.from(sPublishExecutor)).observeOn(AndroidSchedulers.mainThread());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <Body, P extends PubJSON<Body>, T extends Topic<P>> Observable<Boolean> subscribeTopic(final String str, final int i, final Class<T> cls, final PayloadCallBack<Payload<P>> payloadCallBack, final boolean z, final boolean z2) {
        return Observable.create(new ObservableOnSubscribe<Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.13
            @Override // io.reactivex.ObservableOnSubscribe
            public void subscribe(ObservableEmitter<Boolean> observableEmitter) throws Exception {
                if (!MQTT.this.isConnected()) {
                    ELog.e(MQTT.TAG, "subscribe topic : subscribe failed need connect first");
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onError(new MQTTException("you need connect to broker first!"));
                    return;
                }
                try {
                    ELog.d(MQTT.TAG, String.format("subscribe topic : topicName = %s", str));
                    if (MQTT.this.mTokenMap.containsKey(str)) {
                        MQTT.this.mTokenMap.remove(str);
                    }
                    MQTT.this.mTokenMap.put(str, new SubscribeToken(str, i, cls, payloadCallBack, z, z2));
                    MQTT.this.mClient.subscribe(str, i).waitForCompletion();
                    ELog.d(MQTT.TAG, "subscribe topic : subscribe success");
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onNext(true);
                    observableEmitter.onComplete();
                } catch (Throwable th) {
                    ELog.e(MQTT.TAG, String.format("subscribe topic : subscribe failed %s", th));
                    MQTT.this.mTokenMap.remove(str);
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onError(new MQTTException(th.getMessage()));
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    private <Body, P extends PubJSON<Body>, T extends Topic<P>> Observable<Boolean> subscribeTopic(final String str, final String str2, Class<T> cls, final PayloadCallBack<Payload<P>> payloadCallBack, final boolean z, final boolean z2, final String str3) {
        return Observable.just(cls).flatMap(new Function<Class<T>, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.12
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Class<T> cls2) throws Exception {
                TopicConfig topicConfig = (TopicConfig) cls2.getAnnotation(TopicConfig.class);
                if (topicConfig != null) {
                    return MQTT.this.subscribeTopic(z2 ? MQTTUtils.formatTopicNameWithAck(str, str2, topicConfig.name(), str3) : MQTTUtils.formatTopicName(str, str2, topicConfig.name(), topicConfig.appendPlus()), topicConfig.QoS(), cls2, payloadCallBack, z, z2);
                }
                throw new MQTTException("subscribe topic failed!");
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Boolean> unsubscribeTopic(final String str) {
        return Observable.create(new ObservableOnSubscribe<Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.15
            @Override // io.reactivex.ObservableOnSubscribe
            public void subscribe(ObservableEmitter<Boolean> observableEmitter) throws Exception {
                if (!MQTT.this.mTokenMap.containsKey(str)) {
                    ELog.d(MQTT.TAG, String.format("unsubscribe topic success: topicName = %s", str));
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onNext(true);
                    observableEmitter.onComplete();
                    return;
                }
                if (!MQTT.this.isConnected()) {
                    ELog.e(MQTT.TAG, "unsubscribe topic : unsubscribe failed need connect first");
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onError(new MQTTException("you need connect to broker first!"));
                    return;
                }
                MQTT.this.mTokenMap.remove(str);
                try {
                    ELog.d(MQTT.TAG, String.format(" unsubscribe topic : topicName = %s", str));
                    MQTT.this.mClient.unsubscribe(str).waitForCompletion();
                    ELog.d(MQTT.TAG, "unsubscribe topic success");
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onNext(true);
                    observableEmitter.onComplete();
                } catch (Throwable th) {
                    ELog.e(MQTT.TAG, String.format("unsubscribe topic failed:%s", th));
                    if (observableEmitter.isDisposed()) {
                        return;
                    }
                    observableEmitter.onError(new MQTTException(th.getMessage()));
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    public void checkConnect(final OnConnectStateListener onConnectStateListener) {
        if (StringUtils.isEmpty(get().getPCClientId())) {
            this.mTimerDisposable = RxTimerUtils.take(500L, new RxTimerUtils.IRxNext() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.10
                @Override // com.pingan.education.core.utils.RxTimerUtils.IRxNext
                public void doNext(Integer num) {
                    if (!StringUtils.isEmpty(MQTT.get().getPCClientId())) {
                        onConnectStateListener.onConnectSuccess();
                        if (MQTT.this.mTimerDisposable != null) {
                            MQTT.this.mTimerDisposable.dispose();
                            return;
                        }
                        return;
                    }
                    if (num.intValue() >= 9) {
                        onConnectStateListener.onConnectFailed();
                        if (MQTT.this.mTimerDisposable != null) {
                            MQTT.this.mTimerDisposable.dispose();
                        }
                    }
                }
            }, 10);
        } else {
            onConnectStateListener.onConnectSuccess();
        }
    }

    public void cleanState() {
        this.mSelfListeners.clear();
        this.mPCListeners.clear();
        this.mTeacherListeners.clear();
        this.mStudentListeners.clear();
        sSemaphore = null;
        sCompositeDisposable.clear();
        this.mRetainedTopicMap.clear();
        this.mTokenMap.clear();
        clearOnlineList();
    }

    public synchronized Observable<Boolean> connect() {
        return afterConnected(Observable.concatArray(connectToBroker(), subscribeTopic(TopicCharacter.TEACHER, TopicCharacter.ALL, NotifyStateOnline.class, new PayloadCallBack<Payload<PubJSON<NotifyStateOnline.Pub>>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.2
            @Override // com.pingan.education.classroom.base.data.topic.core.PayloadCallBack
            public void onReceived(Payload<PubJSON<NotifyStateOnline.Pub>> payload) {
                MQTT.this.handleOnlineState(payload.getJSON().getBody().online, payload.getClientId(), new EndPoint(TopicCharacter.TEACHER, payload.getClientId()));
            }
        }), subscribeTopic(TopicCharacter.PC, TopicCharacter.ALL, NotifyStateOnline.class, new PayloadCallBack<Payload<PubJSON<NotifyStateOnline.Pub>>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.3
            @Override // com.pingan.education.classroom.base.data.topic.core.PayloadCallBack
            public void onReceived(Payload<PubJSON<NotifyStateOnline.Pub>> payload) {
                if (MQTT.this.mPCOnlinePublish != null) {
                    MQTT.this.mPCOnlinePublish.onNext(Boolean.valueOf(payload.getJSON().getBody().online));
                }
                MQTT.this.handleOnlineState(payload.getJSON().getBody().online, payload.getClientId(), new EndPoint(TopicCharacter.PC, payload.getClientId()));
            }
        }), subscribeTopic(TopicCharacter.STUDENT, TopicCharacter.ALL, NotifyStateOnline.class, new PayloadCallBack<Payload<PubJSON<NotifyStateOnline.Pub>>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.4
            @Override // com.pingan.education.classroom.base.data.topic.core.PayloadCallBack
            public void onReceived(Payload<PubJSON<NotifyStateOnline.Pub>> payload) {
                MQTT.this.handleOnlineState(payload.getJSON().getBody().online, payload.getClientId(), new EndPoint(TopicCharacter.STUDENT, payload.getClientId()));
            }
        })).skip(3L), false);
    }

    public Observable<Boolean> deleteFile(String str) {
        return new DeleteFileInPC(String.format("http://%s%s", this.mOption.brokerIp, str)).build().toObservable().flatMap(new Function<Void, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.27
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Void r2) throws Exception {
                return Observable.just(true);
            }
        }).onErrorReturn(new Function<Throwable, Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.26
            @Override // io.reactivex.functions.Function
            public Boolean apply(Throwable th) throws Exception {
                return Boolean.valueOf((th instanceof StatusCodeException) && ((StatusCodeException) th).getStatusCode() == 404);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    public void disconnect() {
        if (!isConnected()) {
            cleanState();
            return;
        }
        NotifyStateOnline notifyStateOnline = new NotifyStateOnline(false);
        notifyStateOnline.setName(String.format("%s/%s", notifyStateOnline.getName(), this.mOption.clientId));
        notifyStateOnline.setPubClientId(this.mOption.clientId);
        execute(publishTopic(TopicCharacter.ALL, notifyStateOnline).observeOn(AndroidSchedulers.mainThread()).doFinally(new Action() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.9
            @Override // io.reactivex.functions.Action
            public void run() throws Exception {
                MQTT.this.cleanState();
                try {
                    MQTT.this.mClient.disconnect();
                } catch (IllegalArgumentException e) {
                }
            }
        }));
    }

    public String getBrokerIP() {
        return this.mOption != null ? this.mOption.brokerIp : "";
    }

    public String getPCClientId() {
        return !this.mPCOnlineList.isEmpty() ? this.mPCOnlineList.get(0).getClientId() : "";
    }

    public String getPCDownloadURL(String str) {
        return String.format("http://%s%s", this.mOption.brokerIp, str);
    }

    public String getStudentClientId(String str) {
        return MQTTUtils.getClientId(TopicCharacter.STUDENT, str);
    }

    public ArrayList<EndPoint> getStudentOnlineList() {
        return this.mStudentOnlineList;
    }

    public String getTeacherClientId() {
        String str = "";
        int i = 0;
        if (this.mTeacherOnlineList.size() == 1) {
            String clientId = this.mTeacherOnlineList.get(0).getClientId();
            ELog.i(TAG, "当前上课老师的Id:" + clientId);
            return clientId;
        }
        if (!this.mTeacherOnlineList.isEmpty() && !TextUtils.isEmpty(this.mCurrentTeacherId)) {
            while (true) {
                int i2 = i;
                if (i2 >= this.mTeacherOnlineList.size()) {
                    break;
                }
                if (this.mTeacherOnlineList.get(i2).getClientId().equals(MQTTUtils.getClientId(TopicCharacter.TEACHER, this.mCurrentTeacherId))) {
                    str = MQTTUtils.getClientId(TopicCharacter.TEACHER, this.mCurrentTeacherId);
                }
                i = i2 + 1;
            }
        }
        ELog.i(TAG, "当前上课老师的Id:" + str);
        return str;
    }

    public ArrayList<EndPoint> getTeacherOnlineList() {
        return this.mTeacherOnlineList;
    }

    public synchronized void init(Option option) {
        sSemaphore = new Semaphore(1, true);
        this.mPCOnlinePublish = PublishSubject.create();
        this.mOption = option;
        this.mClient = new MqttAndroidClient(CoreConfig.getContext(), this.mOption.brokerUri, this.mOption.clientId);
        this.mClient.setTraceEnabled(false);
        this.mClient.setCallback(new MqttCallbackExtended() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.1
            @Override // org.eclipse.paho.client.mqttv3.MqttCallbackExtended
            public void connectComplete(boolean z, String str) {
                ELog.i(MQTT.TAG, String.format("connectComplete reconnect = %s , serverURI = %s", Boolean.valueOf(z), str));
                if (z) {
                    ArrayList arrayList = new ArrayList();
                    final ArrayList arrayList2 = new ArrayList();
                    for (SubscribeToken subscribeToken : MQTT.this.mTokenMap.values()) {
                        Observable subscribeTopic = MQTT.this.subscribeTopic(subscribeToken.topic, subscribeToken.qos, subscribeToken.topicClass, subscribeToken.callBack, subscribeToken.autoUnsubscribe, subscribeToken.isConvertToAck);
                        if (subscribeToken.topicClass.equals(NotifyStateOnline.class)) {
                            arrayList.add(subscribeTopic);
                        } else {
                            arrayList2.add(subscribeTopic);
                        }
                    }
                    MQTT.this.execute(MQTT.this.afterConnected(Observable.concat(arrayList).skip(2L), true), new Consumer<Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.1.1
                        @Override // io.reactivex.functions.Consumer
                        public void accept(Boolean bool) throws Exception {
                            ELog.i(MQTT.TAG, String.format("re sub topics size = %s", Integer.valueOf(arrayList2.size())));
                            MQTT.this.execute(Observable.concat(arrayList2));
                        }
                    });
                }
            }

            @Override // org.eclipse.paho.client.mqttv3.MqttCallback
            public void connectionLost(Throwable th) {
                ELog.w(MQTT.TAG, String.format("connectionLost : cause = %s", th));
                MQTT.this.clearOnlineList();
                MQTT.this.handleSelfOnlineState(false, false);
            }

            @Override // org.eclipse.paho.client.mqttv3.MqttCallback
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                ELog.d(MQTT.TAG, "deliveryComplete:" + iMqttDeliveryToken);
            }

            @Override // org.eclipse.paho.client.mqttv3.MqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                ELog.i(MQTT.TAG, String.format("subscribe topic : messageArrived topicName = %s | message = %s | qos = %s | retained = %s ", str, mqttMessage, Integer.valueOf(mqttMessage.getQos()), Boolean.valueOf(mqttMessage.isRetained())));
                MQTT.this.handleMessage(str, mqttMessage);
            }
        });
    }

    public boolean isConnected() {
        try {
            if (this.mClient != null) {
                return this.mClient.isConnected();
            }
            return false;
        } catch (Exception e) {
            return false;
        }
    }

    public Observable<ArrayList<FileEntity>> listFilesInDir(String str, String str2) {
        return publishTopicWithAck(str, new ListFileInDir(str2)).flatMap(new Function<Payload<AckJSON<ListFileInDir.Ack>>, ObservableSource<ArrayList<FileEntity>>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.28
            @Override // io.reactivex.functions.Function
            public ObservableSource<ArrayList<FileEntity>> apply(Payload<AckJSON<ListFileInDir.Ack>> payload) throws Exception {
                return payload.getJSON().isSuccess() ? Observable.just(payload.getJSON().getBody().list) : Observable.just(new ArrayList());
            }
        });
    }

    public Observable<Boolean> publishTopic(final String str, Topic topic) {
        return Observable.just(topic).flatMap(new Function<Topic, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.17
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Topic topic2) throws Exception {
                topic2.setPubClientId(MQTT.this.mOption.clientId);
                return MQTT.this.publishTopic(MQTTUtils.formatPublishTopicName(MQTT.this.mOption.character, str, topic2.getName()), topic2.getPubRawData(), topic2.getQoS(), topic2.isRetained());
            }
        });
    }

    public Observable<Boolean> publishTopicAppendId(final String str, final String str2, Topic topic) {
        return Observable.just(topic).flatMap(new Function<Topic, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.16
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Topic topic2) throws Exception {
                topic2.setPubClientId(MQTT.this.mOption.clientId);
                return MQTT.this.publishTopic(MQTTUtils.formatPublishTopicNameWithId(str, str2, topic2.getName(), MQTT.this.mOption.clientId), topic2.getPubRawData(), topic2.getQoS(), topic2.isRetained());
            }
        });
    }

    public <P extends PubJSON, A extends AckJSON, T extends AckTopic<P, A>> Observable<Payload<A>> publishTopicWithAck(final String str, final T t) {
        final PublishSubject create = PublishSubject.create();
        return Observable.concat(subscribeTopic(this.mOption.character, str, t.getClass(), new PayloadCallBack<Payload<A>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.24
            @Override // com.pingan.education.classroom.base.data.topic.core.PayloadCallBack
            public void onReceived(Payload<A> payload) {
                create.onNext(payload);
                create.onComplete();
            }
        }, true, true, t.getPubMessageId()), publishTopic(str, t)).skip(1L).zipWith((ObservableSource) create, (BiFunction) new BiFunction<Boolean, Payload<A>, Payload<A>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.23
            @Override // io.reactivex.functions.BiFunction
            public Payload<A> apply(Boolean bool, Payload<A> payload) throws Exception {
                if (bool.booleanValue()) {
                    return payload;
                }
                throw new MQTTException("publish topic with ack failed!");
            }
        }).timeout(this.mOption.pubTimeout, TimeUnit.SECONDS).doOnError(new Consumer<Throwable>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.22
            @Override // io.reactivex.functions.Consumer
            public void accept(Throwable th) throws Exception {
                if (th instanceof TimeoutException) {
                    MQTT.this.execute(MQTT.this.unsubscribeTopic(MQTTUtils.formatTopicNameWithAck(MQTT.this.mOption.character, str, ((TopicConfig) t.getClass().getAnnotation(TopicConfig.class)).name(), t.getPubMessageId())));
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    public void registerOnlineListener(OnSelfOnlineStateListener onSelfOnlineStateListener) {
        if (onSelfOnlineStateListener != null) {
            this.mSelfListeners.put(onSelfOnlineStateListener.toString(), onSelfOnlineStateListener);
        }
    }

    public void registerPCOnlineListener(OnOnlineStateListener onOnlineStateListener) {
        if (onOnlineStateListener != null) {
            this.mPCListeners.put(onOnlineStateListener.toString(), onOnlineStateListener);
        }
    }

    public void registerStudentOnlineListener(OnOnlineStateListener onOnlineStateListener) {
        if (onOnlineStateListener != null) {
            this.mStudentListeners.put(onOnlineStateListener.toString(), onOnlineStateListener);
        }
    }

    public void registerTeacherOnlineListener(OnOnlineStateListener onOnlineStateListener) {
        if (onOnlineStateListener != null) {
            this.mTeacherListeners.put(onOnlineStateListener.toString(), onOnlineStateListener);
        }
    }

    public Observable<Boolean> removeAllRetainedTopic() {
        if (this.mRetainedTopicMap.isEmpty()) {
            return Observable.just(true);
        }
        ELog.i(TAG, String.format("remove all retained topic: size = %s", Integer.valueOf(this.mRetainedTopicMap.size())));
        ArrayList arrayList = new ArrayList();
        TopicConfig topicConfig = (TopicConfig) NotifyStateOnline.class.getAnnotation(TopicConfig.class);
        for (Map.Entry<String, Integer> entry : this.mRetainedTopicMap.entrySet()) {
            if (!entry.getKey().contains(topicConfig.name())) {
                arrayList.add(publishTopic(entry.getKey(), new byte[0], entry.getValue().intValue(), true));
            }
        }
        return Observable.merge(arrayList).last(true).toObservable();
    }

    public Observable<Boolean> removeRetainedTopic(final String str, Class<? extends Topic> cls) {
        return Observable.just(cls).flatMap(new Function<Class<? extends Topic>, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.29
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Class<? extends Topic> cls2) throws Exception {
                TopicConfig topicConfig = (TopicConfig) cls2.getAnnotation(TopicConfig.class);
                if (topicConfig == null) {
                    throw new IllegalArgumentException("topic config invalid!");
                }
                String formatTopicName = MQTTUtils.formatTopicName(MQTT.this.mOption.character, str, topicConfig.name(), topicConfig.appendPlus());
                ELog.d(MQTT.TAG, String.format("remove retained topic: topicName= %s", formatTopicName));
                return MQTT.this.publishTopic(formatTopicName, new byte[0], topicConfig.QoS(), topicConfig.isRetained());
            }
        });
    }

    public Observable<Boolean> removeRetainedTopicAppendId(final String str, Class<? extends Topic> cls) {
        return Observable.just(cls).flatMap(new Function<Class<? extends Topic>, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.30
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Class<? extends Topic> cls2) throws Exception {
                TopicConfig topicConfig = (TopicConfig) cls2.getAnnotation(TopicConfig.class);
                if (topicConfig == null) {
                    throw new IllegalArgumentException("topic config invalid!");
                }
                String formatPublishTopicNameWithId = MQTTUtils.formatPublishTopicNameWithId(MQTT.this.mOption.character, str, topicConfig.name(), MQTT.this.mOption.clientId);
                ELog.d(MQTT.TAG, String.format("remove retained topic: topicName= %s", formatPublishTopicNameWithId));
                return MQTT.this.publishTopic(formatPublishTopicNameWithId, new byte[0], topicConfig.QoS(), topicConfig.isRetained());
            }
        });
    }

    public void setCurrentTeacherId(String str) {
        this.mCurrentTeacherId = str;
    }

    public <Body, P extends PubJSON<Body>, T extends Topic<P>> Observable<Boolean> subscribeTopic(Class<T> cls, final PayloadCallBack<Payload<P>> payloadCallBack) {
        return Observable.just(cls).flatMap(new Function<Class<T>, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.11
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Class<T> cls2) throws Exception {
                TopicConfig topicConfig = (TopicConfig) cls2.getAnnotation(TopicConfig.class);
                if (topicConfig != null) {
                    return MQTT.this.subscribeTopic(topicConfig.pub(), cls2, payloadCallBack);
                }
                throw new MQTTException("subscribe topic failed!");
            }
        });
    }

    public <Body, P extends PubJSON<Body>, T extends Topic<P>> Observable<Boolean> subscribeTopic(String str, Class<T> cls, PayloadCallBack<Payload<P>> payloadCallBack) {
        return subscribeTopic(str, this.mOption.clientId, cls, payloadCallBack);
    }

    public <Body, P extends PubJSON<Body>, T extends Topic<P>> Observable<Boolean> subscribeTopic(String str, String str2, Class<T> cls, PayloadCallBack<Payload<P>> payloadCallBack) {
        return subscribeTopic(str, str2, cls, payloadCallBack, false, false, "");
    }

    public <Body, P extends PubJSON<Body>, T extends Topic<P>> Observable<Boolean> subscribeTopicOnce(String str, String str2, Class<T> cls, PayloadCallBack<Payload<P>> payloadCallBack) {
        return subscribeTopic(str, str2, cls, payloadCallBack, true, false, "");
    }

    public <P extends PubJSON, A extends AckJSON, T extends AckTopic<P, A>> Observable<Boolean> subscribeTopicWithAck(final String str, final String str2, final Class<T> cls, Function<Payload<P>, ObservableSource<T>> function) {
        final PublishSubject create = PublishSubject.create();
        return subscribeTopic(str, str2, cls, new PayloadCallBack<Payload<P>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.21
            @Override // com.pingan.education.classroom.base.data.topic.core.PayloadCallBack
            public void onReceived(Payload<P> payload) {
                create.onNext(payload);
            }
        }).flatMap(new Function<Boolean, ObservableSource<Payload<P>>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.20
            @Override // io.reactivex.functions.Function
            public ObservableSource<Payload<P>> apply(Boolean bool) throws Exception {
                return create;
            }
        }).flatMap(function).flatMap(new Function<T, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.19
            /* JADX WARN: Incorrect types in method signature: (TT;)Lio/reactivex/ObservableSource<Ljava/lang/Boolean;>; */
            @Override // io.reactivex.functions.Function
            public ObservableSource apply(AckTopic ackTopic) throws Exception {
                String formatTopicNameWithAck = MQTTUtils.formatTopicNameWithAck(str, str2, ((TopicConfig) cls.getAnnotation(TopicConfig.class)).name(), ackTopic.getAckMessageId());
                ackTopic.setAckClientId(MQTT.this.mOption.clientId);
                return MQTT.this.publishTopic(formatTopicNameWithAck, ackTopic.getAckRawData(), ackTopic.getQoS(), ackTopic.isRetained());
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    public void unregisterPCOnlineListener(OnOnlineStateListener onOnlineStateListener) {
        if (onOnlineStateListener == null || !this.mPCListeners.containsKey(onOnlineStateListener.toString())) {
            return;
        }
        this.mPCListeners.remove(onOnlineStateListener.toString());
    }

    public void unregisterTeacherOnlineListener(OnOnlineStateListener onOnlineStateListener) {
        if (onOnlineStateListener == null || !this.mTeacherListeners.containsKey(onOnlineStateListener.toString())) {
            return;
        }
        this.mTeacherListeners.remove(onOnlineStateListener.toString());
    }

    public <P extends PubJSON, T extends Topic<P>> Observable<Boolean> unsubscribeTopic(String str, Class<T> cls) {
        return unsubscribeTopic(str, this.mOption.clientId, cls);
    }

    public <P extends PubJSON, T extends Topic<P>> Observable<Boolean> unsubscribeTopic(final String str, final String str2, Class<T> cls) {
        return Observable.just(cls).flatMap(new Function<Class<T>, ObservableSource<Boolean>>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.14
            @Override // io.reactivex.functions.Function
            public ObservableSource<Boolean> apply(Class<T> cls2) throws Exception {
                TopicConfig topicConfig = (TopicConfig) cls2.getAnnotation(TopicConfig.class);
                if (topicConfig != null) {
                    return MQTT.this.unsubscribeTopic(MQTTUtils.formatTopicName(str, str2, topicConfig.name(), topicConfig.appendPlus()));
                }
                throw new MQTTException("subscribe topic failed!");
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

    public synchronized Observable<Boolean> uploadFile(String str, String str2, String str3, String str4, OnProgressListener onProgressListener) {
        if (!FileUtils.isFileExists(str2)) {
            return Observable.just(false);
        }
        String format = String.format("%s%s%s", str3, File.separator, str4);
        ELog.i(TAG, String.format("start upload file: localPath=%s | remotePath=%s", str2, format));
        UploadFileToPC uploadFileToPC = new UploadFileToPC(String.format("http://%s%s", this.mOption.brokerIp, format), str2);
        uploadFileToPC.setOnProgressListener(onProgressListener);
        return uploadFileToPC.build().toObservable().subscribeOn(Schedulers.io()).map(new Function<Response<String>, Boolean>() { // from class: com.pingan.education.classroom.base.data.topic.core.MQTT.25
            @Override // io.reactivex.functions.Function
            public Boolean apply(Response<String> response) throws Exception {
                return Boolean.valueOf(response.isSuccessful());
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }
}
