package com.hivemq.client.internal.mqtt.handler.publish.outgoing;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig;
import com.hivemq.client.internal.mqtt.advanced.interceptor.MqttClientInterceptors;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.f;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult;
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.mqtt.message.publish.puback.MqttPubAck;
import com.hivemq.client.internal.mqtt.message.publish.pubcomp.MqttPubComp;
import com.hivemq.client.internal.mqtt.message.publish.pubrec.MqttPubRec;
import com.hivemq.client.internal.mqtt.message.publish.pubrel.MqttPubRel;
import com.hivemq.client.internal.mqtt.message.publish.pubrel.MqttPubRelBuilder;
import com.hivemq.client.internal.netty.ContextFuture;
import com.hivemq.client.internal.netty.DefaultContextPromise;
import com.hivemq.client.internal.util.Ranges;
import com.hivemq.client.internal.util.collections.IntIndex;
import com.hivemq.client.internal.util.collections.NodeList;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.exceptions.ConnectionClosedException;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos1.Mqtt5OutgoingQos1Interceptor;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos2.Mqtt5OutgoingQos2Interceptor;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.functions.Function;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
import javax.inject.Inject;
import org.jctools.queues.SpscUnboundedArrayQueue;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

@ClientScope
/* loaded from: classes2.dex */
public class MqttOutgoingQosHandler extends MqttSessionAwareHandler implements FlowableSubscriber<h>, Runnable, ContextFuture.Listener<h> {
    static final /* synthetic */ boolean $assertionsDisabled = false;
    private static final int MAX_CONCURRENT_PUBLISH_FLOWABLES = 64;

    @NotNull
    public static final String NAME = "qos.outgoing";
    private static final boolean QOS_2_COMPLETE_RESULT = false;

    @NotNull
    private final MqttClientConfig clientConfig;

    @Nullable
    private h currentPending;

    @Nullable
    private e resendPending;
    private int sendMaximum;
    private int shrinkRequests;

    @Nullable
    private Subscription subscription;

    @Nullable
    private MqttTopicAliasMapping topicAliasMapping;

    @NotNull
    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(MqttOutgoingQosHandler.class);
    private static final IntIndex.Spec<e> INDEX_SPEC = new IntIndex.Spec<>(new ToIntFunction() { // from class: com.hivemq.client.internal.mqtt.handler.publish.outgoing.d
        @Override // java.util.function.ToIntFunction
        public final int applyAsInt(Object obj) {
            int i2;
            i2 = ((e) obj).f16234b;
            return i2;
        }
    });

    @NotNull
    private final SpscUnboundedArrayQueue<h> queue = new SpscUnboundedArrayQueue<>(32);

    @NotNull
    private final AtomicInteger queuedCounter = new AtomicInteger();

    @NotNull
    private final NodeList<e> pending = new NodeList<>();

    @NotNull
    private final Ranges packetIdentifiers = new Ranges(1, 0);

    @NotNull
    private final IntIndex<e> pendingIndex = new IntIndex<>(INDEX_SPEC);

    @NotNull
    private final MqttPublishFlowables publishFlowables = new MqttPublishFlowables();

    /* JADX INFO: Access modifiers changed from: package-private */
    @Inject
    public MqttOutgoingQosHandler(@NotNull MqttClientConfig mqttClientConfig) {
        this.clientConfig = mqttClientConfig;
    }

    @NotNull
    private MqttPubRel buildPubRel(@NotNull MqttPublish mqttPublish, @NotNull MqttPubRec mqttPubRec) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttPubRelBuilder mqttPubRelBuilder = new MqttPubRelBuilder(mqttPubRec);
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors != null && (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) != null) {
            outgoingQos2Interceptor.onPubRec(this.clientConfig, mqttPublish, mqttPubRec, mqttPubRelBuilder);
        }
        return mqttPubRelBuilder.build();
    }

    private void clearQueued(@NotNull Throwable th) {
        int i2;
        do {
            i2 = 0;
            while (true) {
                h hVar = (h) this.queue.poll();
                if (hVar == null) {
                    break;
                }
                hVar.a().b(new MqttPublishResult(hVar.b(), th));
                i2++;
            }
        } while (this.queuedCounter.addAndGet(-i2) != 0);
    }

    private void completePending(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull e eVar) {
        this.pending.remove(eVar);
        int i2 = eVar.f16234b;
        this.packetIdentifiers.returnId(i2);
        int i3 = this.sendMaximum;
        if (i2 > i3) {
            this.packetIdentifiers.resize(i3);
        }
        if (this.resendPending != null) {
            channelHandlerContext.channel().eventLoop().execute(this);
        }
    }

    private static void error(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull String str) {
        MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, str);
    }

    private boolean isRepublishIfSessionExpired() {
        return this.clientConfig.isRepublishIfSessionExpired() && this.clientConfig.getState() != MqttClientState.DISCONNECTED;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ Publisher lambda$onSessionStartOrResume$1(Flowable flowable) throws Exception {
        return flowable;
    }

    private void onPubAck(@NotNull MqttPublish mqttPublish, @NotNull MqttPubAck mqttPubAck) {
        Mqtt5OutgoingQos1Interceptor outgoingQos1Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos1Interceptor = interceptors.getOutgoingQos1Interceptor()) == null) {
            return;
        }
        outgoingQos1Interceptor.onPubAck(this.clientConfig, mqttPublish, mqttPubAck);
    }

    private void onPubComp(@NotNull MqttPubRel mqttPubRel, @NotNull MqttPubComp mqttPubComp) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) == null) {
            return;
        }
        outgoingQos2Interceptor.onPubComp(this.clientConfig, mqttPubRel, mqttPubComp);
    }

    private void onPubRecError(@NotNull MqttPublish mqttPublish, @NotNull MqttPubRec mqttPubRec) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) == null) {
            return;
        }
        outgoingQos2Interceptor.onPubRecError(this.clientConfig, mqttPublish, mqttPubRec);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void readPubAck(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPubAck mqttPubAck) {
        e remove = this.pendingIndex.remove(mqttPubAck.getPacketIdentifier());
        if (remove == null) {
            error(channelHandlerContext, "PUBACK contained unknown packet identifier");
            return;
        }
        if (!(remove instanceof h)) {
            this.pendingIndex.put(remove);
            error(channelHandlerContext, "PUBACK must not be received for a PUBREL");
            return;
        }
        h hVar = (h) remove;
        MqttPublish b2 = hVar.b();
        if (b2.getQos() != MqttQos.AT_LEAST_ONCE) {
            this.pendingIndex.put(remove);
            error(channelHandlerContext, "PUBACK must not be received for a QoS 2 PUBLISH");
        } else {
            completePending(channelHandlerContext, hVar);
            onPubAck(b2, mqttPubAck);
            hVar.a().b(new MqttPublishResult.MqttQos1Result(b2, ((Mqtt5PubAckReasonCode) mqttPubAck.getReasonCode()).isError() ? new Mqtt5PubAckException(mqttPubAck, "PUBACK contained an Error Code") : null, mqttPubAck));
        }
    }

    private void readPubComp(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPubComp mqttPubComp) {
        e remove = this.pendingIndex.remove(mqttPubComp.getPacketIdentifier());
        if (remove == null) {
            error(channelHandlerContext, "PUBCOMP contained unknown packet identifier");
            return;
        }
        if (!(remove instanceof f)) {
            this.pendingIndex.put(remove);
            if (((h) remove).b().getQos() == MqttQos.AT_LEAST_ONCE) {
                error(channelHandlerContext, "PUBCOMP must not be received for a QoS 1 PUBLISH");
                return;
            } else {
                error(channelHandlerContext, "PUBCOMP must not be received when the PUBREL has not been sent yet");
                return;
            }
        }
        f fVar = (f) remove;
        MqttPubRel b2 = fVar.b();
        a a2 = fVar.a();
        completePending(channelHandlerContext, fVar);
        onPubComp(b2, mqttPubComp);
        if (((f.a) fVar).getAsBoolean()) {
            a2.a(1L);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void readPubRec(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPubRec mqttPubRec) {
        int packetIdentifier = mqttPubRec.getPacketIdentifier();
        e eVar = this.pendingIndex.get(packetIdentifier);
        if (eVar == null) {
            error(channelHandlerContext, "PUBREC contained unknown packet identifier");
            return;
        }
        if (!(eVar instanceof h)) {
            error(channelHandlerContext, "PUBREC must not be received when the PUBREL has already been sent");
            return;
        }
        h hVar = (h) eVar;
        MqttPublish b2 = hVar.b();
        if (b2.getQos() != MqttQos.EXACTLY_ONCE) {
            error(channelHandlerContext, "PUBREC must not be received for a QoS 1 PUBLISH");
            return;
        }
        a a2 = hVar.a();
        if (((Mqtt5PubRecReasonCode) mqttPubRec.getReasonCode()).isError()) {
            this.pendingIndex.remove(packetIdentifier);
            completePending(channelHandlerContext, hVar);
            onPubRecError(b2, mqttPubRec);
            a2.b(new MqttPublishResult.MqttQos2Result(b2, new Mqtt5PubRecException(mqttPubRec, "PUBREC contained an Error Code"), mqttPubRec));
            return;
        }
        MqttPubRel buildPubRel = buildPubRel(b2, mqttPubRec);
        f.a aVar = new f.a(buildPubRel, a2);
        replacePending(hVar, aVar);
        a2.b(new MqttPublishResult.MqttQos2IntermediateResult(b2, mqttPubRec, aVar));
        writePubRel(channelHandlerContext, buildPubRel);
        channelHandlerContext.flush();
    }

    private void replacePending(@NotNull h hVar, @NotNull f fVar) {
        fVar.f16234b = hVar.f16234b;
        this.pendingIndex.put(fVar);
        this.pending.replace(hVar, fVar);
    }

    private void resend(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull e eVar) {
        this.pendingIndex.put(eVar);
        if (!(eVar instanceof h)) {
            writePubRel(channelHandlerContext, ((f) eVar).b());
        } else {
            h hVar = (h) eVar;
            writeQos1Or2Publish(channelHandlerContext, hVar.b().createStateful(hVar.f16234b, true, this.topicAliasMapping), hVar);
        }
    }

    private void writePubRel(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPubRel mqttPubRel) {
        channelHandlerContext.write(mqttPubRel, channelHandlerContext.voidPromise());
    }

    private void writePublish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull h hVar) {
        if (hVar.b().getQos() == MqttQos.AT_MOST_ONCE) {
            writeQos0Publish(channelHandlerContext, hVar);
        } else {
            writeQos1Or2Publish(channelHandlerContext, hVar);
        }
    }

    private void writeQos0Publish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull h hVar) {
        channelHandlerContext.write(hVar.b().createStateful(-1, false, this.topicAliasMapping), new DefaultContextPromise(channelHandlerContext.channel(), hVar)).addListener((GenericFutureListener<? extends Future<? super Void>>) this);
    }

    private void writeQos1Or2Publish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull h hVar) {
        int id = this.packetIdentifiers.getId();
        if (id < 0) {
            LOGGER.error("No Packet Identifier available for QoS 1 or 2 PUBLISH. This must not happen and is a bug.");
            return;
        }
        hVar.f16234b = id;
        this.pendingIndex.put(hVar);
        this.pending.add(hVar);
        writeQos1Or2Publish(channelHandlerContext, hVar.b().createStateful(id, false, this.topicAliasMapping), hVar);
    }

    private void writeQos1Or2Publish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttStatefulPublish mqttStatefulPublish, @NotNull h hVar) {
        this.currentPending = hVar;
        channelHandlerContext.write(mqttStatefulPublish, channelHandlerContext.voidPromise());
        this.currentPending = null;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Object obj) {
        if (obj instanceof MqttPubAck) {
            readPubAck(channelHandlerContext, (MqttPubAck) obj);
            return;
        }
        if (obj instanceof MqttPubRec) {
            readPubRec(channelHandlerContext, (MqttPubRec) obj);
        } else if (obj instanceof MqttPubComp) {
            readPubComp(channelHandlerContext, (MqttPubComp) obj);
        } else {
            channelHandlerContext.fireChannelRead(obj);
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelWritabilityChanged(@NotNull ChannelHandlerContext channelHandlerContext) {
        Channel channel = channelHandlerContext.channel();
        if (channel.isWritable()) {
            channel.eventLoop().execute(this);
        }
        channelHandlerContext.fireChannelWritabilityChanged();
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Throwable th) {
        h hVar;
        if ((th instanceof IOException) || (hVar = this.currentPending) == null) {
            channelHandlerContext.fireExceptionCaught(th);
            return;
        }
        this.pendingIndex.remove(hVar.f16234b);
        this.currentPending.a().b(new MqttPublishResult(this.currentPending.b(), th));
        completePending(channelHandlerContext, this.currentPending);
        this.currentPending = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @NotNull
    public MqttPublishFlowables getPublishFlowables() {
        return this.publishFlowables;
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        LOGGER.error("MqttPublishFlowables is global and must never complete. This must not happen and is a bug.");
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(@NotNull Throwable th) {
        LOGGER.error("MqttPublishFlowables is global and must never error. This must not happen and is a bug.", th);
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(@NotNull h hVar) {
        this.queue.offer(hVar);
        if (this.queuedCounter.getAndIncrement() == 0) {
            hVar.a().getEventLoop().execute(this);
        }
    }

    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionEnd(@NotNull Throwable th) {
        super.onSessionEnd(th);
        this.pendingIndex.clear();
        this.resendPending = null;
        if (isRepublishIfSessionExpired()) {
            return;
        }
        e first = this.pending.getFirst();
        while (true) {
            e eVar = first;
            if (eVar == null) {
                this.pending.clear();
                clearQueued(th);
                return;
            }
            this.packetIdentifiers.returnId(eVar.f16234b);
            if (eVar instanceof h) {
                eVar.a().b(new MqttPublishResult(((h) eVar).b(), th));
            } else {
                f.a aVar = (f.a) eVar;
                if (aVar.getAsBoolean()) {
                    aVar.a().a(1L);
                }
            }
            first = eVar.getNext();
        }
    }

    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionStartOrResume(@NotNull MqttClientConnectionConfig mqttClientConnectionConfig, @NotNull EventLoop eventLoop) {
        int i2 = this.sendMaximum;
        int min = Math.min(mqttClientConnectionConfig.getSendMaximum(), 65525);
        this.sendMaximum = min;
        this.packetIdentifiers.resize(min);
        if (i2 == 0) {
            this.publishFlowables.flatMap(new Function() { // from class: com.hivemq.client.internal.mqtt.handler.publish.outgoing.c
                @Override // io.reactivex.functions.Function
                public final Object apply(Object obj) {
                    Publisher lambda$onSessionStartOrResume$1;
                    lambda$onSessionStartOrResume$1 = MqttOutgoingQosHandler.lambda$onSessionStartOrResume$1((Flowable) obj);
                    return lambda$onSessionStartOrResume$1;
                }
            }, true, 64, Math.min(min, Flowable.bufferSize())).subscribe((FlowableSubscriber<? super R>) this);
            this.subscription.request(min);
        } else {
            int i3 = (min - i2) - this.shrinkRequests;
            if (i3 > 0) {
                this.shrinkRequests = 0;
                this.subscription.request(i3);
            } else {
                this.shrinkRequests = -i3;
            }
        }
        this.topicAliasMapping = mqttClientConnectionConfig.getSendTopicAliasMapping();
        this.pendingIndex.clear();
        e first = this.pending.getFirst();
        this.resendPending = first;
        if (first != null || this.queuedCounter.get() > 0) {
            eventLoop.execute(this);
        }
        super.onSessionStartOrResume(mqttClientConnectionConfig, eventLoop);
    }

    @Override // io.reactivex.FlowableSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(@NotNull Subscription subscription) {
        this.subscription = subscription;
    }

    @Override // io.netty.util.concurrent.GenericFutureListener
    public void operationComplete(@NotNull ContextFuture<? extends h> contextFuture) {
        h context = contextFuture.getContext();
        MqttPublish b2 = context.b();
        a a2 = context.a();
        Throwable cause = contextFuture.cause();
        if (!(cause instanceof IOException)) {
            a2.b(new MqttPublishResult(b2, cause));
        } else {
            a2.b(new MqttPublishResult(b2, new ConnectionClosedException(cause)));
            contextFuture.channel().pipeline().fireExceptionCaught(cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @CallByThread("Netty EventLoop")
    public void request(long j2) {
        int i2 = this.shrinkRequests;
        if (i2 == 0) {
            this.subscription.request(j2);
            return;
        }
        long j3 = i2;
        if (j2 <= j3) {
            this.shrinkRequests = (int) (i2 - j2);
        } else {
            this.shrinkRequests = 0;
            this.subscription.request(j2 - j3);
        }
    }

    @Override // java.lang.Runnable
    @CallByThread("Netty EventLoop")
    public void run() {
        if (!this.hasSession) {
            if (isRepublishIfSessionExpired()) {
                return;
            }
            clearQueued(MqttClientStateExceptions.notConnected());
            return;
        }
        ChannelHandlerContext channelHandlerContext = this.ctx;
        if (channelHandlerContext == null) {
            return;
        }
        Channel channel = channelHandlerContext.channel();
        int size = this.sendMaximum - this.pendingIndex.size();
        e eVar = this.resendPending;
        int i2 = 0;
        int i3 = 0;
        while (eVar != null && i3 < size && channel.isWritable()) {
            resend(channelHandlerContext, eVar);
            i3++;
            eVar = eVar.getNext();
            this.resendPending = eVar;
        }
        while (i3 < size && channel.isWritable()) {
            h hVar = (h) this.queue.poll();
            if (hVar == null) {
                break;
            }
            writePublish(channelHandlerContext, hVar);
            i3++;
            i2++;
        }
        if (i3 > 0) {
            boolean isWritable = channel.isWritable();
            channelHandlerContext.flush();
            if (i2 <= 0 || this.queuedCounter.addAndGet(-i2) <= 0 || !isWritable) {
                return;
            }
            channel.eventLoop().execute(this);
        }
    }
}
