package jd.dd.network.tcp;

import android.os.Process;
import android.text.TextUtils;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.PriorityBlockingQueue;
import jd.dd.network.tcp.message.MessageUtil;
import jd.dd.network.tcp.protocol.BaseMessage;
import jd.dd.waiter.util.LogUtils;

/* loaded from: classes4.dex */
public class MessageProducer extends Thread {
    private static int CURRENT_MESSAGE_COUNT = 0;
    public static final int DEFAULT_CONSUMER_SIZE = 4;
    public static final String TAG = "PacketReader";
    private final BlockingQueue<String> mCacheQueue;
    private MessageConsumer[] mConsumers;
    private final BlockingQueue<BaseMessage> mMessageQueue;
    private volatile boolean mQuit = false;

    public MessageProducer(AbstractConnection abstractConnection) {
        int i = 0;
        setName("MessageProducer");
        this.mCacheQueue = new LinkedBlockingDeque();
        this.mMessageQueue = new PriorityBlockingQueue();
        this.mConsumers = new MessageConsumer[4];
        while (true) {
            MessageConsumer[] messageConsumerArr = this.mConsumers;
            if (i >= messageConsumerArr.length) {
                return;
            }
            int i2 = i + 1;
            messageConsumerArr[i] = new MessageConsumer(abstractConnection, i2, this.mMessageQueue);
            this.mConsumers[i].start();
            i = i2;
        }
    }

    public void add(String str) {
        if (TextUtils.isEmpty(str)) {
            return;
        }
        try {
            this.mCacheQueue.put(str);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void quit() {
        LogUtils.d(TAG, "MessageProducer开始退出.");
        for (MessageConsumer messageConsumer : this.mConsumers) {
            messageConsumer.quit();
        }
        this.mQuit = true;
        interrupt();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LogUtils.d(TAG, "MessageProducer begin run.");
        Process.setThreadPriority(10);
        while (true) {
            try {
                String take = this.mCacheQueue.take();
                CURRENT_MESSAGE_COUNT++;
                LogUtils.printAll(TAG, "MessageProducer cache-queue-take 第 " + CURRENT_MESSAGE_COUNT + " 条消息:" + take);
                BaseMessage castToLocalObject = MessageUtil.castToLocalObject(take);
                if (castToLocalObject != null) {
                    this.mMessageQueue.put(castToLocalObject);
                } else {
                    LogUtils.d(TAG, "MessageProducer 包解析异常，请即时更正包修改代码.packet=" + take);
                }
            } catch (InterruptedException unused) {
                if (this.mQuit) {
                    return;
                }
            }
        }
    }
}
