package com.microsoft.azure.sdk.iot.deps.transport.amqp;

import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import com.microsoft.azure.sdk.iot.deps.util.ObjectLock;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes3.dex */
public class AmqpsConnection extends ErrorLoggingBaseHandlerWithCleanup {

    /* renamed from: t, reason: collision with root package name */
    private static final Logger f27270t = LoggerFactory.getLogger((Class<?>) AmqpsConnection.class);

    /* renamed from: d, reason: collision with root package name */
    private int f27271d;

    /* renamed from: e, reason: collision with root package name */
    private long f27272e;

    /* renamed from: f, reason: collision with root package name */
    private final Boolean f27273f;

    /* renamed from: g, reason: collision with root package name */
    private Boolean f27274g;

    /* renamed from: h, reason: collision with root package name */
    private final String f27275h;

    /* renamed from: i, reason: collision with root package name */
    private final String f27276i;

    /* renamed from: j, reason: collision with root package name */
    private Connection f27277j;

    /* renamed from: k, reason: collision with root package name */
    private Session f27278k;

    /* renamed from: l, reason: collision with root package name */
    private ExecutorService f27279l;

    /* renamed from: m, reason: collision with root package name */
    private final AmqpDeviceOperations f27280m;

    /* renamed from: n, reason: collision with root package name */
    private Reactor f27281n;
    private SaslListenerImpl o;
    private AmqpListener p;

    /* renamed from: q, reason: collision with root package name */
    private CountDownLatch f27282q;

    /* renamed from: r, reason: collision with root package name */
    private final ObjectLock f27283r;

    /* renamed from: s, reason: collision with root package name */
    private final SSLContext f27284s;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public static class a implements Callable<Object> {

        /* renamed from: b, reason: collision with root package name */
        private final AmqpReactor f27285b;

        a(AmqpReactor amqpReactor) {
            this.f27285b = amqpReactor;
        }

        @Override // java.util.concurrent.Callable
        public Object call() {
            Thread.currentThread().setName("azure-iot-sdk-ReactorRunner");
            AmqpsConnection.f27270t.trace("Amqp reactor thread {} has started", "azure-iot-sdk-ReactorRunner");
            try {
                this.f27285b.run();
                AmqpsConnection.f27270t.trace("Amqp reactor thread {} has finished", "azure-iot-sdk-ReactorRunner");
                return null;
            } catch (HandlerException e2) {
                AmqpsConnection.f27270t.error("Encountered an exception while running the AMQP reactor", (Throwable) e2);
                throw e2;
            }
        }
    }

    public AmqpsConnection(String str, AmqpDeviceOperations amqpDeviceOperations, SSLContext sSLContext, SaslHandler saslHandler, boolean z2) {
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("The hostname cannot be null or empty.");
        }
        this.f27271d = -1;
        this.f27272e = 0L;
        this.f27280m = amqpDeviceOperations;
        Boolean valueOf = Boolean.valueOf(z2);
        this.f27273f = valueOf;
        if (saslHandler != null) {
            this.o = new SaslListenerImpl(saslHandler);
        }
        this.f27282q = new CountDownLatch(1);
        this.f27283r = new ObjectLock();
        this.f27284s = sSLContext;
        this.f27274g = Boolean.FALSE;
        Object[] objArr = new Object[2];
        objArr[0] = str;
        objArr[1] = Integer.valueOf(valueOf.booleanValue() ? 443 : 5671);
        this.f27276i = String.format("%s:%d", objArr);
        this.f27275h = str;
        add(new Handshaker());
        add(new FlowController());
        ReactorOptions reactorOptions = new ReactorOptions();
        reactorOptions.setEnableSaslByDefault(false);
        this.f27281n = Proton.reactor(reactorOptions, this);
    }

    private SslDomain b() {
        SslDomain sslDomain = Proton.sslDomain();
        sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
        sslDomain.init(SslDomain.Mode.CLIENT);
        sslDomain.setSslContext(this.f27284s);
        return sslDomain;
    }

    public void close() {
        if (this.f27274g.booleanValue()) {
            Logger logger = f27270t;
            logger.debug("Closing amqp connection");
            this.f27280m.closeLinks();
            Session session = this.f27278k;
            if (session != null) {
                session.close();
            }
            Connection connection = this.f27277j;
            if (connection != null) {
                connection.close();
            }
            Reactor reactor2 = this.f27281n;
            if (reactor2 != null) {
                reactor2.stop();
            }
            try {
                synchronized (this.f27283r) {
                    this.f27283r.waitLock(DateUtils.MILLIS_PER_MINUTE);
                }
                ExecutorService executorService = this.f27279l;
                if (executorService != null) {
                    executorService.shutdown();
                    try {
                        ExecutorService executorService2 = this.f27279l;
                        TimeUnit timeUnit = TimeUnit.SECONDS;
                        if (!executorService2.awaitTermination(30L, timeUnit)) {
                            this.f27279l.shutdownNow();
                            if (!this.f27279l.awaitTermination(30L, timeUnit)) {
                                logger.info("Pool did not terminate");
                            }
                        }
                    } catch (InterruptedException unused) {
                        this.f27279l.shutdownNow();
                    }
                }
                this.f27274g = Boolean.FALSE;
            } catch (InterruptedException e2) {
                throw new IOException("Waited too long for the connection to close.", e2);
            }
        }
    }

    public boolean isConnected() {
        SaslListenerImpl saslListenerImpl = this.o;
        if (saslListenerImpl != null && saslListenerImpl.getSavedException() != null) {
            throw this.o.getSavedException();
        }
        ProtonJExceptionParser protonJExceptionParser = this.protonJExceptionParser;
        if (protonJExceptionParser == null || protonJExceptionParser.getError() == null) {
            return this.f27274g.booleanValue();
        }
        throw new IOException("Encountered exception during amqp connection: " + this.protonJExceptionParser.getError() + " with description " + this.protonJExceptionParser.getErrorDescription());
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onConnectionBound(Event event) {
        Transport transport = event.getConnection().getTransport();
        if (transport != null) {
            if (this.o != null) {
                f27270t.debug("Setting up sasl negotiator");
                transport.sasl().setListener(this.o);
            }
            if (this.f27273f.booleanValue()) {
                f27270t.debug("Adding websocket layer");
                WebSocketImpl webSocketImpl = new WebSocketImpl(262144);
                webSocketImpl.configure(this.f27275h, "/$iothub/websocket", "iothub-no-client-cert=true", 443, "AMQPWSB10", null, null);
                ((TransportInternal) transport).addTransportLayer(webSocketImpl);
            }
            transport.ssl(b());
        }
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onConnectionInit(Event event) {
        Connection connection = event.getConnection();
        this.f27277j = connection;
        connection.setHostname(this.f27276i);
        this.f27278k = this.f27277j.session();
        this.f27277j.open();
        this.f27278k.open();
        this.f27280m.openLinks(this.f27278k);
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onConnectionUnbound(Event event) {
        f27270t.trace("Amqp connection unbound");
        this.f27274g = Boolean.FALSE;
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onDelivery(Event event) {
        Link link = event.getLink();
        if (link instanceof Sender) {
            Delivery delivery = event.getDelivery();
            DeliveryState remoteState = delivery.getRemoteState();
            if (!remoteState.equals(Accepted.getInstance())) {
                this.p.messageSendFailed("Amqp message was not accepted by service, remote state was " + remoteState.getType());
            }
            delivery.free();
            return;
        }
        if (!(link instanceof Receiver)) {
            f27270t.warn("onDelivery executed on a link that is neither a sender or a receiver");
            return;
        }
        AmqpMessage receiverMessageFromLink = this.f27280m.receiverMessageFromLink(event.getLink().getName());
        if (receiverMessageFromLink == null) {
            f27270t.warn("onDelivery executed on a receiver link but no message could be received");
        } else {
            f27270t.debug("Amqp connection received message");
            this.p.messageReceived(receiverMessageFromLink);
        }
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onLinkFlow(Event event) {
        int credit = event.getLink().getCredit();
        this.f27271d = credit;
        f27270t.trace("Amqp link received {} link credit", Integer.valueOf(credit));
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onLinkInit(Event event) {
        this.f27280m.initLink(event.getLink());
    }

    @Override // com.microsoft.azure.sdk.iot.deps.transport.amqp.ErrorLoggingBaseHandlerWithCleanup, org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onLinkRemoteOpen(Event event) {
        super.onLinkRemoteOpen(event);
        if (this.f27280m.isReceiverLinkTag(event.getLink().getName())) {
            this.f27274g = Boolean.TRUE;
            if (this.p != null) {
                this.f27282q.countDown();
            }
        }
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onReactorFinal(Event event) {
        super.onReactorFinal(event);
        this.f27281n = null;
        synchronized (this.f27283r) {
            this.f27283r.notifyLock();
        }
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onReactorInit(Event event) {
        event.getReactor().connectionToHost(this.f27275h, this.f27273f.booleanValue() ? 443 : 5671, this);
    }

    @Override // com.microsoft.azure.sdk.iot.deps.transport.amqp.ErrorLoggingBaseHandlerWithCleanup, com.microsoft.azure.sdk.iot.deps.transport.amqp.ErrorLoggingBaseHandler, org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onTransportError(Event event) {
        super.onTransportError(event);
        this.f27274g = Boolean.FALSE;
    }

    @Override // org.apache.qpid.proton.engine.BaseHandler, org.apache.qpid.proton.engine.CoreHandler
    public void onTransportHeadClosed(Event event) {
        this.f27282q.countDown();
        f27270t.trace("Amqp transport head closed");
    }

    public void open() {
        if (this.f27274g.booleanValue()) {
            f27270t.trace("Open called while amqp connection was already open");
        } else {
            try {
                f27270t.debug("Opening amqp connection asynchronously");
                openAmqpAsync();
                try {
                    this.f27282q.await(DateUtils.MILLIS_PER_MINUTE, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e2) {
                    f27270t.error("Amqp connection was interrupted while opening.", (Throwable) e2);
                    close();
                    throw new IOException("Amqp connection was interrupted while opening.", e2);
                }
            } catch (Exception e3) {
                f27270t.error("Error opening Amqp connection: ", (Throwable) e3);
                close();
                throw new IOException("Error opening Amqp connection: ", e3);
            }
        }
        if (!this.f27274g.booleanValue()) {
            throw new IOException("Timed out  to open the amqp connection");
        }
    }

    public void openAmqpAsync() {
        this.f27282q = new CountDownLatch(1);
        if (this.f27279l == null) {
            this.f27279l = Executors.newFixedThreadPool(1);
        }
        f27270t.debug("Starting amqp reactor thread...");
        this.f27279l.submit(new a(new AmqpReactor(this.f27281n)));
    }

    public boolean sendAmqpMessage(AmqpMessage amqpMessage) {
        if (!isConnected()) {
            return false;
        }
        byte[] bArr = new byte[1024];
        boolean z2 = false;
        int i2 = 0;
        do {
            try {
                i2 = amqpMessage.encode(bArr, 0);
                z2 = true;
            } catch (BufferOverflowException unused) {
                bArr = new byte[bArr.length * 2];
            }
        } while (!z2);
        if (i2 <= 0) {
            return false;
        }
        byte[] bytes = String.valueOf(this.f27272e).getBytes(StandardCharsets.UTF_8);
        long j2 = this.f27272e;
        if (j2 == 2147483647L || j2 < 0) {
            this.f27272e = 0L;
        } else {
            this.f27272e = j2 + 1;
        }
        this.f27280m.sendMessage(bytes, bArr, i2, 0);
        return true;
    }

    public void setListener(AmqpListener amqpListener) {
        if (amqpListener == null) {
            throw new IllegalArgumentException("The listener cannot be null.");
        }
        this.p = amqpListener;
    }
}
