package com.rabbitmq.client.test.functional;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.test.BrokerTestCase;
import java.io.IOException;

/* loaded from: classes.dex */
public abstract class TTLHandling extends BrokerTestCase {
    protected static final String[] MSG = {"one", "two", "three"};
    protected static final String TTL_EXCHANGE = "ttl.exchange";
    protected static final String TTL_INVALID_QUEUE_NAME = "invalid.queue.ttl";
    protected static final String TTL_QUEUE_NAME = "queue.ttl";

    /* JADX INFO: Access modifiers changed from: protected */
    public void bindQueue() throws IOException {
        this.channel.queueBind(TTL_QUEUE_NAME, TTL_EXCHANGE, TTL_QUEUE_NAME);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.rabbitmq.client.test.BrokerTestCase
    public void createResources() throws IOException {
        this.channel.exchangeDeclare(TTL_EXCHANGE, "direct");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void declareAndBindQueue(Object obj) throws IOException {
        declareQueue(obj);
        bindQueue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AMQP.Queue.DeclareOk declareQueue(Object obj) throws IOException {
        return declareQueue(TTL_QUEUE_NAME, obj);
    }

    protected abstract AMQP.Queue.DeclareOk declareQueue(String str, Object obj) throws IOException;

    protected void expectBodyAndRemainingMessages(String str, int i) throws IOException {
        GetResponse basicGet = this.channel.basicGet(TTL_QUEUE_NAME, false);
        assertNotNull(basicGet);
        assertEquals(str, new String(basicGet.getBody()));
        assertEquals(i, basicGet.getMessageCount());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String get() throws IOException {
        GetResponse basicGet = basicGet(TTL_QUEUE_NAME);
        if (basicGet == null) {
            return null;
        }
        return new String(basicGet.getBody());
    }

    protected void publish(String str) throws IOException {
        basicPublishVolatile(str.getBytes(), TTL_EXCHANGE, TTL_QUEUE_NAME);
    }

    protected void publishAndSync(String str) throws IOException {
        publish(str);
        this.channel.basicQos(0);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.rabbitmq.client.test.BrokerTestCase
    public void releaseResources() throws IOException {
        this.channel.exchangeDelete(TTL_EXCHANGE);
    }

    public void testExpiryWithReQueueAfterConsume() throws Exception {
        declareAndBindQueue(100);
        QueueingConsumer queueingConsumer = new QueueingConsumer(this.channel);
        this.channel.basicConsume(TTL_QUEUE_NAME, queueingConsumer);
        publish(MSG[0]);
        assertNotNull(queueingConsumer.nextDelivery(100L));
        closeChannel();
        Thread.sleep(150L);
        openChannel();
        assertNull("Re-queued message not expired", get());
    }

    public void testExpiryWithRequeue() throws Exception {
        declareAndBindQueue(200);
        publish(MSG[0]);
        Thread.sleep(100L);
        publish(MSG[1]);
        publish(MSG[2]);
        expectBodyAndRemainingMessages(MSG[0], 2);
        expectBodyAndRemainingMessages(MSG[1], 1);
        closeChannel();
        openChannel();
        Thread.sleep(150L);
        expectBodyAndRemainingMessages(MSG[1], 1);
        expectBodyAndRemainingMessages(MSG[2], 0);
    }

    public void testInvalidTypeUsedInTTL() throws Exception {
        try {
            declareAndBindQueue("foobar");
            publishAndSync(MSG[0]);
            fail("Should not be able to set TTL using non-numeric values");
        } catch (IOException e) {
            checkShutdownSignal(406, e);
        }
    }

    public void testMessagesExpireWhenUsingBasicGet() throws Exception {
        declareAndBindQueue(200);
        publish(MSG[0]);
        Thread.sleep(1000L);
        String str = get();
        assertNull("expected message " + str + " to have been removed", str);
    }

    public void testMultipleTTLTypes() throws IOException {
        for (Object obj : new Object[]{200, (short) 200, 200, 200L}) {
            try {
                declareAndBindQueue(obj);
                publishAndSync(MSG[0]);
            } catch (IOException e) {
                fail("Should be able to use " + obj.getClass().getName() + " when setting TTL");
            }
        }
    }

    public void testPublishAndGetWithExpiry() throws Exception {
        declareAndBindQueue(200);
        publish(MSG[0]);
        Thread.sleep(150L);
        publish(MSG[1]);
        Thread.sleep(100L);
        publish(MSG[2]);
        assertEquals(MSG[1], get());
        assertEquals(MSG[2], get());
        assertNull(get());
    }

    public void testTTLAllowZero() throws Exception {
        try {
            declareQueue(0);
            publishAndSync(MSG[0]);
        } catch (IOException e) {
            fail("Should be able to set ttl to zero");
        }
    }

    public void testTTLMustBePositive() throws Exception {
        try {
            declareAndBindQueue(-10);
            publishAndSync(MSG[0]);
            fail("Should not be able to set TTL using negative values");
        } catch (IOException e) {
            checkShutdownSignal(406, e);
        }
    }

    public void testTrailingCharsUsedInTTL() throws Exception {
        try {
            declareAndBindQueue("10000foobar");
            publishAndSync(MSG[0]);
            fail("Should not be able to set TTL using non-numeric values");
        } catch (IOException e) {
            checkShutdownSignal(406, e);
        }
    }

    public void testTransactionalPublishWithGet() throws Exception {
        declareAndBindQueue(100);
        this.channel.txSelect();
        publish(MSG[0]);
        Thread.sleep(150L);
        publish(MSG[1]);
        this.channel.txCommit();
        Thread.sleep(50L);
        assertEquals(MSG[0], get());
        Thread.sleep(80L);
        assertNull(get());
    }

    public void testZeroTTLDelivery() throws Exception {
        declareAndBindQueue(0);
        publish(MSG[0]);
        assertNull(get());
        QueueingConsumer queueingConsumer = new QueueingConsumer(this.channel);
        this.channel.basicConsume(TTL_QUEUE_NAME, queueingConsumer);
        publish(MSG[0]);
        QueueingConsumer.Delivery nextDelivery = queueingConsumer.nextDelivery(100L);
        assertNotNull(nextDelivery);
        this.channel.basicReject(nextDelivery.getEnvelope().getDeliveryTag(), true);
        assertNull(queueingConsumer.nextDelivery(100L));
    }
}
