package com.walker.cheetah.client.support;

import com.walker.cheetah.client.transport.AbstractConnector;
import com.walker.cheetah.client.transport.LongConnector;
import com.walker.cheetah.core.DataProtocolsException;
import com.walker.cheetah.core.Protocols;
import com.walker.cheetah.core.ProtocolsFactory;
import com.walker.cheetah.core.RemoteException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: classes.dex */
public class AbstractTcpLongConnector extends AbstractConnector implements LongConnector {
    /** Decompiler-exposed flag backing {@code assert}; assigned once in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Capacity of the queue that buffers frames the listener thread classifies as server pushes. */
    private static final int MAX_SAVED_PUSH_SIZE = 128;
    // NOTE(review): not referenced anywhere in this file; presumably the 1-second wait in connect() — confirm.
    private static final int THREAD_WAIT_TIME = 1;
    protected Protocols protocols = ProtocolsFactory.getInstance();
    /** Frames received while no reply is awaited; drained by {@code receivePushed()}. */
    private final BlockingQueue<ByteBuffer> pushedFromServerData =
            new ArrayBlockingQueue<ByteBuffer>(MAX_SAVED_PUSH_SIZE);
    /** Outgoing frames handed from {@code send()} to the listener thread. */
    private final BlockingQueue<ByteBuffer> sendToServerData = new ArrayBlockingQueue<ByteBuffer>(128);
    /** Single-slot hand-off for the reply that {@code send()} blocks on. */
    private final BlockingQueue<ByteBuffer> responseData = new ArrayBlockingQueue<ByteBuffer>(1);
    // These flags must start out false. The decompiler had substituted $assertionsDisabled for the
    // literal, which evaluates to true whenever assertions are off.
    private volatile boolean connectThreadStoped = false;
    private ListeningServerThread connectThread = null;
    private volatile boolean isAuthenticated = false;

    /* loaded from: classes.dex */
    /**
     * Background thread that drives the connector's selector: it completes the connection,
     * writes queued outgoing frames and routes incoming frames to the response or push queue.
     *
     * <p>When the thread ends it marks the connector as stopped and interrupts the thread
     * registered through {@link #setParentThread(Thread)}, if any.
     */
    private class ListeningServerThread extends Thread {
        private static final String T_NAME = "ListeningServerThread";
        /** True from the moment a pending outgoing frame is noticed until it is cleared again. */
        private volatile boolean isSending = false;
        private Thread parentThread;

        public ListeningServerThread() {
            setName(T_NAME);
        }

        /** Leaves send mode and switches the channel's interest back to reading. */
        private void clearSendStatus0() throws IOException {
            this.isSending = false;
            AbstractTcpLongConnector.this.client.register(
                    AbstractTcpLongConnector.this.selector, SelectionKey.OP_READ);
        }

        /** Hook invoked when queueing a received frame is interrupted while in send mode; currently a no-op. */
        private void processInterrupted() {
        }

        /**
         * Selector loop. Runs until the connector's stop flag is set or an {@link IOException}
         * occurs; the cleanup (close if not already stopped, then {@link #terminate()}) runs
         * exactly once when the loop is left.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            final AbstractTcpLongConnector outer = AbstractTcpLongConnector.this;
            outer.logger.info("======= connect thread is runing!");
            // The try/finally wraps the whole loop. Placing it inside the loop body would close
            // the connector and terminate this thread after the very first select pass.
            try {
                while (!outer.connectThreadStoped) {
                    outer.selector.select();
                    // stopConnector() sets the stop flag, interrupts this thread and then nulls
                    // client/selector, so re-check the flag before touching them again.
                    if (outer.connectThreadStoped) {
                        break;
                    }
                    Iterator<SelectionKey> keys = outer.selector.selectedKeys().iterator();
                    if (!this.isSending && outer.sendToServerData.peek() != null) {
                        outer.logger.info("++发现发送数据，进入发送模式");
                        outer.client.register(outer.selector, SelectionKey.OP_WRITE);
                        this.isSending = true;
                    }
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isConnectable()) {
                            handleConnectable0();
                        } else if (key.isWritable()) {
                            handleWritable0();
                        } else if (key.isReadable()) {
                            handleReadable0();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                outer.logger.info("connect thread stoped: " + e.getMessage());
                // The finally block below performs the cleanup; the trailing log line is skipped.
                return;
            } finally {
                try {
                    if (!outer.connectThreadStoped) {
                        outer.close();
                    }
                } catch (RemoteException ignored) {
                    // Best effort: the thread is ending regardless of whether close() succeeded.
                }
                outer.logger.info("客户端监听线程结束...");
                terminate();
            }
            outer.logger.info(String.valueOf(getName()) + ": terminated because of connectThreadStoped = " + outer.connectThreadStoped);
        }

        /** Finishes a pending connection and registers for writing (send mode) or reading. */
        private void handleConnectable0() throws IOException {
            final AbstractTcpLongConnector outer = AbstractTcpLongConnector.this;
            if (outer.client.isConnectionPending()) {
                outer.client.finishConnect();
            }
            outer.logger.debug("is connected...");
            int interest = this.isSending ? SelectionKey.OP_WRITE : SelectionKey.OP_READ;
            outer.client.register(outer.selector, interest);
        }

        /** Writes the next queued frame and switches the channel back to read interest. */
        private void handleWritable0() throws IOException {
            final AbstractTcpLongConnector outer = AbstractTcpLongConnector.this;
            if (!this.isSending) {
                throw new IllegalStateException("send data, but isSending is: false!");
            }
            try {
                outer.write0(outer.sendToServerData.take());
                outer.client.register(outer.selector, SelectionKey.OP_READ);
                outer.logger.info("已发送数据");
            } catch (InterruptedException e) {
                outer.logger.error("从队列中获得发送数据时阻塞，当前已被中断");
            }
        }

        /**
         * Reads one frame and queues it. The first frame received flips the connector to
         * authenticated; once authenticated, send mode is cleared on every read, so frames go to
         * the push queue. A protocol violation sets the connector's stop flag.
         */
        private void handleReadable0() throws IOException {
            final AbstractTcpLongConnector outer = AbstractTcpLongConnector.this;
            try {
                ByteBuffer frame = outer.read0();
                outer.logger.info("接收到数据: " + frame);
                try {
                    if (outer.isAuthenticated) {
                        this.isSending = false;
                    } else {
                        outer.isAuthenticated = true;
                    }
                    if (this.isSending) {
                        outer.responseData.put(frame);
                        clearSendStatus0();
                    } else {
                        outer.pushedFromServerData.put(frame);
                    }
                } catch (InterruptedException e) {
                    outer.logger.info("接收服务端消息时，接收队列已满，无法继续接收");
                    if (this.isSending) {
                        processInterrupted();
                    }
                }
            } catch (DataProtocolsException e) {
                e.printStackTrace();
                outer.logger.error("接收到的数据不符合协议: " + e.getMessage());
                outer.connectThreadStoped = true;
            }
        }

        /** Registers the thread to interrupt when this listener terminates. */
        public void setParentThread(Thread thread) {
            this.parentThread = thread;
        }

        /** Clears send mode without touching the channel's interest set. */
        public void setSendingEnd() {
            this.isSending = false;
        }

        /** Requests the selector loop to stop by setting the connector's stop flag. */
        public void setStop() {
            AbstractTcpLongConnector.this.connectThreadStoped = true;
        }

        /** Sets the stop flag and interrupts the registered parent thread, if one was set. */
        public void terminate() {
            AbstractTcpLongConnector.this.connectThreadStoped = true;
            if (this.parentThread != null) {
                this.parentThread.interrupt();
            }
        }
    }

    static {
        // True when assertions are disabled for this class. The field is a blank final, so it
        // must not be read on the right-hand side of its own first assignment.
        $assertionsDisabled = !AbstractTcpLongConnector.class.desiredAssertionStatus();
    }

    /**
     * Guards operations that need the channel to exist.
     *
     * @throws IllegalStateException if {@code client} has not been created yet
     */
    private void checkConnection0() {
        if (this.client != null) {
            return;
        }
        throw new IllegalStateException("客户端还未建立连接,必须先调用start()方法。");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads one length-prefixed frame from the channel: a 4-byte length (read with the buffer's
     * default big-endian order) followed by that many payload bytes.
     *
     * <p>A single {@code read} may deliver only part of the requested bytes, so both the
     * header and the payload are read in a loop until complete.
     *
     * @return the payload, flipped and ready for reading; its backing array is exactly the payload
     * @throws DataProtocolsException if the announced length is not in the range 1..65536
     * @throws IOException if the channel fails or the peer closes it before the frame is complete
     */
    public ByteBuffer read0() throws IOException, DataProtocolsException {
        ByteBuffer header = ByteBuffer.allocate(4);
        fillFromChannel0(header);
        header.flip();
        int length = header.getInt();
        if (length <= 0) {
            throw new DataProtocolsException();
        }
        if (length > 65536) {
            throw new DataProtocolsException("out of max received size: 65536");
        }
        ByteBuffer payload = ByteBuffer.allocate(length);
        fillFromChannel0(payload);
        payload.flip();
        return payload;
    }

    /**
     * Reads from the channel until {@code target} has no space remaining.
     *
     * <p>NOTE(review): if the channel is non-blocking this spins while the rest of a frame is in
     * flight; frames are capped at 64 KiB, so the wait is expected to be short.
     *
     * @throws IOException if the channel reports end-of-stream before the buffer is full
     */
    private void fillFromChannel0(ByteBuffer target) throws IOException {
        while (target.hasRemaining()) {
            if (this.client.read(target) < 0) {
                throw new IOException("connection closed by peer, " + target.remaining()
                        + " byte(s) of the current frame missing");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Encodes a payload with the configured protocol and writes the whole frame to the channel.
     *
     * <p>A single {@code write} may accept only part of the buffer, so writing continues
     * until nothing remains.
     *
     * @param obj the payload, either a {@code byte[]} or a {@link ByteBuffer}
     * @throws UnsupportedOperationException if {@code obj} is of any other type
     * @throws IOException if the channel write fails
     */
    public void write0(Object obj) throws IOException {
        if (!$assertionsDisabled && obj == null) {
            throw new AssertionError();
        }
        final ByteBuffer frame;
        if (obj instanceof byte[]) {
            byte[] encoded = this.protocols.encode((byte[]) obj);
            this.logger.debug("发送的是字节，共: " + encoded.length);
            frame = ByteBuffer.wrap(encoded);
        } else if (obj instanceof ByteBuffer) {
            frame = this.protocols.encode((ByteBuffer) obj);
            this.logger.debug("发送的是字节缓冲，共: " + frame.remaining() + ", " + frame);
        } else {
            throw new UnsupportedOperationException();
        }
        // Keep writing until the frame is fully handed to the socket; a partial write would
        // otherwise silently truncate the message.
        while (frame.hasRemaining()) {
            this.client.write(frame);
        }
    }

    /**
     * Closes the long connection by delegating to {@code stop()}.
     *
     * @throws RemoteException wrapping any exception raised while logging or stopping
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public void close() {
        try {
            logger.info("客户端长连接被调用close()");
            this.stop();
        } catch (Exception failure) {
            final RemoteException wrapped = new RemoteException("close connector failed!", failure);
            throw wrapped;
        }
    }

    /**
     * Starts the listener thread as a daemon and waits up to one second for it to get going.
     *
     * @return the connector's channel (the connection may still be pending)
     * @throws IllegalStateException if the channel has not been created yet
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public SocketChannel connect() {
        checkConnection0();
        if (this.connectThreadStoped) {
            this.logger.info("长连接客户端线程重新连接，设置connectThreadStoped = false;");
            // Must be the literal false: $assertionsDisabled is true whenever assertions are
            // off, which would leave the connector permanently flagged as stopped.
            this.connectThreadStoped = false;
        }
        this.connectThread.setDaemon(true);
        this.connectThread.start();
        try {
            this.logger.debug("connectThread正在启动...");
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            // The listener thread interrupts the thread that started the connector when it
            // terminates, so an interrupt here can mean the connection attempt already ended.
            // NOTE(review): the interrupt status is consumed, as before — confirm whether callers
            // need it restored.
            e.printStackTrace();
        }
        return this.client;
    }

    /**
     * Reports whether the connector is usable.
     *
     * @return {@code true} only if the listener has not been stopped, the channel exists and the
     *         channel reports itself connected; {@code false} otherwise
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public boolean isConnected() {
        // Snapshot the field: stopConnector() may null it concurrently.
        final SocketChannel channel = this.client;
        // Return a real false on the negative path; $assertionsDisabled is true whenever
        // assertions are off and would report a dead connector as connected.
        return !this.connectThreadStoped && channel != null && channel.isConnected();
    }

    /**
     * Blocks until a pushed frame is available and returns its backing array.
     *
     * @return the bytes of the next frame taken from the push queue
     * @throws RemoteException if the listener thread has been stopped
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public byte[] receivePushed() {
        if (this.connectThreadStoped) {
            throw new RemoteException();
        }
        final ByteBuffer pushedFrame;
        try {
            pushedFrame = this.pushedFromServerData.take();
        } catch (InterruptedException interrupted) {
            // Log and propagate the interruption unchanged to the caller.
            this.logger.debug("调用线程被中断，停止从'推送队列'获取数据");
            throw interrupted;
        }
        return pushedFrame.array();
    }

    /**
     * Queues a payload for the listener thread to send.
     *
     * <p>Before the first frame has been received (not yet authenticated) the call blocks for the
     * reply and returns it. Once authenticated it returns {@code null} right after queueing.
     *
     * @param obj the payload, either a {@code byte[]} or a {@link ByteBuffer}
     * @return the reply buffer for the pre-authentication send; {@code null} when already
     *         authenticated, when a previous payload is still queued, or when the wait for the
     *         reply is interrupted
     * @throws IllegalStateException if the channel has not been created yet
     * @throws RemoteException if the listener thread has been stopped
     * @throws NullPointerException if {@code obj} is {@code null}
     * @throws UnsupportedOperationException if {@code obj} is neither {@code byte[]} nor
     *         {@code ByteBuffer}
     */
    @Override // com.walker.cheetah.client.transport.Connector
    public Object send(Object obj) {
        if (!$assertionsDisabled && obj == null) {
            throw new AssertionError();
        }
        checkConnection0();
        if (this.connectThreadStoped) {
            throw new RemoteException();
        }
        if (!this.responseData.isEmpty()) {
            this.logger.debug("上次发送还没有处理响应数据，自动清除");
            this.responseData.clear();
        }
        if (!this.sendToServerData.isEmpty()) {
            this.logger.debug("上次发送数据还未处理，系统仅支持阻塞消息发送");
            return null;
        }
        // Reject bad payloads explicitly instead of relying on the queue's add(null) failing
        // with an opaque NullPointerException.
        final ByteBuffer outgoing;
        if (obj instanceof byte[]) {
            outgoing = ByteBuffer.wrap((byte[]) obj);
        } else if (obj instanceof ByteBuffer) {
            outgoing = (ByteBuffer) obj;
        } else if (obj == null) {
            throw new NullPointerException("obj");
        } else {
            throw new UnsupportedOperationException("unsupported payload type: " + obj.getClass().getName());
        }
        this.sendToServerData.add(outgoing);
        // Wake the listener so it notices the queued frame.
        this.selector.wakeup();
        if (this.isAuthenticated) {
            return null;
        }
        try {
            ByteBuffer response = this.responseData.take();
            this.logger.debug("........... 认证结果：" + new String(response.array()));
            return response;
        } catch (InterruptedException e) {
            this.logger.warn("响应队列中，获取数据中断", e);
            this.responseData.clear();
            // Snapshot the field: stopConnector() may null it between the check and the call.
            final ListeningServerThread listener = this.connectThread;
            if (listener != null) {
                listener.setSendingEnd();
            }
            return null;
        }
    }

    /**
     * Opens the channel in non-blocking mode, registers it with a new selector, initiates the
     * connection to {@code ip} and prepares (but does not start) the listener thread.
     *
     * <p>The calling thread is recorded as the listener's parent thread.
     *
     * @throws NullPointerException if no remote address has been set
     */
    @Override // com.walker.cheetah.client.transport.AbstractConnector
    protected void startConnector() {
        if (this.ip == null) {
            throw new NullPointerException("remote adress is required!");
        }
        this.client = SocketChannel.open();
        // Must be the literal false: selector registration requires a non-blocking channel, and
        // $assertionsDisabled is true whenever assertions are off.
        this.client.configureBlocking(false);
        this.client.socket().setSoTimeout(0);
        this.selector = Selector.open();
        this.client.register(this.selector, SelectionKey.OP_CONNECT);
        if (this.client.connect(this.ip)) {
            this.logger.info("客户端连接成功!");
            // Connected immediately, so no connect event will follow; switch straight to read
            // interest so that server pushes are received.
            this.client.register(this.selector, SelectionKey.OP_READ);
        } else {
            this.logger.info("客户端正在连接...");
        }
        if (this.connectThread == null) {
            this.connectThread = new ListeningServerThread();
            this.logger.debug("创建了一个客户端线程:connectThread = " + this.connectThread);
        }
        this.connectThread.setParentThread(Thread.currentThread());
    }

    /**
     * Stops the listener thread, closes the channel and selector if they exist, and discards
     * any frames still waiting in the push and send queues.
     */
    @Override // com.walker.cheetah.client.transport.AbstractConnector
    protected void stopConnector() {
        final ListeningServerThread listener = this.connectThread;
        if (listener != null) {
            listener.setStop();
            this.logger.info("设置connectThread.setStop()...");
            listener.interrupt();
        }
        this.connectThread = null;

        if (this.client == null) {
            this.logger.info("客户端已经关闭，调用无效: stopConnector");
        } else {
            closeChannel(this.client, this.selector);
            this.client = null;
            this.selector = null;
            this.logger.info("客户端连接已被关闭：stopConnector");
        }

        this.pushedFromServerData.clear();
        this.sendToServerData.clear();
    }
}
