package com.youku.live.messagechannel.connection;

import android.content.Context;
import com.alibaba.fastjson.JSONArray;
import com.taobao.orange.OrangeConfig;
import com.taobao.tao.log.TLog;
import com.taobao.weex.el.parse.Operators;
import com.youku.live.messagechannel.callback.IMCConnectionEventCallback;
import com.youku.live.messagechannel.conf.OrangeConfKey;
import com.youku.live.messagechannel.message.MCMessage;
import com.youku.live.messagechannel.message.MCMessageProcessor;
import com.youku.live.messagechannel.message.MCSysMessageName;
import com.youku.live.messagechannel.utils.MCThreadFactory;
import com.youku.live.messagechannel.utils.MyLog;
import com.youku.live.messagechannel.utils.ServerTimeEstimater;
import com.youku.live.messagechannel.utils.TLogUtil;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: classes6.dex */
public abstract class AbstractMCConnection implements IMCConnection {
    protected final long appId;
    protected final String channelId;
    protected final Context context;
    private volatile MCConnectionState mcConnectionState;
    private ScheduledThreadPoolExecutor stateCheckExecutor;
    private ScheduledFuture stateCheckFuture;
    protected final String TAG = getClass().getSimpleName();
    private volatile long lastedStateChangeTime = 0;
    private String maxDeliverCount = OrangeConfig.getInstance().getConfig(OrangeConfKey.Group.android_youku_messagechannel, OrangeConfKey.maxPullmsgDistributeCnt.name, OrangeConfKey.maxPullmsgDistributeCnt.def);
    private String appMonitorHeartBeatInterval = OrangeConfig.getInstance().getConfig(OrangeConfKey.Group.android_youku_messagechannel, OrangeConfKey.appMonitorHeartBeatInterval.name, OrangeConfKey.appMonitorHeartBeatInterval.def);

    public AbstractMCConnection(Context context, long j, String str) {
        this.context = context;
        this.appId = j;
        this.channelId = str;
        setConnectionState(MCConnectionState.INIT);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String appIdAndChannelString() {
        return new StringBuffer().append(", appId:").append(this.appId).append(", channelId:").append(this.channelId).toString();
    }

    protected abstract boolean customDispatchFilterPass(MCMessage mCMessage);

    protected abstract void customLaunch(IMCConnectionEventCallback iMCConnectionEventCallback);

    protected abstract void customShutdown(IMCConnectionEventCallback iMCConnectionEventCallback);

    /* JADX INFO: Access modifiers changed from: protected */
    public void deliverMessages(List<MCMessage> list) {
        if (list == null || list.size() <= 0) {
            MyLog.d(this.TAG, "Deliver messages is empty, connectionFlag: ", getConnectionFlag(), appIdAndChannelString());
        } else {
            MyLog.v(this.TAG, "Deliver receive ", Integer.valueOf(list.size()), " messages.");
            Observable.fromIterable(list).filter(new Predicate<MCMessage>() { // from class: com.youku.live.messagechannel.connection.AbstractMCConnection.4
                @Override // io.reactivex.functions.Predicate
                public boolean test(MCMessage mCMessage) {
                    if (mCMessage == null) {
                        return false;
                    }
                    if (MCSysMessageName.SYS_PROBE.getName().equals(mCMessage.msgType)) {
                        MCConnectionReceivedProbeMessages.refreshProbe(mCMessage);
                    }
                    return AbstractMCConnection.this.customDispatchFilterPass(mCMessage);
                }
            }).sorted(new Comparator<MCMessage>() { // from class: com.youku.live.messagechannel.connection.AbstractMCConnection.3
                @Override // java.util.Comparator
                public int compare(MCMessage mCMessage, MCMessage mCMessage2) {
                    if (mCMessage.sendTime > mCMessage2.sendTime) {
                        return 1;
                    }
                    return mCMessage.sendTime < mCMessage2.sendTime ? -1 : 0;
                }
            }).take(Integer.valueOf(this.maxDeliverCount).intValue()).toList().subscribe(new Consumer<List<MCMessage>>() { // from class: com.youku.live.messagechannel.connection.AbstractMCConnection.2
                @Override // io.reactivex.functions.Consumer
                public void accept(List<MCMessage> list2) {
                    if (list2 == null || list2.size() <= 0) {
                        return;
                    }
                    if (AbstractMCConnection.this.getConnectionFlag() != MCConnectionFlag.PM && AbstractMCConnection.this.getConnectionFlag() != MCConnectionFlag.ACCS_MASS && AbstractMCConnection.this.mcConnectionState == MCConnectionState.CLOSED) {
                        MyLog.w(AbstractMCConnection.this.TAG, "Because of connection closed, deliver terminal push ", Integer.valueOf(list2.size()), " messages to MCMessageProcessor. MCConnectionState:", AbstractMCConnection.this.mcConnectionState, Operators.SPACE_STR, JSONArray.toJSONString(list2));
                    } else {
                        MyLog.v(AbstractMCConnection.this.TAG, "Deliver push ", Integer.valueOf(list2.size()), " messages to MCMessageProcessor. ", JSONArray.toJSONString(list2));
                        MCMessageProcessor.getInstance().process(list2);
                    }
                }
            });
        }
    }

    protected abstract void doSomethingForReopen();

    @Override // com.youku.live.messagechannel.connection.IMCConnection
    public MCConnectionState getConnectionState() {
        return this.mcConnectionState;
    }

    @Override // com.youku.live.messagechannel.connection.IMCConnection
    public boolean isHealth() {
        return MCConnectionState.OPEN == this.mcConnectionState;
    }

    @Override // com.youku.live.messagechannel.connection.IMCConnection
    public void launch(IMCConnectionEventCallback iMCConnectionEventCallback) {
        if (this.mcConnectionState == MCConnectionState.INIT) {
            setConnectionState(MCConnectionState.OPENING);
            MCConnectionReceivedProbeMessages.initProbe(this.appId, this.channelId, getConnectionFlag());
            this.stateCheckExecutor = new ScheduledThreadPoolExecutor(1, new MCThreadFactory("stateCheck"));
            this.stateCheckFuture = this.stateCheckExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.youku.live.messagechannel.connection.AbstractMCConnection.1
                @Override // java.lang.Runnable
                public void run() {
                    TLog.logi(TLogUtil.TLOG_MODULE_NAME, AbstractMCConnection.this.TAG, TLogUtil.jointContents("State selfCheck start", AbstractMCConnection.this.appIdAndChannelString()));
                    if (AbstractMCConnection.this.mcConnectionState.getCode() <= MCConnectionState.INIT.getCode() || System.currentTimeMillis() - AbstractMCConnection.this.lastedStateChangeTime <= Integer.valueOf(AbstractMCConnection.this.appMonitorHeartBeatInterval).intValue() * 1000) {
                        return;
                    }
                    MCMessage lastedProbe = MCConnectionReceivedProbeMessages.getLastedProbe(AbstractMCConnection.this.appId, AbstractMCConnection.this.channelId, AbstractMCConnection.this.getConnectionFlag());
                    if (lastedProbe == null || ServerTimeEstimater.estimateServerTimestamp() - lastedProbe.sendTime > Integer.valueOf(AbstractMCConnection.this.appMonitorHeartBeatInterval).intValue() * 1000) {
                        if (AbstractMCConnection.this.mcConnectionState == MCConnectionState.OPEN) {
                            AbstractMCConnection.this.setConnectionState(MCConnectionState.BROKEN);
                            TLog.logi(TLogUtil.TLOG_MODULE_NAME, AbstractMCConnection.this.TAG, TLogUtil.jointContents("Probe is miss, connection change state to:", AbstractMCConnection.this.mcConnectionState.name(), AbstractMCConnection.this.appIdAndChannelString(), ", mcConnectionFlag:", AbstractMCConnection.this.getConnectionFlag().name()));
                        }
                        AbstractMCConnection.this.doSomethingForReopen();
                        return;
                    }
                    if (AbstractMCConnection.this.mcConnectionState != MCConnectionState.OPEN) {
                        AbstractMCConnection.this.setConnectionState(MCConnectionState.OPEN);
                        TLog.logi(TLogUtil.TLOG_MODULE_NAME, AbstractMCConnection.this.TAG, TLogUtil.jointContents("Probe received, connection change state to:", AbstractMCConnection.this.mcConnectionState.name(), AbstractMCConnection.this.appIdAndChannelString(), ", mcConnectionFlag:", AbstractMCConnection.this.getConnectionFlag().name()));
                    }
                }
            }, Integer.valueOf(this.appMonitorHeartBeatInterval).intValue(), Integer.valueOf(this.appMonitorHeartBeatInterval).intValue(), TimeUnit.SECONDS);
            customLaunch(iMCConnectionEventCallback);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setConnectionState(MCConnectionState mCConnectionState) {
        this.lastedStateChangeTime = System.currentTimeMillis();
        if (this.mcConnectionState != mCConnectionState) {
            MCConnectionState mCConnectionState2 = this.mcConnectionState;
            this.mcConnectionState = mCConnectionState;
            TLog.logi(TLogUtil.TLOG_MODULE_NAME, this.TAG, TLogUtil.jointContents("Connection state change to:", this.mcConnectionState.name(), appIdAndChannelString(), ", mcConnectionFlag:", getConnectionFlag().name()));
            stateChangeNotify(mCConnectionState2, mCConnectionState);
        }
    }

    @Override // com.youku.live.messagechannel.connection.IMCConnection
    public void shutdown(IMCConnectionEventCallback iMCConnectionEventCallback) {
        setConnectionState(MCConnectionState.CLOSED);
        if (this.stateCheckFuture != null) {
            this.stateCheckFuture.cancel(true);
            this.stateCheckFuture = null;
        }
        if (this.stateCheckExecutor != null) {
            this.stateCheckExecutor.shutdownNow();
        }
        MCConnectionReceivedProbeMessages.clearProbe(this.appId, this.channelId, getConnectionFlag());
        customShutdown(iMCConnectionEventCallback);
    }

    protected abstract void stateChangeNotify(MCConnectionState mCConnectionState, MCConnectionState mCConnectionState2);
}
