package org.zeromq;

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.zeromq.ZMQ;

/* loaded from: classes.dex */
/**
 * Pumps messages between registered ZeroMQ sockets and application-supplied handlers.
 *
 * <p>Each registered socket gets its own {@link SocketDispatcher}, which runs on the
 * dispatcher executor and repeatedly receives from the socket, hands received messages to the
 * {@link ZMessageHandler} on a separate handler pool, and sends whatever the handler queued on
 * the {@link ZSender}.
 */
public class ZDispatcher {
    /** Executor that runs one long-lived {@link SocketDispatcher} loop per registered socket. */
    private final ExecutorService dispatcherExecutor;

    /** Active dispatchers keyed by the socket they serve. */
    private final ConcurrentMap<ZMQ.Socket, SocketDispatcher> dispatchers;

    /**
     * The receive / handle / send loop bound to a single socket.
     *
     * <p>All socket access (receive and send) happens on the thread executing {@link #run()};
     * only {@link ZMessageHandler#handleMessage} is invoked on the handler pool.
     */
    public static final class SocketDispatcher implements Runnable {
        /** Upper bound on messages received, drained, or sent per loop pass. */
        private static final int BUFFER_SIZE = 1024;

        /** Per-thread scratch buffer reused by handler tasks to batch queued messages. */
        private static final ThreadLocal<ZMessageBuffer> messages = new ThreadLocal<ZMessageBuffer>() {
            @Override
            protected ZMessageBuffer initialValue() {
                return new ZMessageBuffer();
            }
        };

        private final ZMessageHandler handler;
        private final ZSender sender;
        private final ZMQ.Socket socket;
        private final ExecutorService threadpool;

        /** Loop flag; set by {@link #start()}, cleared by {@link #shutdown()}. */
        private volatile boolean active = false;

        /** Released once {@link #run()} has finished, whether normally or by exception. */
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);

        /** Messages received from the socket and not yet handed to the handler. */
        private final BlockingQueue<ZMsg> in = new LinkedBlockingQueue<ZMsg>();

        /** True while a submitted handler task has not yet drained {@link #in}. */
        private final AtomicBoolean busy = new AtomicBoolean(false);

        /**
         * Fixed-size batch of messages drained from a queue. Not thread-safe; instances are
         * confined to one thread through the {@code messages} thread-local.
         */
        public static class ZMessageBuffer {
            private final ZMsg[] buffer;

            /** Index of the last populated slot, or -1 when the buffer is empty. */
            private int lastValidIndex = -1;

            private ZMessageBuffer() {
                this.buffer = new ZMsg[SocketDispatcher.BUFFER_SIZE];
            }

            /**
             * Replaces the buffer contents with up to {@code buffer.length} messages polled
             * from the queue, stopping early as soon as the queue reports empty.
             *
             * @param blockingQueue the queue to poll; never blocked on
             */
            public void drainFrom(BlockingQueue<ZMsg> blockingQueue) {
                this.lastValidIndex = -1;
                for (int slot = 0; slot < this.buffer.length; slot++) {
                    ZMsg next = blockingQueue.poll();
                    if (next == null) {
                        return;
                    }
                    this.buffer[slot] = next;
                    this.lastValidIndex = slot;
                }
            }

            /** Drops references to buffered messages so the thread-local does not retain them. */
            private void clear() {
                for (int slot = 0; slot <= this.lastValidIndex; slot++) {
                    this.buffer[slot] = null;
                }
                this.lastValidIndex = -1;
            }
        }

        /**
         * Creates a dispatcher; it does nothing until {@link #start()} is called and the
         * instance is executed.
         *
         * @param socket the socket to receive from and send to
         * @param zMessageHandler callback invoked for every received message
         * @param zSender outbound queue the handler uses to reply
         * @param executorService pool on which the handler is invoked; it is shut down when
         *     {@link #run()} exits
         */
        public SocketDispatcher(ZMQ.Socket socket, ZMessageHandler zMessageHandler, ZSender zSender, ExecutorService executorService) {
            this.socket = socket;
            this.handler = zMessageHandler;
            this.sender = zSender;
            this.threadpool = executorService;
        }

        /**
         * Submits a handler task when there are queued messages and no earlier task is still
         * waiting to drain the queue.
         */
        private void doHandle() {
            if (this.in.isEmpty() || !this.busy.compareAndSet(false, true)) {
                return;
            }
            this.threadpool.submit(new Runnable() {
                @Override
                public void run() {
                    ZMessageBuffer batch = messages.get();
                    batch.drainFrom(SocketDispatcher.this.in);
                    // Release the flag right after draining so another task can be submitted
                    // for newly arrived messages while this batch is still being handled.
                    SocketDispatcher.this.busy.set(false);
                    try {
                        for (int i = 0; i <= batch.lastValidIndex; i++) {
                            // Messages still buffered when the dispatcher stops are skipped.
                            if (SocketDispatcher.this.active) {
                                SocketDispatcher.this.handler.handleMessage(SocketDispatcher.this.sender, batch.buffer[i]);
                            }
                        }
                    } finally {
                        // Clear even when the handler throws, so handled messages can be
                        // garbage collected instead of lingering in the thread-local buffer.
                        batch.clear();
                    }
                }
            });
        }

        /**
         * Receives up to {@code BUFFER_SIZE} messages from the socket into the inbound queue,
         * stopping at the first null, empty, or data-less message.
         */
        private void doReceive() {
            for (int remaining = BUFFER_SIZE; this.active && remaining > 0; remaining--) {
                // NOTE(review): flag 1 is presumably the non-blocking receive flag
                // (ZMQ.DONTWAIT) - confirm against the ZMQ binding in use.
                ZMsg received = ZMsg.recvMsg(this.socket, 1);
                if (received == null || received.size() <= 0 || !received.getFirst().hasData()) {
                    return;
                }
                this.in.add(received);
            }
        }

        /** Sends up to {@code BUFFER_SIZE} messages queued on the sender through the socket. */
        private void doSend() {
            for (int remaining = BUFFER_SIZE; this.active && remaining > 0; remaining--) {
                ZMsg outgoing = this.sender.out.poll();
                if (outgoing == null) {
                    return;
                }
                outgoing.send(this.socket);
            }
        }

        /**
         * Runs the receive / handle / send loop until the dispatcher is deactivated, then
         * shuts down the handler pool and releases any thread waiting in {@link #shutdown()}.
         */
        @Override
        public void run() {
            try {
                while (this.active) {
                    doReceive();
                    doHandle();
                    doSend();
                }
            } finally {
                // Always release the latch, even when the loop dies with an exception;
                // otherwise shutdown() would wait forever.
                try {
                    this.threadpool.shutdown();
                } finally {
                    this.shutdownLatch.countDown();
                }
            }
        }

        /**
         * Stops the loop and waits for {@link #run()} to finish. If the waiting thread is
         * interrupted, the wait is abandoned and the interrupt flag is restored.
         */
        public void shutdown() {
            this.active = false;
            try {
                this.shutdownLatch.await();
            } catch (InterruptedException e) {
                // Preserve the interruption for the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }

        /** Marks the dispatcher active; must be called before the instance is executed. */
        public void start() {
            this.active = true;
        }
    }

    /** Callback invoked for every message received on a registered socket. */
    public interface ZMessageHandler {
        /**
         * Handles one received message.
         *
         * @param zSender queue through which replies can be sent on the same socket
         * @param zMsg the received message
         */
        void handleMessage(ZSender zSender, ZMsg zMsg);
    }

    /** Outbound message queue; queued messages are sent by the owning socket's dispatcher. */
    public static final class ZSender {
        private final BlockingQueue<ZMsg> out = new LinkedBlockingQueue<ZMsg>();

        /**
         * Queues a message for sending.
         *
         * @param zMsg the message to send
         * @return {@code true} once the message has been queued
         */
        public final boolean send(ZMsg zMsg) {
            return this.out.add(zMsg);
        }
    }

    /** Creates a dispatcher that runs socket loops on a new cached thread pool. */
    public ZDispatcher() {
        this.dispatchers = new ConcurrentHashMap<ZMQ.Socket, SocketDispatcher>();
        this.dispatcherExecutor = Executors.newCachedThreadPool();
    }

    /**
     * Creates a dispatcher that runs socket loops on the given executor.
     *
     * @param executorService executor for the per-socket loops; it is shut down by
     *     {@link #shutdown()}
     */
    public ZDispatcher(ExecutorService executorService) {
        this.dispatchers = new ConcurrentHashMap<ZMQ.Socket, SocketDispatcher>();
        this.dispatcherExecutor = executorService;
    }

    /**
     * Registers a handler for a socket, invoking it on a new fixed thread pool sized to the
     * number of available processors.
     *
     * @param socket the socket to serve
     * @param zMessageHandler callback for received messages
     * @param zSender outbound queue for the socket
     * @throws IllegalArgumentException if the socket already has a handler
     */
    public void registerHandler(ZMQ.Socket socket, ZMessageHandler zMessageHandler, ZSender zSender) {
        ExecutorService handlerPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            registerHandler(socket, zMessageHandler, zSender, handlerPool);
        } catch (RuntimeException e) {
            // The pool was created here and no dispatcher will ever shut it down.
            handlerPool.shutdown();
            throw e;
        }
    }

    /**
     * Registers a handler for a socket and starts its dispatcher loop.
     *
     * @param socket the socket to serve
     * @param zMessageHandler callback for received messages
     * @param zSender outbound queue for the socket
     * @param executorService pool on which the handler is invoked; it is shut down when the
     *     socket's dispatcher stops
     * @throws IllegalArgumentException if the socket already has a handler
     */
    public void registerHandler(ZMQ.Socket socket, ZMessageHandler zMessageHandler, ZSender zSender, ExecutorService executorService) {
        SocketDispatcher socketDispatcher = new SocketDispatcher(socket, zMessageHandler, zSender, executorService);
        if (this.dispatchers.putIfAbsent(socket, socketDispatcher) != null) {
            throw new IllegalArgumentException("This socket already have a message handler");
        }
        socketDispatcher.start();
        try {
            this.dispatcherExecutor.execute(socketDispatcher);
        } catch (RuntimeException e) {
            // The loop never started, so its shutdown latch would never be released; roll
            // back the registration so a later shutdown()/unregisterHandler() cannot hang.
            this.dispatchers.remove(socket, socketDispatcher);
            throw e;
        }
    }

    /**
     * Stops accepting new dispatchers, stops every registered dispatcher (waiting for each
     * loop to finish), and clears the registry.
     */
    public void shutdown() {
        this.dispatcherExecutor.shutdown();
        for (SocketDispatcher socketDispatcher : this.dispatchers.values()) {
            socketDispatcher.shutdown();
        }
        this.dispatchers.clear();
    }

    /**
     * Removes the handler for a socket and waits for its dispatcher loop to finish.
     *
     * @param socket the socket whose handler should be removed
     * @throws IllegalArgumentException if the socket has no registered handler
     */
    public void unregisterHandler(ZMQ.Socket socket) {
        SocketDispatcher removed = this.dispatchers.remove(socket);
        if (removed == null) {
            throw new IllegalArgumentException("This socket doesn't have a message handler");
        }
        removed.shutdown();
    }
}
