package com.xiaomi.micloud.kafka;

import com.xiaomi.micloud.kafka.exception.KafkaException;
import com.xiaomi.micloud.kafka.exception.KafkaInitializeException;
import com.xiaomi.micloud.util.Constants;
import com.xiaomi.micloud.util.FaceCommonBiz;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class Kafka {
    public static Logger LOG = LoggerFactory.getLogger(Kafka.class);
    private int connectionTimeOut;
    private int consumerBufferSize;
    private int consumerMaxRetry;
    private long fetchSize;
    private boolean isSchemaList;
    private long maxFetchCount;
    private Map<String, Integer> seedBrokers;
    private String topic;
    private Map<Integer, PartitionMetadata> partitionToMeta = new HashMap();
    private Map<Integer, SimpleConsumer> partitionToConsumer = new HashMap();
    private Map<Integer, Long> partitionToLatestOffset = new HashMap();

    public Kafka(Map<String, Object> map) {
        this.isSchemaList = false;
        this.connectionTimeOut = FaceCommonBiz.MAX_GALLERY_RESION_TIMEOUT;
        this.consumerBufferSize = 65536;
        this.consumerMaxRetry = 3;
        this.topic = (String) map.get(Constants.KAFKA_CONSUMER_TOPIC);
        makeSureExist(this.topic, Constants.KAFKA_CONSUMER_TOPIC);
        this.isSchemaList = map.containsKey(Constants.KAFKA_SCHEMA_LIST);
        Object obj = map.get(Constants.KAFKA_CONSUMER_FETCH_SIZE);
        this.fetchSize = 10485760L;
        if (obj != null) {
            this.fetchSize = Long.parseLong(obj.toString());
        }
        this.maxFetchCount = 1000L;
        Object obj2 = map.get(Constants.KAFKA_CONSUMER_MAX_FETCH_COUNT);
        if (obj2 != null) {
            this.maxFetchCount = Long.parseLong(obj2.toString());
        }
        this.seedBrokers = new HashMap();
        String str = (String) map.get(Constants.KAFKA_METADATA_BROKER_LIST);
        Object obj3 = map.get(Constants.KAFKA_CONSUMER_BROKER_LIST);
        parseSeedBrokers(obj3 != null ? (String) obj3 : str);
        Object obj4 = map.get(Constants.KAFKA_CONSUMER_CONNECTION_TIMEOUT);
        if (obj4 != null) {
            this.connectionTimeOut = Integer.parseInt(obj4.toString());
        }
        Object obj5 = map.get(Constants.KAFKA_CONSUMER_BUFFER_SIZE);
        if (obj5 != null) {
            this.consumerBufferSize = Integer.parseInt(obj5.toString());
        }
        Object obj6 = map.get(Constants.KAFKA_CONSUMER_MAX_RETRY);
        if (obj6 != null) {
            this.consumerMaxRetry = Integer.parseInt(obj6.toString());
        }
        if (this.fetchSize < 1024) {
            LOG.error("Fetch size is smaller than 1024!");
            this.fetchSize = 10485760L;
        }
        initialize();
    }

    public Kafka(Properties properties) {
        this.isSchemaList = false;
        this.connectionTimeOut = FaceCommonBiz.MAX_GALLERY_RESION_TIMEOUT;
        this.consumerBufferSize = 65536;
        this.consumerMaxRetry = 3;
        this.topic = properties.getProperty(Constants.KAFKA_CONSUMER_TOPIC);
        makeSureExist(this.topic, Constants.KAFKA_CONSUMER_TOPIC);
        this.isSchemaList = properties.contains(Constants.KAFKA_SCHEMA_LIST);
        String property = properties.getProperty(Constants.KAFKA_CONSUMER_FETCH_SIZE);
        this.fetchSize = 10485760L;
        if (property != null) {
            this.fetchSize = Long.parseLong(property);
        }
        this.maxFetchCount = 1000L;
        String property2 = properties.getProperty(Constants.KAFKA_CONSUMER_MAX_FETCH_COUNT);
        if (property2 != null) {
            this.maxFetchCount = Long.parseLong(property2);
        }
        this.seedBrokers = new HashMap();
        parseSeedBrokers(properties.getProperty(Constants.KAFKA_CONSUMER_BROKER_LIST, properties.getProperty(Constants.KAFKA_METADATA_BROKER_LIST)));
        String property3 = properties.getProperty(Constants.KAFKA_CONSUMER_CONNECTION_TIMEOUT);
        if (property3 != null) {
            this.connectionTimeOut = Integer.parseInt(property3);
        }
        if (properties.getProperty(Constants.KAFKA_CONSUMER_BUFFER_SIZE) != null) {
            this.consumerBufferSize = Integer.parseInt(Constants.KAFKA_CONSUMER_BUFFER_SIZE);
        }
        if (properties.getProperty(Constants.KAFKA_CONSUMER_MAX_RETRY) != null) {
            this.consumerMaxRetry = Integer.parseInt(Constants.KAFKA_CONSUMER_MAX_RETRY);
        }
        if (this.fetchSize < 1024) {
            LOG.error("Fetch size is smaller than 1024!");
            this.fetchSize = 10485760L;
        }
        initialize();
    }

    private void initialize() {
        refreshAllPartitionMetadata();
    }

    private void makeSureExist(String str, String str2) {
        if (StringUtils.isBlank(str)) {
            throw new KafkaInitializeException(String.format("Kafka Consumer config doesn't contain [%s] info, which is a MUST!", str2));
        }
    }

    private void parseSeedBrokers(String str) {
        makeSureExist(str, Constants.KAFKA_METADATA_BROKER_LIST);
        for (String str2 : str.split(",")) {
            if (!StringUtils.isBlank(str2)) {
                String[] split = str2.split(":");
                if (split.length != 2) {
                    throw new KafkaInitializeException(String.format("Invalidate schema for Kafka config property [%s]!", str2));
                }
                this.seedBrokers.put(split[0].trim(), Integer.valueOf(Integer.parseInt(split[1])));
            }
        }
    }

    public void close() {
        if (this.partitionToConsumer != null) {
            Iterator<SimpleConsumer> it = this.partitionToConsumer.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
        }
    }

    public SimpleConsumer getConsumer(int i) {
        LOG.debug("getConsumer for partition: " + i);
        if (this.partitionToConsumer.containsKey(Integer.valueOf(i))) {
            return this.partitionToConsumer.get(Integer.valueOf(i));
        }
        PartitionMetadata partitionMetadata = this.partitionToMeta.get(Integer.valueOf(i));
        if (partitionMetadata == null) {
            refreshAllPartitionMetadata();
            partitionMetadata = this.partitionToMeta.get(Integer.valueOf(i));
            if (partitionMetadata == null) {
                throw new RuntimeException(String.format("Can't get PartitionMetadata for Partition: %d, please check if the kafka server is normal!", Integer.valueOf(i)));
            }
        }
        PartitionMetadata partitionMetadata2 = partitionMetadata;
        SimpleConsumer simpleConsumer = new SimpleConsumer(partitionMetadata2.leader().host(), partitionMetadata2.leader().port(), this.connectionTimeOut, this.consumerBufferSize, "fetchClient_" + this.topic + i);
        this.partitionToConsumer.put(Integer.valueOf(i), simpleConsumer);
        return simpleConsumer;
    }

    public long getLatestOffsite(int i) {
        return getOffset(i, OffsetRequest.LatestTime());
    }

    public long getOffset(int i, long j) {
        OffsetResponse offsetsBefore;
        long j2 = -1;
        TopicAndPartition topicAndPartition = new TopicAndPartition(this.topic, i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        LOG.debug("Fetch partition " + i + " Offset for time " + j);
        Exception e = null;
        int i2 = 0;
        while (i2 < this.consumerMaxRetry) {
            SimpleConsumer consumer = getConsumer(i);
            try {
                offsetsBefore = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), consumer.clientId()));
            } catch (Exception e2) {
                e = e2;
                LOG.error("Error fetching Offset from Broker. Error: ", e);
                i2++;
                consumer.close();
                this.partitionToConsumer.remove(Integer.valueOf(i));
                refreshAllPartitionMetadata();
            }
            if (!offsetsBefore.hasError()) {
                j2 = offsetsBefore.offsets(this.topic, i)[0];
                break;
            }
            short errorCode = offsetsBefore.errorCode(this.topic, i);
            LOG.error("Error fetching Offset from Broker. Reason: " + ((int) errorCode));
            i2++;
            e = new Exception("Error fetching Offset from Broker. Reason: " + ((int) errorCode));
            consumer.close();
            this.partitionToConsumer.remove(Integer.valueOf(i));
            refreshAllPartitionMetadata();
        }
        if (i2 == this.consumerMaxRetry) {
            throw new KafkaException(String.format("Get Offset for partition [%d] with timestamp [%d] failed after %d retries!", Integer.valueOf(i), Long.valueOf(j), Integer.valueOf(this.consumerMaxRetry)), e);
        }
        LOG.debug("response offset: " + j2);
        return j2;
    }

    public int getPartitionNum() {
        refreshAllPartitionMetadata();
        return this.partitionToMeta.size();
    }

    public long getStartOffSet(int i) {
        return getOffset(i, OffsetRequest.EarliestTime());
    }

    public long getToFetchCount(int i, long j) {
        if (this.partitionToLatestOffset.containsKey(Integer.valueOf(i)) && this.partitionToLatestOffset.get(Integer.valueOf(i)).longValue() - j >= this.maxFetchCount) {
            return this.maxFetchCount;
        }
        long latestOffsite = getLatestOffsite(i);
        long j2 = latestOffsite - j;
        if (j2 <= this.maxFetchCount) {
            return j2;
        }
        long j3 = this.maxFetchCount;
        this.partitionToLatestOffset.put(Integer.valueOf(i), Long.valueOf(latestOffsite));
        return j3;
    }

    public String getTopic() {
        return this.topic;
    }

    public boolean isSchemaList() {
        return this.isSchemaList;
    }

    /* JADX WARN: Removed duplicated region for block: B:37:0x00fb  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void refreshAllPartitionMetadata() {
        /*
            Method dump skipped, instructions count: 265
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.xiaomi.micloud.kafka.Kafka.refreshAllPartitionMetadata():void");
    }

    public List<Object> takeBatch(int i, long j, long j2) {
        ArrayList arrayList = new ArrayList();
        LOG.info("Read " + j2 + " messages from partition " + i + " starting at offset " + j);
        int i2 = 0;
        int i3 = 0;
        long j3 = j;
        while (i2 < j2 && i3 < this.consumerMaxRetry) {
            SimpleConsumer consumer = getConsumer(i);
            FetchResponse fetch = consumer.fetch(new FetchRequestBuilder().clientId(consumer.clientId()).addFetch(this.topic, i, j3, (int) this.fetchSize).build());
            if (fetch.hasError()) {
                short errorCode = fetch.errorCode(this.topic, i);
                LOG.error("Error fetching data from Broker " + this.partitionToMeta.get(Integer.valueOf(i)).leader().host() + ". Reason: " + ((int) errorCode));
                int i4 = i3 + 1;
                if (i4 == this.consumerMaxRetry && i2 < j2) {
                    LOG.error("takeBatch maximum retry reached");
                    throw new KafkaException(String.format("Fetch data from kafka partition [%d] failed after %d retries, expected: %d, actual fetched: %d. Error Code: %d", Integer.valueOf(i), Integer.valueOf(this.consumerMaxRetry), Long.valueOf(j2), Integer.valueOf(i2), Short.valueOf(errorCode)));
                }
                if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                    consumer.close();
                    this.partitionToConsumer.remove(Integer.valueOf(i));
                    refreshAllPartitionMetadata();
                    i3 = i4;
                } else {
                    i3 = i4;
                }
            } else {
                Iterator it = fetch.messageSet(this.topic, i).iterator();
                int i5 = i2;
                while (it.hasNext()) {
                    MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                    long offset = messageAndOffset.offset();
                    if (offset >= j3) {
                        j3 = messageAndOffset.nextOffset();
                        ByteBuffer payload = messageAndOffset.message().payload();
                        byte[] bArr = new byte[payload.remaining()];
                        payload.get(bArr, 0, bArr.length);
                        if (i5 >= j2) {
                            break;
                        }
                        arrayList.add(bArr);
                        i5++;
                    } else {
                        LOG.error("Found an old offset: " + offset + " Expecting: " + j3);
                    }
                }
                i2 = i5;
            }
        }
        LOG.info("Read " + arrayList.size() + " messages");
        return arrayList;
    }
}
