package com.changba.plugin.livechorus.websocket;

import com.changba.library.commonUtils.KTVLog;
import com.changba.library.commonUtils.ObjUtil;
import com.changba.plugin.snatchmic.live.models.socket.MessageEntity;
import com.changba.plugin.snatchmic.live.models.socket.ReceiveMessage;
import com.changba.plugin.snatchmic.live.util.ParseUtil;
import com.changba.plugin.snatchmic.socket.IReceiveDataListener;
import com.changba.plugin.snatchmic.socket.WebSocketMessageController;
import com.google.gson.Gson;
import com.meituan.robust.ChangeQuickRedirect;
import com.meituan.robust.PatchProxy;
import com.meituan.robust.PatchProxyResult;
import com.xiaomi.mipush.sdk.Constants;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Predicate;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: classes3.dex */
/**
 * Shared {@link IReceiveDataListener} that turns raw socket payloads into typed messages.
 *
 * <p>Each payload handed to {@link #b(String)} is parsed on a single-threaded executor. Every
 * {@link MessageEntity} found in it is decoded with Gson into the class that
 * {@code RxWebSocketMessageType.a(msgType)} returns, and is then pushed into a subject. Callers
 * obtain per-type streams through {@link #a(String)}, {@link #a(String, int)} or
 * {@link #c(String)}.
 *
 * <p>Every method starts with a {@code PatchProxy.proxy(...)} hook: when the hook reports
 * {@code isSupported}, its result replaces the method body. The hook ids are kept exactly as
 * they appear in the original code.
 *
 * @param <T> payload type carried by the emitted {@code RxWebSocketMessageTypeModel}
 */
public class RxWebSocketMessageReceiver<T> implements IReceiveDataListener {
    public static ChangeQuickRedirect changeQuickRedirect;

    /**
     * Lazily created shared instance. It is read outside the lock in {@link #a()}, so it must be
     * {@code volatile}; otherwise another thread may observe a partially constructed object.
     */
    private static volatile RxWebSocketMessageReceiver<?> d;

    /* renamed from: c, reason: collision with root package name */
    private final Gson f20252c = new Gson();

    /* renamed from: a, reason: collision with root package name */
    // Explicit type witness: without it the chained call has no target type to infer from.
    private final Subject<RxWebSocketMessageTypeModel<T>> f20251a = PublishSubject.<RxWebSocketMessageTypeModel<T>>d().b();

    // One worker thread, at most 100000 queued payloads; DiscardPolicy silently drops the rest.
    private final ThreadPoolExecutor b = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000), new ThreadPoolExecutor.DiscardPolicy());

    private RxWebSocketMessageReceiver() {
    }

    /**
     * Returns the shared receiver, creating it and registering it with
     * {@code WebSocketMessageController.d()} on first use.
     *
     * <p>The same instance is returned for every {@code T}; the cast is unchecked by design.
     *
     * @param <T> payload type the caller expects
     * @return the shared receiver instance
     */
    @SuppressWarnings("unchecked")
    public static <T> RxWebSocketMessageReceiver<T> a() {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[0], null, changeQuickRedirect, true, 58261, new Class[0], RxWebSocketMessageReceiver.class);
        if (proxy.isSupported) {
            return (RxWebSocketMessageReceiver<T>) proxy.result;
        }
        // Read the volatile field once on the fast path.
        RxWebSocketMessageReceiver<?> instance = d;
        if (instance == null) {
            synchronized (RxWebSocketMessageReceiver.class) {
                instance = d;
                if (instance == null) {
                    instance = new RxWebSocketMessageReceiver<Object>();
                    // Same order as before: publish the instance, then register it as listener.
                    d = instance;
                    WebSocketMessageController.d().a(instance);
                }
            }
        }
        return (RxWebSocketMessageReceiver<T>) instance;
    }

    /**
     * Rejects message types for which {@code RxWebSocketMessageType.a(str)} returns {@code null}.
     *
     * @param str message type to validate
     * @throws RuntimeException if no class is defined for {@code str}
     */
    private void d(String str) {
        if (!PatchProxy.proxy(new Object[]{str}, this, changeQuickRedirect, false, 58265, new Class[]{String.class}, Void.TYPE).isSupported && RxWebSocketMessageType.a(str) == null) {
            throw new RuntimeException(RxWebSocketMessageType.class.getName() + Constants.COLON_SEPARATOR + str + ":RxWebSocketMessageType中没有定义对应的类型");
        }
    }

    /**
     * Returns the stream of messages whose type equals {@code str}.
     *
     * @param str message type to listen for; must be known to {@code RxWebSocketMessageType}
     * @return an observable that only passes models whose {@code getType()} equals {@code str}
     * @throws RuntimeException if {@code str} is not a known message type
     */
    public Observable<RxWebSocketMessageTypeModel<T>> a(final String str) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{str}, this, changeQuickRedirect, false, 58264, new Class[]{String.class}, Observable.class);
        if (proxy.isSupported) {
            return (Observable) proxy.result;
        }
        d(str);
        // An anonymous class implementing an interface cannot take constructor arguments.
        return this.f20251a.b().filter(new Predicate<RxWebSocketMessageTypeModel<T>>() { // from class: com.changba.plugin.livechorus.websocket.RxWebSocketMessageReceiver.2
            public static ChangeQuickRedirect changeQuickRedirect;

            /** Type comparison itself; kept as a separate method so hook 58269 still applies. */
            public boolean a(RxWebSocketMessageTypeModel<T> rxWebSocketMessageTypeModel) throws Exception {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{rxWebSocketMessageTypeModel}, this, changeQuickRedirect, false, 58269, new Class[]{RxWebSocketMessageTypeModel.class}, Boolean.TYPE);
                return proxy2.isSupported ? ((Boolean) proxy2.result).booleanValue() : str.equals(rxWebSocketMessageTypeModel.getType());
            }

            /**
             * Typed override of the interface method. It carries the hook (58270) that the
             * decompiled bridge method had, then delegates to {@code a}.
             */
            @Override // io.reactivex.functions.Predicate
            public boolean test(RxWebSocketMessageTypeModel<T> model) throws Exception {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{model}, this, changeQuickRedirect, false, 58270, new Class[]{Object.class}, Boolean.TYPE);
                return proxy2.isSupported ? ((Boolean) proxy2.result).booleanValue() : a(model);
            }
        });
    }

    /**
     * Returns the stream for {@code str}, routed through a BUFFER-backpressured flowable and
     * {@code AndroidSchedulers.a()}.
     *
     * <p>NOTE(review): the chained calls are obfuscated; this presumably observes on the Android
     * main thread with {@code i} as the buffer size — confirm against the RxJava mapping.
     *
     * @param str message type to listen for
     * @param i   integer forwarded to the scheduler hop (presumably the buffer size)
     * @return the resulting observable
     * @throws RuntimeException if {@code str} is not a known message type
     */
    public Observable<RxWebSocketMessageTypeModel<T>> a(String str, int i) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{str, Integer.valueOf(i)}, this, changeQuickRedirect, false, 58267, new Class[]{String.class, Integer.TYPE}, Observable.class);
        return proxy.isSupported ? (Observable) proxy.result : a(str).toFlowable(BackpressureStrategy.BUFFER).a().a(AndroidSchedulers.a(), false, i).e();
    }

    /**
     * Decodes one message and emits it to subscribers.
     *
     * <p>Messages whose type has no class in {@code RxWebSocketMessageType} are ignored, as are
     * {@code null} entities. Decoding or delivery failures are printed and swallowed so that one
     * bad message does not stop the rest of the batch.
     *
     * @param messageEntity message to decode; {@code null} is ignored
     */
    public void a(MessageEntity messageEntity) {
        if (PatchProxy.proxy(new Object[]{messageEntity}, this, changeQuickRedirect, false, 58263, new Class[]{MessageEntity.class}, Void.TYPE).isSupported) {
            return;
        }
        if (messageEntity == null) {
            // A null list element would otherwise throw on the executor thread.
            return;
        }
        String str = messageEntity.msgType;
        String str2 = messageEntity.context;
        KTVLog.a("-----RxWebSocket-----", "Rx--onReceiveMessage  type = " + str + " , message=" + str2);
        Class<T> a2 = RxWebSocketMessageType.a(str);
        if (a2 == null) {
            return;
        }
        try {
            T payload = this.f20252c.fromJson(str2, a2);
            this.f20251a.onNext(new RxWebSocketMessageTypeModel<T>(str, messageEntity, payload));
        } catch (Exception e) {
            // Deliberate best-effort: keep processing later messages.
            e.printStackTrace();
        }
    }

    /**
     * Listener callback: queues the raw payload for parsing on the worker thread.
     *
     * <p>Empty payloads, payloads {@code ParseUtil.a} cannot parse, and payloads with an empty
     * message list are dropped. If the executor queue is full the payload is discarded silently.
     *
     * @param str raw payload received from the socket
     */
    @Override // com.changba.plugin.snatchmic.socket.IReceiveDataListener
    public void b(final String str) {
        if (PatchProxy.proxy(new Object[]{str}, this, changeQuickRedirect, false, 58262, new Class[]{String.class}, Void.TYPE).isSupported) {
            return;
        }
        this.b.execute(new Runnable() { // from class: com.changba.plugin.livechorus.websocket.RxWebSocketMessageReceiver.1
            public static ChangeQuickRedirect changeQuickRedirect;

            @Override // java.lang.Runnable
            public void run() {
                if (PatchProxy.proxy(new Object[0], this, changeQuickRedirect, false, 58268, new Class[0], Void.TYPE).isSupported) {
                    return;
                }
                KTVLog.d("-----RxWebSocket-----", "LiveRoomActivity onReceive() data:::::" + str);
                if (ObjUtil.isEmpty(str)) {
                    return;
                }
                ReceiveMessage parsed = ParseUtil.a(str);
                if (parsed == null || ObjUtil.isEmpty((Collection<?>) parsed.messageList)) {
                    return;
                }
                for (MessageEntity entity : parsed.messageList) {
                    RxWebSocketMessageReceiver.this.a(entity);
                }
            }
        });
    }

    /**
     * Convenience overload of {@link #a(String, int)} that passes 128 as the integer argument.
     *
     * @param str message type to listen for
     * @return the resulting observable
     * @throws RuntimeException if {@code str} is not a known message type
     */
    public Observable<RxWebSocketMessageTypeModel<T>> c(String str) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{str}, this, changeQuickRedirect, false, 58266, new Class[]{String.class}, Observable.class);
        return proxy.isSupported ? (Observable) proxy.result : a(str, 128);
    }
}
