package com.chinaway.framework.swordfish.push.message;

import com.chinaway.framework.swordfish.push.client.MessageStats;
import com.chinaway.framework.swordfish.push.exception.MqttCommandCancelledException;
import com.chinaway.framework.swordfish.push.exception.MqttException;
import com.chinaway.framework.swordfish.push.exception.MqttInterruptedException;
import com.chinaway.framework.swordfish.push.exception.MqttInvocationError;
import com.chinaway.framework.swordfish.push.exception.MqttInvocationException;
import com.chinaway.framework.swordfish.push.exception.MqttTimeoutException;
import com.chinaway.framework.swordfish.util.LogUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: classes.dex */
public final class ChannelManagerImpl implements ChannelManager {
    private final boolean blocking;
    private final long blockingTimeoutMillis;
    private final BlockingQueue<Command<?>> commands;
    private boolean doShutdown;
    private final Thread ioThread;
    private final long messageResendIntervalMillis;
    private final Set<MqttChannel> openChannels;
    private final CountDownLatch readyLatch;
    private final Selector selector;
    private final MessageStatsImpl stats;

    /* loaded from: classes.dex */
    private final class AttachChannelCommand extends Command<List<MqttMessage>> {
        private final MqttChannel channel;
        private final MessageHandler messageHandler;

        public AttachChannelCommand(MqttChannelRef mqttChannelRef, MessageHandler messageHandler) {
            super(true);
            this.messageHandler = messageHandler;
            this.channel = (MqttChannel) mqttChannelRef;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            this.channel.register(ChannelManagerImpl.this.selector, this.messageHandler);
            ChannelManagerImpl.this.addToOpenChannels(this.channel);
        }
    }

    /* loaded from: classes.dex */
    private final class CancelBlockingCommandsCommand extends Command<Void> {
        private final MqttChannel channel;

        public CancelBlockingCommandsCommand(MqttChannelRef mqttChannelRef) {
            super(true);
            this.channel = (MqttChannel) mqttChannelRef;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            this.channel.cancelBlockingCommands();
        }
    }

    /* loaded from: classes.dex */
    private final class CloseCommand extends Command<Void> {
        private final Throwable cause;
        private final MqttChannel channel;

        public CloseCommand(MqttChannelRef mqttChannelRef, Throwable th) {
            super(true);
            this.cause = th;
            this.channel = (MqttChannel) mqttChannelRef;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            this.channel.close(this.cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    public abstract class Command<T> extends AbstractBlockingCommand<T> {
        private final boolean unblockImmediately;

        public Command(boolean z) {
            this.unblockImmediately = z;
        }
    }

    /* loaded from: classes.dex */
    private final class DetachChannelCommand extends Command<List<MqttMessage>> {
        private final MqttChannel channel;

        public DetachChannelCommand(MqttChannelRef mqttChannelRef) {
            super(true);
            this.channel = (MqttChannel) mqttChannelRef;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            ChannelManagerImpl.this.removeFromOpenChannels(this.channel);
            this.channel.deregister();
        }
    }

    /* loaded from: classes.dex */
    private final class GetStatsCommand extends Command<MessageStats> {
        private final boolean reset;

        public GetStatsCommand(boolean z) {
            super(true);
            this.reset = z;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            try {
                MessageStatsImpl m405clone = ChannelManagerImpl.this.stats.m405clone();
                if (this.reset) {
                    ChannelManagerImpl.this.stats.reset();
                }
                setResult(m405clone);
            } catch (Exception e) {
                LogUtils.e(e, "Unable to get a snapshot of the current stats.", new Object[0]);
            }
        }
    }

    /* loaded from: classes.dex */
    private final class GetUnsentMessagesCommand extends Command<List<MqttMessage>> {
        private final MqttChannel channel;

        public GetUnsentMessagesCommand(MqttChannelRef mqttChannelRef) {
            super(true);
            this.channel = (MqttChannel) mqttChannelRef;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            setResult(this.channel.getUnsentMessages());
        }
    }

    /* loaded from: classes.dex */
    private final class NewBrokerChannelCommand extends Command<MqttBrokerChannel> {
        private final MessageHandler messageHandler;
        private final SocketChannel socketChannel;

        public NewBrokerChannelCommand(SocketChannel socketChannel, MessageHandler messageHandler) {
            super(true);
            this.socketChannel = socketChannel;
            this.messageHandler = messageHandler;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            try {
                MqttBrokerChannel mqttBrokerChannel = new MqttBrokerChannel(this.socketChannel, this.messageHandler, ChannelManagerImpl.this.selector, ChannelManagerImpl.this.messageResendIntervalMillis, ChannelManagerImpl.this.stats);
                ChannelManagerImpl.this.addToOpenChannels(mqttBrokerChannel);
                setResult(mqttBrokerChannel);
            } catch (Exception e) {
                try {
                    this.socketChannel.close();
                } catch (IOException e2) {
                }
                throw new MqttException("MQTT broker channel creation failed", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    public final class NewClientChannelCommand extends Command<MqttChannel> {
        private MqttChannel channel;
        private final String host;
        private final MessageHandler messageHandler;
        private final int port;

        public NewClientChannelCommand(String str, int i, MessageHandler messageHandler) {
            super(!ChannelManagerImpl.this.blocking);
            this.host = str;
            this.port = i;
            this.messageHandler = messageHandler;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) throws Exception {
            MqttClientChannel mqttClientChannel = new MqttClientChannel(this.host, this.port, this.messageHandler, ChannelManagerImpl.this.selector, ChannelManagerImpl.this.messageResendIntervalMillis, this, ChannelManagerImpl.this.stats);
            this.channel = new DelegatingMqttChannel(mqttClientChannel);
            ChannelManagerImpl.this.addToOpenChannels(mqttClientChannel);
            setResult(this.channel);
        }
    }

    /* loaded from: classes.dex */
    private final class SendCommand extends Command<MqttMessage> {
        private final MqttChannel channel;
        private final MqttMessage message;

        public SendCommand(MqttChannelRef mqttChannelRef, MqttMessage mqttMessage) {
            super(!ChannelManagerImpl.this.blocking);
            this.message = mqttMessage;
            this.channel = (MqttChannel) mqttChannelRef;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            this.channel.send(this.message, this);
        }
    }

    /* loaded from: classes.dex */
    private final class ShutdownCommand extends Command<Void> {
        public ShutdownCommand() {
            super(true);
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            ChannelManagerImpl.this.doShutdown = true;
        }
    }

    /* loaded from: classes.dex */
    private final class TransferCommand extends Command<Void> {
        private final DelegatingMqttChannel newChannel;
        private final DelegatingMqttChannel oldChannel;

        public TransferCommand(MqttChannelRef mqttChannelRef, MqttChannelRef mqttChannelRef2) {
            super(true);
            this.oldChannel = (DelegatingMqttChannel) mqttChannelRef;
            this.newChannel = (DelegatingMqttChannel) mqttChannelRef2;
        }

        @Override // com.chinaway.framework.swordfish.push.message.AbstractBlockingCommand
        public void doExecute(long j) {
            for (MqttMessage mqttMessage : this.oldChannel.getUnsentMessages()) {
                mqttMessage.blockingCommand.setFailureCause(null);
                this.newChannel.send(mqttMessage, mqttMessage.blockingCommand);
            }
            this.oldChannel.delegate = this.newChannel.delegate;
        }
    }

    public ChannelManagerImpl(long j) {
        this(j, -1);
    }

    public ChannelManagerImpl(long j, int i) {
        this.openChannels = new HashSet();
        this.commands = new LinkedBlockingQueue();
        this.readyLatch = new CountDownLatch(1);
        this.blocking = i >= 0;
        this.blockingTimeoutMillis = i <= 0 ? Long.MAX_VALUE : i * 1000;
        this.messageResendIntervalMillis = 1000 * j;
        this.stats = new MessageStatsImpl(this.openChannels);
        this.ioThread = new Thread(new Runnable() { // from class: com.chinaway.framework.swordfish.push.message.ChannelManagerImpl.1
            @Override // java.lang.Runnable
            public void run() {
                ChannelManagerImpl.this.doIO();
            }
        }, "MqttChannelManager");
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new MqttException("Failed to open selector", e);
        }
    }

    private <T, C extends Command<T>> C addCommand(C c) {
        this.commands.add(c);
        this.selector.wakeup();
        return c;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addToOpenChannels(MqttChannel mqttChannel) {
        if (mqttChannel instanceof DelegatingMqttChannel) {
            mqttChannel = ((DelegatingMqttChannel) mqttChannel).delegate;
        }
        this.openChannels.add(mqttChannel);
    }

    private void channelClosed(MqttChannel mqttChannel) {
        removeFromOpenChannels(mqttChannel);
    }

    private void closeAll() {
        LogUtils.d("Channel manager closing all channels");
        Iterator<MqttChannel> it = this.openChannels.iterator();
        while (it.hasNext()) {
            MqttChannel next = it.next();
            try {
                next.close();
            } catch (Exception e) {
            } finally {
                next.cancelBlockingCommands();
            }
        }
    }

    private void doConnect(long j, Set<SelectionKey> set) {
        Iterator<SelectionKey> it = set.iterator();
        while (it.hasNext()) {
            SelectionKey next = it.next();
            try {
                if (next.isConnectable()) {
                    MqttChannel mqttChannel = (MqttChannel) next.attachment();
                    if (!mqttChannel.finishConnect()) {
                        channelClosed(mqttChannel);
                        it.remove();
                    }
                }
            } catch (CancelledKeyException e) {
                it.remove();
            }
        }
    }

    private long doHouseKeeping(long j, Set<SelectionKey> set) {
        long j2 = Long.MAX_VALUE;
        Iterator<SelectionKey> it = set.iterator();
        while (it.hasNext()) {
            MqttChannel mqttChannel = (MqttChannel) it.next().attachment();
            long houseKeeping = mqttChannel.houseKeeping(j);
            if (houseKeeping < 0) {
                channelClosed(mqttChannel);
            } else if (houseKeeping < j2) {
                j2 = houseKeeping;
            }
        }
        return j2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doIO() {
        LogUtils.d("Channel manager thread started");
        this.readyLatch.countDown();
        long j = Long.MAX_VALUE;
        while (!this.doShutdown) {
            try {
                if (j == Long.MAX_VALUE) {
                    this.selector.select();
                } else {
                    this.selector.select(j);
                }
                long currentTimeMillis = System.currentTimeMillis();
                executeCommands(currentTimeMillis);
                Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
                doConnect(currentTimeMillis, selectedKeys);
                doRead(currentTimeMillis, selectedKeys);
                doWrite(currentTimeMillis, selectedKeys);
                j = doHouseKeeping(currentTimeMillis, this.selector.keys());
                selectedKeys.clear();
            } catch (Throwable th) {
                LogUtils.e(th, "Channel manager thread caught a fatal exception and is dying", new Object[0]);
            }
        }
        LogUtils.d("Channel manager thread stopping");
        closeAll();
        try {
            this.selector.close();
        } catch (Exception e) {
        }
        startCleanupThread();
    }

    private void doRead(long j, Set<SelectionKey> set) {
        Iterator<SelectionKey> it = set.iterator();
        while (it.hasNext()) {
            SelectionKey next = it.next();
            try {
                if (next.isReadable()) {
                    MqttChannel mqttChannel = (MqttChannel) next.attachment();
                    if (!mqttChannel.read(j)) {
                        channelClosed(mqttChannel);
                        it.remove();
                    }
                }
            } catch (CancelledKeyException e) {
                it.remove();
            }
        }
    }

    private void doWrite(long j, Set<SelectionKey> set) {
        Iterator<SelectionKey> it = set.iterator();
        while (it.hasNext()) {
            SelectionKey next = it.next();
            try {
                if (next.isWritable()) {
                    MqttChannel mqttChannel = (MqttChannel) next.attachment();
                    if (!mqttChannel.write(j)) {
                        channelClosed(mqttChannel);
                        it.remove();
                    }
                }
            } catch (CancelledKeyException e) {
                it.remove();
            }
        }
    }

    private void executeCommands(long j) {
        int size = this.commands.size();
        for (int i = 0; i < size; i++) {
            Command<?> poll = this.commands.poll();
            if (poll == null) {
                return;
            }
            poll.execute(j);
            if (((Command) poll).unblockImmediately) {
                poll.complete();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeFromOpenChannels(MqttChannel mqttChannel) {
        if (mqttChannel instanceof DelegatingMqttChannel) {
            mqttChannel = ((DelegatingMqttChannel) mqttChannel).delegate;
        }
        this.openChannels.remove(mqttChannel);
    }

    private void startCleanupThread() {
        Thread thread = new Thread("CommandCleanup") { // from class: com.chinaway.framework.swordfish.push.message.ChannelManagerImpl.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        ((Command) ChannelManagerImpl.this.commands.take()).cancel();
                    } catch (Exception e) {
                        return;
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void attachChannel(MqttChannelRef mqttChannelRef, MessageHandler messageHandler) throws MqttCommandCancelledException, MqttTimeoutException, MqttInterruptedException, MqttInvocationException, MqttInvocationError {
        ((AttachChannelCommand) addCommand(new AttachChannelCommand(mqttChannelRef, messageHandler))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void cancelBlockingCommands(MqttChannelRef mqttChannelRef) {
        ((CancelBlockingCommandsCommand) addCommand(new CancelBlockingCommandsCommand(mqttChannelRef))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void close(MqttChannelRef mqttChannelRef) throws MqttInterruptedException {
        ((CloseCommand) addCommand(new CloseCommand(mqttChannelRef, null))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void close(MqttChannelRef mqttChannelRef, Throwable th) {
        ((CloseCommand) addCommand(new CloseCommand(mqttChannelRef, th))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void detachChannel(MqttChannelRef mqttChannelRef) throws MqttCommandCancelledException, MqttTimeoutException, MqttInterruptedException, MqttInvocationException, MqttInvocationError {
        DetachChannelCommand detachChannelCommand = new DetachChannelCommand(mqttChannelRef);
        if (Thread.currentThread() == this.ioThread) {
            detachChannelCommand.execute(0L);
        } else {
            ((DetachChannelCommand) addCommand(detachChannelCommand)).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public MessageStats getStats(boolean z) {
        return ((GetStatsCommand) addCommand(new GetStatsCommand(z))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public List<MqttMessage> getUnsentMessages(MqttChannelRef mqttChannelRef) {
        return ((GetUnsentMessagesCommand) addCommand(new GetUnsentMessagesCommand(mqttChannelRef))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void init() {
        this.ioThread.start();
        while (true) {
            try {
                this.readyLatch.await();
                return;
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public boolean isRunning() {
        return this.ioThread.isAlive();
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public MqttChannelRef newBrokerChannel(SocketChannel socketChannel, MessageHandler messageHandler) throws MqttInterruptedException {
        return ((NewBrokerChannelCommand) addCommand(new NewBrokerChannelCommand(socketChannel, messageHandler))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public MqttChannelRef newClientChannel(String str, int i, MessageHandler messageHandler) throws MqttInterruptedException {
        return ((NewClientChannelCommand) addCommand(new NewClientChannelCommand(str, i, messageHandler))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public MqttChannelRef newClientChannel(String str, MessageHandler messageHandler) throws MqttInterruptedException {
        try {
            return newClientChannel(new URI(str), messageHandler);
        } catch (URISyntaxException e) {
            throw new MqttException("Failed to parse broker URI: " + str, e);
        }
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public MqttChannelRef newClientChannel(URI uri, MessageHandler messageHandler) throws MqttInterruptedException {
        if ("tcp".equals(uri.getScheme())) {
            return newClientChannel(uri.getHost(), uri.getPort(), messageHandler);
        }
        throw new MqttException("Invalid broker URI (scheme must be 'tcp'): " + uri);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public <T extends MqttMessage> T send(MqttChannelRef mqttChannelRef, MqttMessage mqttMessage) throws MqttInterruptedException {
        return (T) ((SendCommand) addCommand(new SendCommand(mqttChannelRef, mqttMessage))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void shutdown() {
        addCommand(new ShutdownCommand());
        try {
            this.ioThread.join();
        } catch (InterruptedException e) {
            this.ioThread.interrupt();
        }
    }

    @Override // com.chinaway.framework.swordfish.push.message.ChannelManager
    public void transfer(MqttChannelRef mqttChannelRef, MqttChannelRef mqttChannelRef2) {
        ((TransferCommand) addCommand(new TransferCommand(mqttChannelRef, mqttChannelRef2))).await(this.blockingTimeoutMillis, TimeUnit.MILLISECONDS);
    }
}
