package cn.ewpark.core.net.factory;

import cn.ewpark.core.android.LogHelper;
import cn.ewpark.core.net.annotation.CachePolicyType;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;

/* loaded from: classes.dex */
/**
 * {@link ObservableOnSubscribe} that subscribes to a cache source and a network source on
 * {@link Schedulers#io()} and forwards their items to a single downstream emitter.
 *
 * <p>Behaviour implemented by this class:
 * <ul>
 *   <li>Cache items are forwarded only while no network item has been received yet.</li>
 *   <li>Network items are always forwarded (if the emitter is still active).</li>
 *   <li>When the network source completes, the last network item is handed to
 *       {@code storeCacheAction} unless the policy is {@link CachePolicyType#NONE}.</li>
 *   <li>A network error is forwarded only after the cache source has terminated, so a cached
 *       item (if any) gets a chance to be emitted first.</li>
 * </ul>
 *
 * <p>The two latches are per-instance state, so one instance is meant to serve one subscription.
 *
 * @param <T> type of the items produced by both sources
 */
public class AsyncOnSubscribeCacheNet<T> implements ObservableOnSubscribe<T> {
    /** Log tag shared by the outer class and its observers. */
    private static final String TAG = "OnSubscribeCacheNet:";

    protected AsyncOnSubscribeCacheNet<T>.ObservableWrapper<T> cacheObservable;
    protected CachePolicyType mCachePolicyType;
    protected AsyncOnSubscribeCacheNet<T>.ObservableWrapper<T> netObservable;
    protected Consumer<T> storeCacheAction;
    /** Reaches zero once the cache source has terminated (or immediately for policy NONE). */
    protected CountDownLatch cacheLatch = new CountDownLatch(1);
    /** Reaches zero once the network source has produced an item or completed. */
    protected CountDownLatch netLatch = new CountDownLatch(1);

    /**
     * Observer attached to the cache source. Forwards cached items downstream as long as the
     * network has not delivered anything yet, and releases {@code cacheLatch} on termination.
     *
     * <p>Note: the type parameter {@code T} declared here shadows the outer class's {@code T}.
     */
    public class CacheObserver<T> implements Observer<T> {
        AsyncOnSubscribeCacheNet<T>.ObservableWrapper<T> observableWrapper;
        Consumer<T> storeCacheAction;
        ObservableEmitter<? super T> subscriber;

        /**
         * @param observableWrapper wrapper that receives the last cached item
         * @param observableEmitter downstream emitter; may be {@code null}, in which case nothing is forwarded
         * @param consumer          cache-store action (kept for symmetry; not used by this observer)
         */
        public CacheObserver(AsyncOnSubscribeCacheNet<T>.ObservableWrapper<T> observableWrapper, ObservableEmitter<? super T> observableEmitter, Consumer<T> consumer) {
            this.observableWrapper = observableWrapper;
            this.subscriber = observableEmitter;
            this.storeCacheAction = consumer;
        }

        /** Marks the cache source as finished; the downstream is NOT completed here. */
        @Override // io.reactivex.Observer
        public void onComplete() {
            LogHelper.info(TAG, "cache onCompleted");
            AsyncOnSubscribeCacheNet.this.cacheLatch.countDown();
        }

        /** Logs the cache failure and marks the cache source as finished; the error is not forwarded. */
        @Override // io.reactivex.Observer
        public void onError(Throwable th) {
            LogHelper.error(TAG, "cache onError");
            LogHelper.error(TAG, "read cache error:" + th.getMessage());
            th.printStackTrace();
            AsyncOnSubscribeCacheNet.this.cacheLatch.countDown();
        }

        /** Records the cached item and forwards it unless network data has already arrived. */
        @Override // io.reactivex.Observer
        public void onNext(T t) {
            LogHelper.info(TAG, "cache onNext o:" + t);
            this.observableWrapper.setData(t);
            AsyncOnSubscribeCacheNet.this.logThread("");
            if (AsyncOnSubscribeCacheNet.this.netLatch.getCount() <= 0) {
                LogHelper.error(TAG, "net result had been load,so cache is not need to load");
                return;
            }
            // Evaluate the null/disposed state once, BEFORE logging, so a null emitter
            // cannot trigger a NullPointerException inside the log statement.
            ObservableEmitter<? super T> emitter = this.subscriber;
            boolean inactive = emitter == null || emitter.isDisposed();
            LogHelper.error(TAG, " check subscriber :" + emitter + " isUnsubscribed:" + inactive);
            if (inactive) {
                return;
            }
            emitter.onNext(t);
        }

        @Override // io.reactivex.Observer
        public void onSubscribe(Disposable disposable) {
            // The upstream disposable is intentionally not tracked.
        }
    }

    /**
     * Observer attached to the network source. Forwards network items and terminal events
     * downstream and triggers the cache-store action on completion.
     *
     * <p>Note: the type parameter {@code T} declared here shadows the outer class's {@code T}.
     */
    public class NetObserver<T> implements Observer<T> {
        AsyncOnSubscribeCacheNet<T>.ObservableWrapper<T> observableWrapper;
        Consumer<T> storeCacheAction;
        ObservableEmitter<? super T> subscriber;

        /**
         * @param observableWrapper wrapper that receives the last network item
         * @param observableEmitter downstream emitter; may be {@code null}, in which case nothing is forwarded
         * @param consumer          action invoked with the last network item on completion; may be {@code null}
         * @param cachePolicyType   when {@link CachePolicyType#NONE}, the cache latch is released
         *                          immediately so a network error never waits for the cache
         */
        public NetObserver(AsyncOnSubscribeCacheNet<T>.ObservableWrapper<T> observableWrapper, ObservableEmitter<? super T> observableEmitter, Consumer<T> consumer, CachePolicyType cachePolicyType) {
            this.observableWrapper = observableWrapper;
            this.subscriber = observableEmitter;
            this.storeCacheAction = consumer;
            if (cachePolicyType == CachePolicyType.NONE) {
                AsyncOnSubscribeCacheNet.this.cacheLatch.countDown();
            }
        }

        /** Stores the last network item (policy permitting), completes the downstream, releases the net latch. */
        @Override // io.reactivex.Observer
        public void onComplete() {
            LogHelper.info(TAG, "net onCompleted ");
            try {
                try {
                    if (this.storeCacheAction != null && AsyncOnSubscribeCacheNet.this.mCachePolicyType != CachePolicyType.NONE) {
                        AsyncOnSubscribeCacheNet.this.logThread("保存到本地缓存 ");
                        this.storeCacheAction.accept(this.observableWrapper.getData());
                    }
                } catch (Exception e) {
                    // Best effort: a failing cache write must not prevent downstream completion.
                    LogHelper.error(TAG, "store cache error:" + e.getMessage());
                    e.printStackTrace();
                }
                ObservableEmitter<? super T> emitter = this.subscriber;
                if (emitter != null && !emitter.isDisposed()) {
                    emitter.onComplete();
                }
            } finally {
                // Always release the latch, even if the downstream callback throws.
                AsyncOnSubscribeCacheNet.this.netLatch.countDown();
            }
        }

        /** Waits for the cache source to terminate, then forwards the network error downstream. */
        @Override // io.reactivex.Observer
        public void onError(Throwable th) {
            LogHelper.error(TAG, "net onError ");
            try {
                LogHelper.error(TAG, "net onError await if cache not completed.");
                AsyncOnSubscribeCacheNet.this.cacheLatch.await();
                LogHelper.error(TAG, "net onError await over.");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can observe it;
                // the network error is still forwarded below.
                Thread.currentThread().interrupt();
                LogHelper.error(TAG, "net onError await interrupted:" + e.getMessage());
            }
            ObservableEmitter<? super T> emitter = this.subscriber;
            if (emitter == null || emitter.isDisposed()) {
                return;
            }
            emitter.onError(th);
        }

        /** Records the network item, flags that network data has arrived, and forwards the item. */
        @Override // io.reactivex.Observer
        public void onNext(T t) {
            LogHelper.info(TAG, "net onNext o:" + t);
            this.observableWrapper.setData(t);
            // Flag network data as loaded before emitting, so a cache item that arrives later
            // is dropped by CacheObserver instead of following the fresh result. A cache item
            // that already passed its check may still be emitted; this narrows the window only.
            // countDown() is a no-op once the count is zero, so the call in onComplete stays safe.
            AsyncOnSubscribeCacheNet.this.netLatch.countDown();
            AsyncOnSubscribeCacheNet.this.logThread("");
            // Evaluate the null/disposed state once, BEFORE logging, to avoid an NPE on a null emitter.
            ObservableEmitter<? super T> emitter = this.subscriber;
            boolean inactive = emitter == null || emitter.isDisposed();
            LogHelper.error(" check subscriber :" + emitter + " isUnsubscribed:" + inactive);
            if (inactive) {
                return;
            }
            emitter.onNext(t);
        }

        @Override // io.reactivex.Observer
        public void onSubscribe(Disposable disposable) {
            // The upstream disposable is intentionally not tracked.
        }
    }

    /**
     * Pairs a source {@link Observable} with the most recent item it produced.
     *
     * <p>Note: the type parameter {@code T} declared here shadows the outer class's {@code T}.
     */
    public class ObservableWrapper<T> {
        T data;
        Observable<T> observable;

        /** @param observable the source to wrap */
        public ObservableWrapper(Observable<T> observable) {
            this.observable = observable;
        }

        /** @return the last item stored via {@link #setData}, or {@code null} if none was stored */
        public T getData() {
            return this.data;
        }

        /** @return the wrapped source */
        public Observable<T> getObservable() {
            return this.observable;
        }

        /** @param t the item to remember as the latest one */
        public void setData(T t) {
            this.data = t;
        }
    }

    /**
     * @param observable      cache source
     * @param observable2     network source
     * @param consumer        action that receives the last network item when the network source
     *                        completes; may be {@code null}
     * @param cachePolicyType cache policy; {@link CachePolicyType#NONE} disables storing and
     *                        stops network errors from waiting on the cache source
     */
    public AsyncOnSubscribeCacheNet(Observable<T> observable, Observable<T> observable2, Consumer<T> consumer, CachePolicyType cachePolicyType) {
        this.cacheObservable = new ObservableWrapper<>(observable);
        this.netObservable = new ObservableWrapper<>(observable2);
        this.storeCacheAction = consumer;
        this.mCachePolicyType = cachePolicyType;
    }

    /**
     * Logs the given prefix together with the name of the calling thread.
     *
     * @param str text placed before the thread name
     */
    public void logThread(String str) {
        LogHelper.info(TAG, str + " : " + Thread.currentThread().getName());
    }

    /**
     * Subscribes to the cache source and the network source, each on {@link Schedulers#io()}.
     *
     * @param observableEmitter downstream emitter that receives the merged events
     */
    @Override // io.reactivex.ObservableOnSubscribe
    public void subscribe(ObservableEmitter<T> observableEmitter) throws Exception {
        // Both observers may call the emitter from different io() threads; the serialized
        // view makes those concurrent onNext/onError/onComplete calls safe.
        ObservableEmitter<T> emitter = observableEmitter.serialize();
        this.cacheObservable.getObservable()
                .subscribeOn(Schedulers.io())
                .subscribe(new CacheObserver<T>(this.cacheObservable, emitter, this.storeCacheAction));
        this.netObservable.getObservable()
                .subscribeOn(Schedulers.io())
                .subscribe(new NetObserver<T>(this.netObservable, emitter, this.storeCacheAction, this.mCachePolicyType));
    }
}
