package org.fusesource.mqtt.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.HexSupport;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
import org.fusesource.hawtdispatch.transport.HeartBeatMonitor;
import org.fusesource.hawtdispatch.transport.SslTransport;
import org.fusesource.hawtdispatch.transport.TcpTransport;
import org.fusesource.hawtdispatch.transport.Transport;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.MQTTProtocolCodec;
import org.fusesource.mqtt.codec.MessageSupport;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBACK;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;

/* loaded from: classes5.dex */
public class CallbackConnection {
    static final /* synthetic */ boolean $assertionsDisabled = false;
    private static final ExtendedListener DEFAULT_LISTENER = new ExtendedListener() { // from class: org.fusesource.mqtt.client.CallbackConnection.1
        @Override // org.fusesource.mqtt.client.Listener
        public void onConnected() {
        }

        @Override // org.fusesource.mqtt.client.Listener
        public void onDisconnected() {
        }

        @Override // org.fusesource.mqtt.client.Listener
        public void onFailure(Throwable th) {
            Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), th);
        }

        @Override // org.fusesource.mqtt.client.Listener
        public void onPublish(UTF8Buffer uTF8Buffer, Buffer buffer, Runnable runnable) {
            onFailure(CallbackConnection.access$000());
        }

        @Override // org.fusesource.mqtt.client.ExtendedListener
        public void onPublish(UTF8Buffer uTF8Buffer, Buffer buffer, Callback<Callback<Void>> callback) {
            onFailure(CallbackConnection.access$000());
        }
    };
    public static final Task NOOP = Dispatch.NOOP;
    private Throwable failure;
    private HeartBeatMonitor heartBeatMonitor;
    private final MQTT mqtt;
    private long pingedAt;
    private final DispatchQueue queue;
    private Runnable refiller;
    private Transport transport;
    private ExtendedListener listener = DEFAULT_LISTENER;
    private Map<Short, Request> requests = new ConcurrentHashMap();
    private LinkedList<Request> overflow = new LinkedList<>();
    private final HashMap<Short, Callback<Void>> processed = new HashMap<>();
    private boolean disconnected = false;
    private long reconnects = 0;
    private final AtomicInteger suspendCount = new AtomicInteger(0);
    private final AtomicInteger suspendChanges = new AtomicInteger(0);
    private final HashMap<UTF8Buffer, QoS> activeSubs = new HashMap<>();
    private boolean onRefillCalled = false;
    private short nextMessageId = 1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.fusesource.mqtt.client.CallbackConnection$17, reason: invalid class name */
    /* loaded from: classes5.dex */
    public static /* synthetic */ class AnonymousClass17 {
        static final /* synthetic */ int[] $SwitchMap$org$fusesource$mqtt$codec$CONNACK$Code;

        static {
            try {
                $SwitchMap$org$fusesource$mqtt$client$QoS[QoS.AT_LEAST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError unused) {
            }
            try {
                $SwitchMap$org$fusesource$mqtt$client$QoS[QoS.EXACTLY_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError unused2) {
            }
            try {
                $SwitchMap$org$fusesource$mqtt$client$QoS[QoS.AT_MOST_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError unused3) {
            }
            $SwitchMap$org$fusesource$mqtt$codec$CONNACK$Code = new int[CONNACK.Code.values().length];
            try {
                $SwitchMap$org$fusesource$mqtt$codec$CONNACK$Code[CONNACK.Code.CONNECTION_ACCEPTED.ordinal()] = 1;
            } catch (NoSuchFieldError unused4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes5.dex */
    public class LoginHandler implements Callback<Transport> {
        static final /* synthetic */ boolean $assertionsDisabled = false;
        private final Callback<Void> cb;
        private final boolean initialConnect;

        LoginHandler(Callback<Void> callback, boolean z) {
            this.cb = callback;
            this.initialConnect = z;
        }

        private boolean tryReconnect() {
            return this.initialConnect ? CallbackConnection.this.mqtt.connectAttemptsMax < 0 || CallbackConnection.this.reconnects < CallbackConnection.this.mqtt.connectAttemptsMax : CallbackConnection.this.mqtt.reconnectAttemptsMax < 0 || CallbackConnection.this.reconnects < CallbackConnection.this.mqtt.reconnectAttemptsMax;
        }

        @Override // org.fusesource.mqtt.client.Callback
        public void onFailure(Throwable th) {
            if (CallbackConnection.this.disconnected || !tryReconnect()) {
                this.cb.onFailure(th);
            } else {
                CallbackConnection.this.reconnect(this);
            }
        }

        @Override // org.fusesource.mqtt.client.Callback
        public void onSuccess(final Transport transport) {
            transport.setTransportListener(new DefaultTransportListener() { // from class: org.fusesource.mqtt.client.CallbackConnection.LoginHandler.1
                @Override // org.fusesource.hawtdispatch.transport.DefaultTransportListener, org.fusesource.hawtdispatch.transport.TransportListener
                public void onTransportCommand(Object obj) {
                    MQTTFrame mQTTFrame = (MQTTFrame) obj;
                    CallbackConnection.this.mqtt.tracer.onReceive(mQTTFrame);
                    try {
                        if (mQTTFrame.messageType() != 2) {
                            CallbackConnection.this.mqtt.tracer.debug("Received unexpected MQTT frame: %d", Byte.valueOf(mQTTFrame.messageType()));
                            transport.stop(CallbackConnection.NOOP);
                            LoginHandler.this.cb.onFailure(new IOException("Could not connect. Received unexpected command: " + ((int) mQTTFrame.messageType())));
                            return;
                        }
                        CONNACK mo190decode = new CONNACK().mo190decode(mQTTFrame);
                        if (AnonymousClass17.$SwitchMap$org$fusesource$mqtt$codec$CONNACK$Code[mo190decode.code().ordinal()] != 1) {
                            CallbackConnection.this.mqtt.tracer.debug("MQTT login rejected", new Object[0]);
                            transport.stop(CallbackConnection.NOOP);
                            LoginHandler.this.cb.onFailure(new MQTTException("Could not connect: " + mo190decode.code(), mo190decode));
                            return;
                        }
                        CallbackConnection.this.mqtt.tracer.debug("MQTT login accepted", new Object[0]);
                        if (transport != null) {
                            CallbackConnection.this.onSessionEstablished(transport);
                            LoginHandler.this.cb.onSuccess(null);
                            CallbackConnection.this.listener.onConnected();
                            CallbackConnection.this.queue.execute(new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.LoginHandler.1.1
                                @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                                public void run() {
                                    CallbackConnection.this.drainOverflow();
                                }
                            });
                            return;
                        }
                        CallbackConnection.this.mqtt.tracer.debug("transport is null", new Object[0]);
                        LoginHandler.this.cb.onFailure(new MQTTException("transport is null: " + mo190decode.code(), mo190decode));
                    } catch (ProtocolException e) {
                        CallbackConnection.this.mqtt.tracer.debug("Protocol error: %s", e);
                        transport.stop(CallbackConnection.NOOP);
                        LoginHandler.this.cb.onFailure(e);
                    }
                }

                @Override // org.fusesource.hawtdispatch.transport.DefaultTransportListener, org.fusesource.hawtdispatch.transport.TransportListener
                public void onTransportFailure(IOException iOException) {
                    CallbackConnection.this.mqtt.tracer.debug("Transport failure: %s", iOException);
                    transport.stop(CallbackConnection.NOOP);
                    LoginHandler.this.onFailure(iOException);
                }
            });
            transport.resumeRead();
            if (CallbackConnection.this.mqtt.connect.clientId() == null) {
                String str = CallbackConnection.hex(transport.getLocalAddress()) + Long.toHexString(System.currentTimeMillis() / 1000);
                if (str.length() > 23) {
                    str = str.substring(0, 23);
                }
                CallbackConnection.this.mqtt.connect.clientId(Buffer.utf8(str));
            }
            MQTTFrame encode = CallbackConnection.this.mqtt.connect.encode();
            transport.offer(encode);
            CallbackConnection.this.mqtt.tracer.onSend(encode);
            CallbackConnection.this.mqtt.tracer.debug("Logging in", new Object[0]);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes5.dex */
    public static class Request {
        private final Callback cb;
        private final MQTTFrame frame;
        private final short id;

        Request(int i, MQTTFrame mQTTFrame, Callback callback) {
            this.id = (short) i;
            this.cb = callback;
            this.frame = mQTTFrame;
        }
    }

    public CallbackConnection(MQTT mqtt) {
        this.mqtt = mqtt;
        if (this.mqtt.dispatchQueue == null) {
            this.queue = Dispatch.createQueue("mqtt client");
        } else {
            this.queue = this.mqtt.dispatchQueue;
        }
    }

    static /* synthetic */ IllegalStateException access$000() {
        return createListenerNotSetError();
    }

    static /* synthetic */ IllegalStateException access$1100() {
        return createDisconnectedError();
    }

    private void completeRequest(short s, byte b, Object obj) {
        Request remove = this.requests.remove(Short.valueOf(s));
        if (remove == null) {
            handleFatalFailure(new ProtocolException("Command from server contained an invalid message id: " + ((int) s)));
            return;
        }
        if (remove.cb != null) {
            if (obj == null) {
                remove.cb.onSuccess(null);
            } else {
                remove.cb.onSuccess(obj);
            }
        }
    }

    private static IllegalStateException createDisconnectedError() {
        return (IllegalStateException) new IllegalStateException("Disconnected").fillInStackTrace();
    }

    private static IllegalStateException createListenerNotSetError() {
        return (IllegalStateException) new IllegalStateException("No connection listener set to handle message received from the server.").fillInStackTrace();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void drainOverflow() {
        Runnable runnable;
        this.queue.assertExecuting();
        if (this.overflow.isEmpty() || this.transport == null) {
            return;
        }
        while (true) {
            Request peek = this.overflow.peek();
            if (peek == null || !this.transport.offer(peek.frame)) {
                break;
            }
            this.mqtt.tracer.onSend(peek.frame);
            this.overflow.removeFirst();
            if (peek.id != 0) {
                this.requests.put(Short.valueOf(peek.id), peek);
            } else if (peek.cb != null) {
                peek.cb.onSuccess(null);
            }
        }
        if (!this.overflow.isEmpty() || (runnable = this.refiller) == null) {
            return;
        }
        try {
            runnable.run();
        } catch (Throwable th) {
            Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), th);
        }
    }

    private short getNextMessageId() {
        short s = this.nextMessageId;
        this.nextMessageId = (short) (s + 1);
        if (this.nextMessageId == 0) {
            this.nextMessageId = (short) 1;
        }
        return s;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleFatalFailure(Throwable th) {
        if (this.failure == null) {
            this.failure = th;
            this.mqtt.tracer.debug("Fatal connection failure: %s", th);
            ArrayList arrayList = new ArrayList(this.requests.values());
            this.requests.clear();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                Request request = (Request) it.next();
                if (request.cb != null) {
                    request.cb.onFailure(this.failure);
                }
            }
            try {
                if (this.overflow != null) {
                    ArrayList arrayList2 = new ArrayList(this.overflow);
                    this.overflow.clear();
                    Iterator it2 = arrayList2.iterator();
                    while (it2.hasNext()) {
                        Request request2 = (Request) it2.next();
                        if (request2 != null && request2.cb != null) {
                            request2.cb.onFailure(this.failure);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            ExtendedListener extendedListener = this.listener;
            if (extendedListener == null || this.disconnected) {
                return;
            }
            try {
                extendedListener.onFailure(this.failure);
            } catch (Exception e2) {
                Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String hex(SocketAddress socketAddress) {
        if (!(socketAddress instanceof InetSocketAddress)) {
            return "";
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        return HexSupport.toHexFromBuffer(new Buffer(inetSocketAddress.getAddress().getAddress())) + Integer.toHexString(inetSocketAddress.getPort());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processFrame(MQTTFrame mQTTFrame) {
        try {
            switch (mQTTFrame.messageType()) {
                case 3:
                    toReceiver(new PUBLISH().mo190decode(mQTTFrame));
                    return;
                case 4:
                    completeRequest(new PUBACK().mo190decode(mQTTFrame).messageId(), (byte) 3, null);
                    return;
                case 5:
                    PUBREC mo190decode = new PUBREC().mo190decode(mQTTFrame);
                    PUBREL pubrel = new PUBREL();
                    pubrel.messageId(mo190decode.messageId());
                    send(new Request(0, pubrel.encode(), null));
                    return;
                case 6:
                    PUBREL mo190decode2 = new PUBREL().mo190decode(mQTTFrame);
                    Callback<Void> remove = this.processed.remove(Short.valueOf(mo190decode2.messageId()));
                    PUBCOMP pubcomp = new PUBCOMP();
                    pubcomp.messageId(mo190decode2.messageId());
                    send(new Request(0, pubcomp.encode(), null));
                    if (remove != null) {
                        remove.onSuccess(null);
                        return;
                    }
                    return;
                case 7:
                    completeRequest(new PUBCOMP().mo190decode(mQTTFrame).messageId(), (byte) 3, null);
                    return;
                case 8:
                case 10:
                case 12:
                default:
                    throw new ProtocolException("Unexpected MQTT command type: " + ((int) mQTTFrame.messageType()));
                case 9:
                    SUBACK mo190decode3 = new SUBACK().mo190decode(mQTTFrame);
                    completeRequest(mo190decode3.messageId(), (byte) 8, mo190decode3.grantedQos());
                    return;
                case 11:
                    completeRequest(new UNSUBACK().mo190decode(mQTTFrame).messageId(), (byte) 10, null);
                    return;
                case 13:
                    this.pingedAt = 0L;
                    return;
            }
        } catch (Throwable th) {
            handleFatalFailure(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void send(Request request) {
        Transport transport;
        if (this.failure != null) {
            if (request.cb != null) {
                request.cb.onFailure(this.failure);
                return;
            }
            return;
        }
        if (request.id != 0) {
            this.requests.put(Short.valueOf(request.id), request);
        }
        if (!this.overflow.isEmpty() || (transport = this.transport) == null || !transport.offer(request.frame)) {
            this.requests.remove(Short.valueOf(request.id));
            this.overflow.addLast(request);
            return;
        }
        this.mqtt.tracer.onSend(request.frame);
        if (request.id != 0 || request.cb == null) {
            return;
        }
        request.cb.onSuccess(null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void send(MessageSupport.Acked acked, Callback callback) {
        short s;
        if (acked.qos() != QoS.AT_MOST_ONCE) {
            s = getNextMessageId();
            acked.messageId(s);
        } else {
            s = 0;
        }
        send(new Request(s, acked.encode(), callback));
    }

    private void toReceiver(final PUBLISH publish) {
        if (this.listener != null) {
            Callback<Callback<Void>> callback = null;
            try {
                switch (publish.qos()) {
                    case AT_LEAST_ONCE:
                        callback = new Callback<Callback<Void>>() { // from class: org.fusesource.mqtt.client.CallbackConnection.14
                            @Override // org.fusesource.mqtt.client.Callback
                            public void onFailure(Throwable th) {
                            }

                            @Override // org.fusesource.mqtt.client.Callback
                            public void onSuccess(Callback<Void> callback2) {
                                PUBACK puback = new PUBACK();
                                puback.messageId(publish.messageId());
                                CallbackConnection.this.send(new Request(0, puback.encode(), null));
                                if (callback2 != null) {
                                    callback2.onSuccess(null);
                                }
                            }
                        };
                        break;
                    case EXACTLY_ONCE:
                        callback = new Callback<Callback<Void>>() { // from class: org.fusesource.mqtt.client.CallbackConnection.15
                            @Override // org.fusesource.mqtt.client.Callback
                            public void onFailure(Throwable th) {
                            }

                            @Override // org.fusesource.mqtt.client.Callback
                            public void onSuccess(Callback<Void> callback2) {
                                PUBREC pubrec = new PUBREC();
                                pubrec.messageId(publish.messageId());
                                CallbackConnection.this.processed.put(Short.valueOf(publish.messageId()), callback2);
                                CallbackConnection.this.send(new Request(0, pubrec.encode(), null));
                            }
                        };
                        if (this.processed.get(Short.valueOf(publish.messageId())) != null) {
                            return;
                        }
                        break;
                    case AT_MOST_ONCE:
                        callback = new Callback<Callback<Void>>() { // from class: org.fusesource.mqtt.client.CallbackConnection.16
                            @Override // org.fusesource.mqtt.client.Callback
                            public void onFailure(Throwable th) {
                            }

                            @Override // org.fusesource.mqtt.client.Callback
                            public void onSuccess(Callback<Void> callback2) {
                                if (callback2 != null) {
                                    callback2.onSuccess(null);
                                }
                            }
                        };
                        break;
                }
                this.listener.onPublish(publish.topicName(), publish.payload(), callback);
            } catch (Throwable th) {
                handleFatalFailure(th);
            }
        }
    }

    public void connect(Callback<Void> callback) {
        if (this.transport != null) {
            callback.onFailure(new IllegalStateException("Already connected"));
            return;
        }
        try {
            createTransport(new LoginHandler(callback, true));
        } catch (Throwable th) {
            callback.onFailure(th);
        }
    }

    void createTransport(final Callback<Transport> callback) throws Exception {
        final Transport transport;
        this.mqtt.tracer.debug("Connecting", new Object[0]);
        String scheme = this.mqtt.host.getScheme();
        if ("tcp".equals(scheme)) {
            transport = new TcpTransport();
        } else {
            if (SslTransport.protocol(scheme) == null) {
                throw new Exception("Unsupported URI scheme '" + scheme + "'");
            }
            SslTransport sslTransport = new SslTransport();
            if (this.mqtt.sslContext == null) {
                this.mqtt.sslContext = SSLContext.getDefault();
            }
            sslTransport.setSSLContext(this.mqtt.sslContext);
            transport = sslTransport;
        }
        if (this.mqtt.blockingExecutor == null) {
            this.mqtt.blockingExecutor = MQTT.getBlockingThreadPool();
        }
        transport.setBlockingExecutor(this.mqtt.blockingExecutor);
        transport.setDispatchQueue(this.queue);
        transport.setProtocolCodec(new MQTTProtocolCodec());
        if (transport instanceof TcpTransport) {
            SslTransport sslTransport2 = (TcpTransport) transport;
            sslTransport2.setMaxReadRate(this.mqtt.maxReadRate);
            sslTransport2.setMaxWriteRate(this.mqtt.maxWriteRate);
            sslTransport2.setReceiveBufferSize(this.mqtt.receiveBufferSize);
            sslTransport2.setSendBufferSize(this.mqtt.sendBufferSize);
            sslTransport2.setTrafficClass(this.mqtt.trafficClass);
            sslTransport2.setUseLocalHost(this.mqtt.useLocalHost);
            sslTransport2.connecting(this.mqtt.host, this.mqtt.localAddress);
        }
        transport.setTransportListener(new DefaultTransportListener() { // from class: org.fusesource.mqtt.client.CallbackConnection.5
            private void onFailure(final Throwable th) {
                if (transport.isClosed()) {
                    return;
                }
                transport.stop(new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.5.1
                    @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                    public void run() {
                        callback.onFailure(th);
                    }
                });
            }

            @Override // org.fusesource.hawtdispatch.transport.DefaultTransportListener, org.fusesource.hawtdispatch.transport.TransportListener
            public void onTransportConnected() {
                CallbackConnection.this.mqtt.tracer.debug("Transport connected", new Object[0]);
                if (CallbackConnection.this.disconnected) {
                    onFailure(CallbackConnection.access$1100());
                } else {
                    callback.onSuccess(transport);
                }
            }

            @Override // org.fusesource.hawtdispatch.transport.DefaultTransportListener, org.fusesource.hawtdispatch.transport.TransportListener
            public void onTransportFailure(IOException iOException) {
                CallbackConnection.this.mqtt.tracer.debug("Transport failure: %s", iOException);
                onFailure(iOException);
            }
        });
        transport.start(NOOP);
    }

    public void disconnect(final Callback<Void> callback) {
        if (this.disconnected) {
            if (callback != null) {
                callback.onSuccess(null);
                return;
            }
            return;
        }
        this.disconnected = true;
        final short nextMessageId = getNextMessageId();
        final Runnable runnable = new Runnable() { // from class: org.fusesource.mqtt.client.CallbackConnection.9
            private boolean executed = false;

            @Override // java.lang.Runnable
            public void run() {
                if (this.executed) {
                    return;
                }
                this.executed = true;
                CallbackConnection.this.requests.remove(Short.valueOf(nextMessageId));
                if (CallbackConnection.this.heartBeatMonitor != null) {
                    CallbackConnection.this.heartBeatMonitor.stop();
                    CallbackConnection.this.heartBeatMonitor = null;
                }
                if (CallbackConnection.this.transport != null) {
                    CallbackConnection.this.transport.stop(new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.9.1
                        @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                        public void run() {
                            CallbackConnection.this.listener.onDisconnected();
                            if (callback != null) {
                                callback.onSuccess(null);
                            }
                        }
                    });
                }
            }
        };
        Callback<Void> callback2 = new Callback<Void>() { // from class: org.fusesource.mqtt.client.CallbackConnection.10
            @Override // org.fusesource.mqtt.client.Callback
            public void onFailure(Throwable th) {
                runnable.run();
            }

            @Override // org.fusesource.mqtt.client.Callback
            public void onSuccess(Void r2) {
                CallbackConnection.this.onRefillCalled = false;
                CallbackConnection.this.refiller = new Runnable() { // from class: org.fusesource.mqtt.client.CallbackConnection.10.1
                    @Override // java.lang.Runnable
                    public void run() {
                        if (CallbackConnection.this.onRefillCalled) {
                            runnable.run();
                        }
                    }
                };
                if (CallbackConnection.this.transport != null) {
                    CallbackConnection.this.transport.flush();
                }
            }
        };
        if (this.transport == null) {
            callback2.onSuccess(null);
        } else {
            send(new Request(getNextMessageId(), new DISCONNECT().encode(), callback2));
        }
    }

    public Throwable failure() {
        this.queue.assertExecuting();
        return this.failure;
    }

    public boolean full() {
        this.queue.assertExecuting();
        return this.transport.full();
    }

    public DispatchQueue getDispatchQueue() {
        return this.queue;
    }

    void handleSessionFailure(Throwable th) {
        if (this.disconnected || (this.mqtt.reconnectAttemptsMax >= 0 && this.reconnects >= this.mqtt.reconnectAttemptsMax)) {
            handleFatalFailure(th);
            return;
        }
        this.mqtt.tracer.debug("Reconnecting transport", new Object[0]);
        HeartBeatMonitor heartBeatMonitor = this.heartBeatMonitor;
        if (heartBeatMonitor != null) {
            heartBeatMonitor.stop();
            this.heartBeatMonitor = null;
        }
        Transport transport = this.transport;
        this.transport = null;
        if (transport != null) {
            transport.stop(new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.3
                @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                public void run() {
                    CallbackConnection.this.listener.onDisconnected();
                    CallbackConnection.this.reconnect();
                }
            });
        } else {
            reconnect();
        }
    }

    public void kill(final Callback<Void> callback) {
        if (this.disconnected) {
            if (callback != null) {
                callback.onSuccess(null);
            }
        } else {
            this.disconnected = true;
            HeartBeatMonitor heartBeatMonitor = this.heartBeatMonitor;
            if (heartBeatMonitor != null) {
                heartBeatMonitor.stop();
                this.heartBeatMonitor = null;
            }
            this.transport.stop(new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.11
                @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                public void run() {
                    CallbackConnection.this.listener.onDisconnected();
                    Callback callback2 = callback;
                    if (callback2 != null) {
                        callback2.onSuccess(null);
                    }
                }
            });
        }
    }

    public CallbackConnection listener(final Listener listener) {
        if (listener instanceof ExtendedListener) {
            this.listener = (ExtendedListener) listener;
        } else {
            this.listener = new ExtendedListener() { // from class: org.fusesource.mqtt.client.CallbackConnection.8
                @Override // org.fusesource.mqtt.client.Listener
                public void onConnected() {
                    listener.onConnected();
                }

                @Override // org.fusesource.mqtt.client.Listener
                public void onDisconnected() {
                    listener.onDisconnected();
                }

                @Override // org.fusesource.mqtt.client.Listener
                public void onFailure(Throwable th) {
                    listener.onFailure(th);
                }

                @Override // org.fusesource.mqtt.client.Listener
                public void onPublish(UTF8Buffer uTF8Buffer, Buffer buffer, Runnable runnable) {
                    listener.onPublish(uTF8Buffer, buffer, runnable);
                }

                @Override // org.fusesource.mqtt.client.ExtendedListener
                public void onPublish(UTF8Buffer uTF8Buffer, Buffer buffer, final Callback<Callback<Void>> callback) {
                    listener.onPublish(uTF8Buffer, buffer, new Runnable() { // from class: org.fusesource.mqtt.client.CallbackConnection.8.1
                        @Override // java.lang.Runnable
                        public void run() {
                            callback.onSuccess(null);
                        }
                    });
                }
            };
        }
        return this;
    }

    public void onSessionEstablished(Transport transport) {
        try {
            this.transport = transport;
            if (this.suspendCount.get() > 0) {
                this.transport.suspendRead();
            }
            this.transport.setTransportListener(new DefaultTransportListener() { // from class: org.fusesource.mqtt.client.CallbackConnection.6
                @Override // org.fusesource.hawtdispatch.transport.DefaultTransportListener, org.fusesource.hawtdispatch.transport.TransportListener
                public void onRefill() {
                    CallbackConnection.this.onRefillCalled = true;
                    CallbackConnection.this.drainOverflow();
                }

                @Override // org.fusesource.hawtdispatch.transport.DefaultTransportListener, org.fusesource.hawtdispatch.transport.TransportListener
                public void onTransportCommand(Object obj) {
                    MQTTFrame mQTTFrame = (MQTTFrame) obj;
                    CallbackConnection.this.mqtt.tracer.onReceive(mQTTFrame);
                    CallbackConnection.this.processFrame(mQTTFrame);
                }

                @Override // org.fusesource.hawtdispatch.transport.DefaultTransportListener, org.fusesource.hawtdispatch.transport.TransportListener
                public void onTransportFailure(IOException iOException) {
                    CallbackConnection.this.handleSessionFailure(iOException);
                }
            });
            this.pingedAt = 0L;
            if (this.mqtt.getKeepAlive() > 0) {
                this.heartBeatMonitor = new HeartBeatMonitor();
                this.heartBeatMonitor.setWriteInterval((this.mqtt.getKeepAlive() * 1000) / 2);
                if (this.transport != null) {
                    this.heartBeatMonitor.setTransport(this.transport);
                    this.heartBeatMonitor.suspendRead();
                    this.heartBeatMonitor.setOnKeepAlive(new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.7
                        @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                        public void run() {
                            MQTTFrame encode;
                            if (CallbackConnection.this.disconnected || CallbackConnection.this.pingedAt != 0 || (encode = new PINGREQ().encode()) == null || CallbackConnection.this.transport == null || !CallbackConnection.this.transport.offer(encode)) {
                                return;
                            }
                            CallbackConnection.this.mqtt.tracer.onSend(encode);
                            final long currentTimeMillis = System.currentTimeMillis();
                            final long j = CallbackConnection.this.suspendChanges.get();
                            CallbackConnection.this.pingedAt = currentTimeMillis;
                            CallbackConnection.this.queue.executeAfter(CallbackConnection.this.mqtt.getKeepAlive(), TimeUnit.SECONDS, new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.7.1
                                @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
                                public void run() {
                                    if (currentTimeMillis == CallbackConnection.this.pingedAt) {
                                        if (j == CallbackConnection.this.suspendChanges.get() && CallbackConnection.this.suspendCount.get() > 0) {
                                            CallbackConnection.this.mqtt.tracer.debug("The connection has remained suspended for an extended period of time so it cannot do proper keep alive processing.  Did you forget to resume the connection?", new Object[0]);
                                        } else {
                                            CallbackConnection.this.mqtt.tracer.debug("Ping timeout", new Object[0]);
                                            CallbackConnection.this.handleSessionFailure(new ProtocolException("Ping timeout").fillInStackTrace());
                                        }
                                    }
                                }
                            });
                        }
                    });
                    this.heartBeatMonitor.start();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void publish(String str, byte[] bArr, QoS qoS, boolean z, Callback<Void> callback) {
        publish(Buffer.utf8(str), new Buffer(bArr), qoS, z, callback);
    }

    public void publish(UTF8Buffer uTF8Buffer, Buffer buffer, QoS qoS, boolean z, Callback<Void> callback) {
        this.queue.assertExecuting();
        if (this.disconnected) {
            callback.onFailure(createDisconnectedError());
            return;
        }
        PUBLISH retain = new PUBLISH().qos(qoS).retain(z);
        retain.topicName(uTF8Buffer).payload(buffer);
        send(retain, callback);
    }

    void reconnect() {
        try {
            createTransport(new LoginHandler(new Callback<Void>() { // from class: org.fusesource.mqtt.client.CallbackConnection.2
                @Override // org.fusesource.mqtt.client.Callback
                public void onFailure(Throwable th) {
                    CallbackConnection.this.handleFatalFailure(th);
                }

                @Override // org.fusesource.mqtt.client.Callback
                public void onSuccess(Void r7) {
                    CallbackConnection.this.mqtt.tracer.debug("Restoring MQTT connection state", new Object[0]);
                    LinkedList linkedList = CallbackConnection.this.overflow;
                    Map map = CallbackConnection.this.requests;
                    CallbackConnection.this.overflow = new LinkedList();
                    CallbackConnection.this.requests = new ConcurrentHashMap();
                    if (!CallbackConnection.this.activeSubs.isEmpty()) {
                        ArrayList arrayList = new ArrayList(CallbackConnection.this.activeSubs.size());
                        for (Map.Entry entry : CallbackConnection.this.activeSubs.entrySet()) {
                            arrayList.add(new Topic((UTF8Buffer) entry.getKey(), (QoS) entry.getValue()));
                        }
                        CallbackConnection.this.send(new SUBSCRIBE().topics((Topic[]) arrayList.toArray(new Topic[arrayList.size()])), null);
                    }
                    for (Map.Entry entry2 : map.entrySet()) {
                        ((Request) entry2.getValue()).frame.dup(true);
                        CallbackConnection.this.send((Request) entry2.getValue());
                    }
                    Iterator it = linkedList.iterator();
                    while (it.hasNext()) {
                        CallbackConnection.this.send((Request) it.next());
                    }
                }
            }, false));
        } catch (Throwable th) {
            handleFatalFailure(th);
        }
    }

    void reconnect(final Callback<Transport> callback) {
        long j = this.mqtt.reconnectDelay;
        if (j > 0 && this.mqtt.reconnectBackOffMultiplier > 1.0d) {
            j = (long) Math.pow(this.mqtt.reconnectDelay * this.reconnects, this.mqtt.reconnectBackOffMultiplier);
        }
        long min = Math.min(j, this.mqtt.reconnectDelayMax);
        this.reconnects++;
        this.queue.executeAfter(min, TimeUnit.MILLISECONDS, new Task() { // from class: org.fusesource.mqtt.client.CallbackConnection.4
            @Override // org.fusesource.hawtdispatch.Task, java.lang.Runnable
            public void run() {
                if (CallbackConnection.this.disconnected) {
                    callback.onFailure(CallbackConnection.access$1100());
                    return;
                }
                try {
                    CallbackConnection.this.createTransport(callback);
                } catch (Exception e) {
                    callback.onFailure(e);
                }
            }
        });
    }

    public CallbackConnection refiller(Runnable runnable) {
        this.queue.assertExecuting();
        this.refiller = runnable;
        return this;
    }

    public void resume() {
        Transport transport;
        this.suspendChanges.incrementAndGet();
        if (this.suspendCount.decrementAndGet() != 0 || (transport = this.transport) == null) {
            return;
        }
        transport.resumeRead();
        HeartBeatMonitor heartBeatMonitor = this.heartBeatMonitor;
        if (heartBeatMonitor != null) {
            heartBeatMonitor.resumeRead();
        }
    }

    public void subscribe(final Topic[] topicArr, Callback<byte[]> callback) {
        if (topicArr == null) {
            throw new IllegalArgumentException("topics must not be null");
        }
        this.queue.assertExecuting();
        if (this.disconnected) {
            callback.onFailure(createDisconnectedError());
        } else if (this.listener == DEFAULT_LISTENER) {
            callback.onFailure(createListenerNotSetError());
        } else {
            send(new SUBSCRIBE().topics(topicArr), new ProxyCallback<byte[]>(callback) { // from class: org.fusesource.mqtt.client.CallbackConnection.12
                @Override // org.fusesource.mqtt.client.ProxyCallback, org.fusesource.mqtt.client.Callback
                public void onSuccess(byte[] bArr) {
                    for (Topic topic : topicArr) {
                        CallbackConnection.this.activeSubs.put(topic.name(), topic.qos());
                    }
                    if (this.next != null) {
                        this.next.onSuccess(bArr);
                    }
                }
            });
        }
    }

    public void suspend() {
        Transport transport;
        this.suspendChanges.incrementAndGet();
        if (this.suspendCount.incrementAndGet() != 1 || (transport = this.transport) == null) {
            return;
        }
        transport.suspendRead();
        HeartBeatMonitor heartBeatMonitor = this.heartBeatMonitor;
        if (heartBeatMonitor != null) {
            heartBeatMonitor.suspendRead();
        }
    }

    public Transport transport() {
        return this.transport;
    }

    public void unsubscribe(final UTF8Buffer[] uTF8BufferArr, Callback<Void> callback) {
        this.queue.assertExecuting();
        if (this.disconnected) {
            callback.onFailure(createDisconnectedError());
        } else {
            send(new UNSUBSCRIBE().topics(uTF8BufferArr), new ProxyCallback(callback) { // from class: org.fusesource.mqtt.client.CallbackConnection.13
                @Override // org.fusesource.mqtt.client.ProxyCallback, org.fusesource.mqtt.client.Callback
                public void onSuccess(Object obj) {
                    for (UTF8Buffer uTF8Buffer : uTF8BufferArr) {
                        CallbackConnection.this.activeSubs.remove(uTF8Buffer);
                    }
                    if (this.next != null) {
                        this.next.onSuccess(obj);
                    }
                }
            });
        }
    }
}
