package com.lantu.MobileCampus.haust.im;

import com.walker.cheetah.client.transport.LongConnector;
import com.walker.cheetah.core.DataProtocolsException;
import com.walker.cheetah.core.Protocols;
import com.walker.cheetah.core.ProtocolsFactory;
import com.walker.cheetah.core.RemoteException;
import com.walkersoft.mobile.core.util.LogUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: classes2.dex */
public class b extends com.lantu.MobileCampus.haust.im.a implements LongConnector {
    /** 8192; d() uses the same number (as a literal) for its scratch read buffer and as the largest body it reads in one pass. */
    public static final int i = 8192;
    /** 8; the same number is used as the initial capacity of the partial-read map {@code r}. */
    public static final int k = 8;
    /** Set by the static initializer to {@code !desiredAssertionStatus()}; when false, the null-argument checks in b(Object) and send(Object) are active. */
    static final /* synthetic */ boolean l;
    /** 128; the same number is used as the capacity of both queues below. */
    private static final int m = 128;
    /** 1; not referenced by name in this class. NOTE(review): equals SelectionKey.OP_READ, which the listener registers as literal 1 - presumably its inlined source; confirm. */
    private static final int s = 1;
    /** Codec obtained from ProtocolsFactory; b(Object) uses it to encode outgoing payloads. */
    protected Protocols j = ProtocolsFactory.getInstance();
    /** Received buffers: filled by the listener thread, drained by receivePushed(). */
    private BlockingQueue<ByteBuffer> n = new ArrayBlockingQueue(128);
    /** Outgoing buffers: filled by send(Object), drained by the listener thread. */
    private BlockingQueue<ByteBuffer> o = new ArrayBlockingQueue(128);
    /** Stop flag: the listener loop runs while this is false; send/receivePushed reject calls while it is true. */
    private volatile boolean p = false;
    /** Listener thread; created in b(), started in connect(), cleared in c(). */
    private C0052b q = null;
    /** Partially received bodies, keyed by the hashCode of the channel they were read from. */
    private final Map<Integer, a> r = new HashMap(8);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Collects a message body of a declared size that arrives over several reads.
     *
     * <p>Callers append each chunk with {@link #a(ByteBuffer, int)}, poll {@link #a()} for the
     * number of bytes still missing, and fetch the assembled buffer once with {@link #b()}.
     */
    public static final class a {

        /** Declared total size of the body, in bytes. */
        private int f1673a;
        /** Number of bytes recorded as received so far. */
        private int b = 0;
        /** Storage for the body, allocated to the declared size. */
        private ByteBuffer c;

        /**
         * @param i declared size of the body in bytes; also the capacity of the backing buffer
         */
        public a(int i) {
            this.f1673a = i;
            this.c = ByteBuffer.allocate(i);
        }

        /**
         * @return declared size minus the bytes recorded so far
         */
        public int a() {
            return this.f1673a - this.b;
        }

        /**
         * Appends the bytes just read into {@code byteBuffer} and records the read count.
         *
         * @param byteBuffer buffer in write mode (as left by a channel read); it is flipped,
         *                   drained into the backing buffer, then flipped again
         * @param i          value returned by the channel read that filled {@code byteBuffer}
         */
        public void a(ByteBuffer byteBuffer, int i) {
            byteBuffer.flip();
            this.c.put(byteBuffer);
            byteBuffer.flip();
            // A channel read reports end-of-stream as -1 (and "nothing available" as 0).
            // Adding such a value would shrink the running total and make a() report more
            // missing bytes than the body actually has, so only positive counts are recorded.
            if (i > 0) {
                this.b += i;
            }
        }

        /**
         * Flips the backing buffer for reading and returns it.
         *
         * @return the backing buffer itself (not a copy), positioned at 0
         */
        public ByteBuffer b() {
            this.c.flip();
            return this.c;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(128);
            sb.append("[currentSize=");
            sb.append(this.b);
            sb.append(", totalBytes=");
            sb.append(this.f1673a);
            sb.append(", resultBytes=");
            sb.append(this.c);
            sb.append("]");
            return sb.toString();
        }
    }

    /* renamed from: com.lantu.MobileCampus.haust.im.b$b, reason: collision with other inner class name */
    /* loaded from: classes2.dex */
    /**
     * Listener thread that drives the selector: it finishes the connect, writes queued
     * outgoing buffers and puts received buffers on the inbound queue until the enclosing
     * connector's stop flag is set.
     */
    private class C0052b extends Thread {
        private static final String b = "ListeningServerThread";
        /** True when the current selector pass found queued outgoing data before processing keys. */
        private volatile boolean c = false;
        /** Thread to interrupt when this listener shuts down; supplied through {@link #a(Thread)}. */
        private Thread d;

        public C0052b() {
            setName(b);
        }

        /** Hook invoked when taking from the outgoing queue is interrupted; currently does nothing. */
        private void c() throws IOException {
        }

        /** Sets the connector's stop flag and interrupts the registered thread, if any. */
        public void a() {
            b.this.p = true;
            if (this.d != null) {
                this.d.interrupt();
            }
        }

        /**
         * @param thread thread that {@link #a()} interrupts on shutdown
         */
        public void a(Thread thread) {
            this.d = thread;
        }

        /** Sets the connector's stop flag without interrupting anyone. */
        public void b() {
            b.this.p = true;
        }

        /**
         * Runs the select loop until the stop flag is set, the selector is closed or an I/O
         * error occurs, then performs the shutdown steps exactly once.
         *
         * <p>The shutdown steps used to sit in a {@code finally} inside the loop body, so they
         * ran after the first pass: the connector was closed and the stop flag set after a
         * single {@code select()}. They now wrap the whole loop.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            LogUtils.b("======= connect thread is runing!");
            try {
                while (!b.this.p) {
                    LogUtils.b("count = " + b.this.g.select());
                    Iterator<SelectionKey> it = b.this.g.selectedKeys().iterator();
                    if (b.this.o.peek() != null) {
                        // Outgoing data is waiting: switch the channel's interest to writing.
                        LogUtils.b("++发现发送数据，进入发送模式");
                        b.this.f.register(b.this.g, SelectionKey.OP_WRITE);
                        this.c = true;
                    } else {
                        this.c = false;
                    }
                    while (it.hasNext()) {
                        SelectionKey next = it.next();
                        it.remove();
                        if (next.isConnectable()) {
                            if (b.this.f.isConnectionPending()) {
                                b.this.f.finishConnect();
                            }
                            LogUtils.b("is connected...");
                            if (this.c) {
                                b.this.f.register(b.this.g, SelectionKey.OP_WRITE);
                            } else {
                                b.this.f.register(b.this.g, SelectionKey.OP_READ);
                            }
                        } else if (next.isWritable()) {
                            try {
                                b.this.b(b.this.o.take());
                                // One buffer is sent per writable event, then interest returns to reading.
                                b.this.f.register(b.this.g, SelectionKey.OP_READ);
                                LogUtils.b("已发送数据");
                            } catch (InterruptedException e) {
                                LogUtils.b("从队列中获得发送数据时阻塞，当前已被中断");
                                c();
                            }
                        } else if (next.isReadable()) {
                            ByteBuffer byteBuffer = null;
                            try {
                                byteBuffer = b.this.d();
                                if (byteBuffer != null) {
                                    LogUtils.b("接收到数据: " + byteBuffer);
                                    try {
                                        b.this.n.put(byteBuffer);
                                    } catch (InterruptedException e2) {
                                        LogUtils.b("接收服务端消息时，线程中断，接收到的数据可以在此处理，如：保存数据库等");
                                    }
                                }
                            } catch (DataProtocolsException e3) {
                                e3.printStackTrace();
                                LogUtils.b("接收到的数据不符合协议: " + e3.getMessage());
                                LogUtils.b("错误的数据: " + byteBuffer);
                                // Malformed data ends the loop via the stop flag.
                                b.this.p = true;
                            }
                        }
                    }
                }
                LogUtils.b(getName() + ": terminated because of connectThreadStoped = " + b.this.p);
            } catch (IOException e5) {
                LogUtils.b("connect thread stoped: " + e5.getMessage());
            } catch (ClosedSelectorException e6) {
                LogUtils.b(".........selector is closed: " + e6.getMessage());
            } finally {
                try {
                    // Only close when the loop ended for a reason other than the stop flag.
                    if (!b.this.p) {
                        b.this.close();
                    }
                } catch (RemoteException ignored) {
                    // Best-effort close during shutdown; the thread is ending either way.
                }
                LogUtils.b("客户端监听线程结束...");
                a();
            }
        }
    }

    // Caches the inverse of the class's assertion status: l is false when assertions are
    // enabled, which activates the "!l && obj == null" checks in b(Object) and send(Object).
    static {
        l = !b.class.desiredAssertionStatus();
    }

    /**
     * Copies the remaining bytes of a buffer into a newly allocated buffer of exactly that size.
     *
     * @param byteBuffer source; its remaining bytes are consumed by the copy
     * @return the new buffer, left in write mode (position at its end, not flipped)
     */
    private ByteBuffer a(ByteBuffer byteBuffer) {
        return ByteBuffer.allocate(byteBuffer.remaining()).put(byteBuffer);
    }

    /**
     * Performs one read of a body whose declared size is {@code i2}.
     *
     * @param socketChannel channel to read from
     * @param byteBuffer    destination buffer; flipped for reading only when the body is complete
     * @param i2            declared body size in bytes
     * @return {@code true} when exactly {@code i2} bytes were read; {@code false} when fewer
     *         arrived, in which case the bytes are stored in the partial-read map under the
     *         channel's hashCode
     * @throws IOException                   if the channel read fails
     * @throws UnsupportedOperationException if the read returned more than {@code i2} bytes
     */
    private boolean a(SocketChannel socketChannel, ByteBuffer byteBuffer, int i2) throws IOException {
        final int received = socketChannel.read(byteBuffer);
        LogUtils.b("============= total: " + i2);

        if (received > i2) {
            LogUtils.b(">>>>>>>>>>> 读入的数据，比宣称的要多: " + received + ",剩余的是另一个数据包");
            throw new UnsupportedOperationException();
        }

        if (received == i2) {
            LogUtils.b(">>>>>>>>>>> 读入的数据，和宣称的一样: " + received);
            byteBuffer.flip();
            return true;
        }

        // Short read: park what arrived until the rest of the body shows up.
        LogUtils.b(">>>>>>>>>>> 读入的数据，比宣称的要少: " + received);
        a partial = new a(i2);
        partial.a(byteBuffer, received);
        this.r.put(Integer.valueOf(socketChannel.hashCode()), partial);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Encodes a payload with the protocol codec and writes it to the channel.
     *
     * <p>The channel is configured non-blocking in b(), so a single {@code write} may accept
     * only part of the buffer. The write is therefore repeated until the encoded buffer is
     * empty; previously the unsent tail was silently dropped.
     *
     * @param obj payload, either a {@code byte[]} or a {@link ByteBuffer}
     * @throws IOException                   if writing to the channel fails
     * @throws UnsupportedOperationException if {@code obj} is of any other type
     */
    public void b(Object obj) throws IOException {
        if (!l && obj == null) {
            throw new AssertionError();
        }
        final ByteBuffer encoded;
        if (obj instanceof byte[]) {
            byte[] encode = this.j.encode((byte[]) obj);
            LogUtils.b("发送的是字节，共: " + encode.length);
            encoded = ByteBuffer.wrap(encode);
        } else if (obj instanceof ByteBuffer) {
            encoded = this.j.encode((ByteBuffer) obj);
            LogUtils.b("发送的是字节缓冲，共: " + encoded.remaining() + ", " + encoded);
        } else {
            throw new UnsupportedOperationException();
        }
        // Snapshot the channel so a concurrent c() nulling the field cannot cause a
        // NullPointerException in the middle of the loop; a closed channel still throws IOException.
        final SocketChannel channel = this.f;
        while (encoded.hasRemaining()) {
            channel.write(encoded);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads from the channel once and returns a complete message body when one is available.
     *
     * <p>Two paths exist:
     * <ul>
     *   <li>If a partial body is parked for this channel, read more of it. The read is capped to
     *       the number of bytes still missing, so bytes beyond this body stay in the socket
     *       (previously an over-long read was discarded together with the bytes it had
     *       consumed). End-of-stream closes the connector instead of being counted as data.</li>
     *   <li>Otherwise read a 4-byte length header followed by the body; bodies larger than 8192
     *       bytes, or bodies that arrive short, are parked for later calls.</li>
     * </ul>
     *
     * <p>NOTE(review): a header that arrives in fewer than 4 bytes still raises
     * {@link IllegalArgumentException}; handling it would need extra per-channel state.
     *
     * @return the completed body flipped for reading, or {@code null} when nothing complete is
     *         available yet (or the connection was closed)
     * @throws IOException              if a channel read or the close fails
     * @throws IllegalArgumentException if between 1 and 3 header bytes were read
     */
    public ByteBuffer d() throws IOException {
        int hashCode = this.f.hashCode();
        a aVar = this.r.get(Integer.valueOf(hashCode));
        ByteBuffer allocate = ByteBuffer.allocate(8192);
        if (aVar != null) {
            int a2 = aVar.a();
            if (a2 <= 0) {
                LogUtils.b("Processor.read()-没有剩余内容可以读取了");
                this.r.remove(Integer.valueOf(hashCode));
                return aVar.b();
            }
            // Never read past the end of the current body.
            allocate.limit(Math.min(a2, allocate.capacity()));
            int read = this.f.read(allocate);
            if (read < 0) {
                // End-of-stream: the body can never complete, so drop it and close,
                // as the header path below does for an empty read.
                this.r.remove(Integer.valueOf(hashCode));
                LogUtils.b("xxxxxxxxxxx 因为读取到空数据，可能存在链接异常，系统尝试关闭该连接 ============");
                close();
                return null;
            }
            aVar.a(allocate, read);
            if (read < a2) {
                LogUtils.b("++++++++++ 接着读数据，length: " + read);
                return null;
            }
            LogUtils.b("++++++++++ 剩余内容读完，length: " + read);
            this.r.remove(Integer.valueOf(hashCode));
            return aVar.b();
        }
        ByteBuffer allocate2 = ByteBuffer.allocate(4);
        int read2 = this.f.read(allocate2);
        if (read2 <= 0) {
            LogUtils.b("xxxxxxxxxxx 读入数据格式错误，返回");
            int read3 = this.f.read(allocate);
            if (read3 <= 0) {
                LogUtils.b("xxxxxxxxxxx 尝试在读，但仍未读到消息内容");
                LogUtils.b("xxxxxxxxxxx 因为读取到空数据，可能存在链接异常，系统尝试关闭该连接 ============");
                close();
                return null;
            }
            LogUtils.b("xxxxxxxxxxx 尝试在读，消息字节数：" + read3);
            allocate.flip();
            // Log only the bytes actually read, not the zero padding of the 8192-byte array.
            String str = new String(allocate.array(), 0, allocate.limit());
            if (read3 <= 8192) {
                LogUtils.b("xxxxxxxxxxx 尝试在读，一次读完：" + str);
            } else {
                LogUtils.b("xxxxxxxxxxx 尝试在读，内容过多：" + str);
            }
            return null;
        }
        allocate2.flip();
        if (read2 != 4) {
            throw new IllegalArgumentException("The four bytes ahead must be type of Integer(size of content)!");
        }
        int i2 = allocate2.getInt();
        if (i2 > 8192) {
            LogUtils.b("++++++++++ 第一次读取，超过最大限制，total: " + i2);
            a(this.f, hashCode, i2, allocate);
            return null;
        }
        ByteBuffer allocate3 = ByteBuffer.allocate(i2);
        if (!a(this.f, allocate3, i2)) {
            return null;
        }
        LogUtils.b("++++++++++ 第一次读取，一次读完，total: " + allocate3.limit());
        this.r.remove(Integer.valueOf(hashCode));
        return allocate3;
    }

    /**
     * Guard used before operations that need the channel.
     *
     * @throws IllegalStateException if the channel field has not been set
     */
    private void e() {
        if (this.f != null) {
            return;
        }
        throw new IllegalStateException("客户端还未建立连接,必须先调用start()方法。");
    }

    /**
     * Reads the first chunk of a body and parks it in the partial-read map.
     *
     * @param socketChannel channel to read from
     * @param i2            key under which the partial body is stored
     * @param i3            declared total size of the body
     * @param byteBuffer    scratch buffer the chunk is read into
     * @throws IOException if the channel read fails
     */
    void a(SocketChannel socketChannel, int i2, int i3, ByteBuffer byteBuffer) throws IOException {
        final int received = socketChannel.read(byteBuffer);
        final a pending = new a(i3);
        pending.a(byteBuffer, received);
        this.r.put(i2, pending);
        LogUtils.b("...... 本次读入一次完成，等待下次，this size = " + received);
    }

    /**
     * Opens a non-blocking channel and a selector, starts connecting to the remote address and
     * prepares (but does not start) the listener thread.
     *
     * @throws NullPointerException if no remote address has been set
     * @throws Exception            if opening, registering or connecting fails
     */
    @Override // com.lantu.MobileCampus.haust.im.a
    protected void b() throws Exception {
        if (this.e == null) {
            throw new NullPointerException("remote adress is required!");
        }

        this.f = SocketChannel.open();
        this.f.configureBlocking(false);
        this.f.socket().setSoTimeout(0);

        this.g = Selector.open();
        this.f.register(this.g, SelectionKey.OP_CONNECT);

        final boolean connectedImmediately = this.f.connect(this.e);
        LogUtils.b(connectedImmediately ? "客户端连接成功!" : "客户端正在连接...");

        if (this.q == null) {
            this.q = new C0052b();
            LogUtils.b("创建了一个客户端线程:connectThread = " + this.q);
        }
        // The listener interrupts this (calling) thread when it shuts down.
        this.q.a(Thread.currentThread());
    }

    /**
     * Stops the listener thread, releases the channel and selector, and empties both queues.
     *
     * @throws Exception if releasing the channel or selector fails
     */
    @Override // com.lantu.MobileCampus.haust.im.a
    protected void c() throws Exception {
        if (this.q != null) {
            // Raise the stop flag first, then interrupt so a blocked listener notices it.
            this.q.b();
            LogUtils.b("设置connectThread.setStop()...");
            this.q.interrupt();
        }
        this.q = null;

        if (this.f == null) {
            LogUtils.b("客户端已经关闭，调用无效: stopConnector");
        } else {
            a(this.f, this.g);
            this.f = null;
            this.g = null;
            LogUtils.b("客户端连接已被关闭：stopConnector");
        }

        this.n.clear();
        this.o.clear();
    }

    /**
     * Closes the connector by delegating to {@code stop()}.
     *
     * @throws RemoteException wrapping any exception raised while stopping
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public void close() throws RemoteException {
        try {
            LogUtils.b("客户端长连接被调用close()");
            stop();
        } catch (Exception cause) {
            throw new RemoteException("close connector failed!", cause);
        }
    }

    /**
     * Starts the listener thread as a daemon, waits one second for it to come up and returns
     * the channel.
     *
     * <p>If the wait is interrupted, the interrupt status of the calling thread is restored so
     * the caller can still observe it; previously the interruption was swallowed.
     *
     * @return the channel opened by b()
     * @throws IllegalStateException if the channel has not been opened yet
     * @throws RemoteException       declared by the interface
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public SocketChannel connect() throws RemoteException {
        e();
        if (this.p) {
            LogUtils.b("长连接客户端线程重新连接，设置connectThreadStoped = false;");
            this.p = false;
        }
        this.q.setDaemon(true);
        this.q.start();
        try {
            LogUtils.b("connectThread正在启动...");
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the flag; set it again for the caller.
            Thread.currentThread().interrupt();
        }
        return this.f;
    }

    /**
     * @return {@code true} only when the stop flag is clear, a channel exists and that channel
     *         reports itself connected
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public boolean isConnected() {
        if (this.p) {
            return false;
        }
        if (this.f == null) {
            return false;
        }
        return this.f.isConnected();
    }

    /**
     * Blocks until the listener thread has queued a received buffer and returns its backing array.
     *
     * @return the backing array of the next received buffer
     * @throws RemoteException      if the stop flag is set when the call begins
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // com.walker.cheetah.client.transport.LongConnector
    public byte[] receivePushed() throws RemoteException, InterruptedException {
        if (this.p) {
            throw new RemoteException();
        }
        final ByteBuffer pushed;
        try {
            pushed = this.n.take();
        } catch (InterruptedException interrupted) {
            LogUtils.b("调用线程被中断，停止从'推送队列'获取数据");
            throw interrupted;
        }
        return pushed.array();
    }

    /**
     * Queues a payload for the listener thread to send and wakes the selector.
     *
     * <p>An unsupported payload type is now rejected explicitly. Previously it was turned into
     * {@code null} and handed to the queue, which failed with an unexplained
     * {@link NullPointerException}. A {@code null} payload still raises
     * {@link NullPointerException}, now with a message.
     *
     * @param obj payload, either a {@code byte[]} (wrapped without copying) or a {@link ByteBuffer}
     * @return always {@code null}
     * @throws RemoteException               if the stop flag is set
     * @throws IllegalStateException         if the channel has not been opened, or the outgoing
     *                                       queue is full
     * @throws NullPointerException          if {@code obj} is {@code null}
     * @throws UnsupportedOperationException if {@code obj} is of any other type
     */
    @Override // com.walker.cheetah.client.transport.Connector
    public Object send(Object obj) throws RemoteException {
        if (!l && obj == null) {
            throw new AssertionError();
        }
        e();
        if (this.p) {
            throw new RemoteException();
        }
        final ByteBuffer payload;
        if (obj instanceof byte[]) {
            payload = ByteBuffer.wrap((byte[]) obj);
        } else if (obj instanceof ByteBuffer) {
            payload = (ByteBuffer) obj;
        } else if (obj == null) {
            throw new NullPointerException("payload must not be null");
        } else {
            throw new UnsupportedOperationException(
                    "payload must be byte[] or ByteBuffer, got: " + obj.getClass().getName());
        }
        this.o.add(payload);
        // Unblock select() so the listener notices the queued data.
        this.g.wakeup();
        return null;
    }
}
