package com.xiaomi.micloud.kafka;

import com.xiaomi.micloud.kafka.exception.KafkaException;
import com.xiaomi.micloud.kafka.exception.KafkaInitializeException;
import com.xiaomi.micloud.util.Constants;
import com.xiaomi.micloud.util.ZKSerializeHelper;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class KafkaOffsetManager {
    private static Logger logger = LoggerFactory.getLogger(KafkaOffsetManager.class);
    private ZkClient client;
    private Kafka kafka;
    private String zkPathPrefix;
    private int zkOptRetry = 5;
    private Map<Integer, KafkaMeta> partitionMeta = new HashMap();
    private String PATH_SEPERATOR = "/";

    public KafkaOffsetManager(Map map) {
        this.zkPathPrefix = "";
        this.kafka = new Kafka((Map<String, Object>) map);
        if (map.containsKey(Constants.KAFKA_OFFSET_MANAGER_PREFIX_PATH)) {
            this.zkPathPrefix = (String) map.get(Constants.KAFKA_OFFSET_MANAGER_PREFIX_PATH);
            if (this.zkPathPrefix.endsWith(this.PATH_SEPERATOR)) {
                this.zkPathPrefix = this.zkPathPrefix.substring(0, this.zkPathPrefix.length() - 1);
            }
        }
        Object obj = map.get(Constants.KAFKA_OFFSET_MANAGER_ZOOKEEPER_SERVERS);
        if (obj == null) {
            throw new KafkaInitializeException(String.format("KafkaOffsetManager config doesn't contain [%s] info, which is a MUST!", Constants.KAFKA_OFFSET_MANAGER_ZOOKEEPER_SERVERS));
        }
        this.client = getClient((String) obj);
    }

    private ZkClient getClient(String str) {
        return new ZkClient(str, 30000, 30000, new BytesPushThroughSerializer());
    }

    private Properties getProperties(String str) {
        byte[] bArr = (byte[]) this.client.readData(str);
        if (bArr == null) {
            return null;
        }
        return (Properties) ZKSerializeHelper.find(Properties.class).deserialize(bArr);
    }

    private String getRealPath(String str) {
        if (StringUtils.isBlank(str)) {
            return this.zkPathPrefix;
        }
        if (!StringUtils.isEmpty(this.zkPathPrefix)) {
            str = str.startsWith(this.PATH_SEPERATOR) ? this.zkPathPrefix + str : this.zkPathPrefix + this.PATH_SEPERATOR + str;
        }
        return !str.equals("/") ? StringUtils.removeEnd(str, this.PATH_SEPERATOR) : str;
    }

    private boolean updatePersistent(String str, Properties properties) {
        try {
            this.client.updatePersistent(str, ZKSerializeHelper.find(Properties.class).serialize(properties));
            return true;
        } catch (Exception e) {
            logger.error("updatePersistent {} failed.", str, e);
            return false;
        }
    }

    public void clearAll() {
        int partitionNum = this.kafka.getPartitionNum();
        for (int i = 0; i < partitionNum; i++) {
            clearLastCommitted(i);
        }
    }

    public void clearLastCommitted(int i) {
        String realPath = getRealPath(String.valueOf(i));
        if (this.client.exists(realPath)) {
            this.client.delete(realPath);
        }
    }

    public void close() {
        if (this.client != null) {
            this.client.close();
        }
    }

    public void commit(KafkaMeta kafkaMeta) {
        String realPath = getRealPath(String.valueOf(kafkaMeta.partition));
        int i = 0;
        boolean z = false;
        while (true) {
            if (i >= this.zkOptRetry) {
                break;
            }
            if (this.client.exists(realPath)) {
                z = updatePersistent(realPath, kafkaMeta.toProperties());
            } else {
                this.client.createPersistent(realPath, true);
                z = updatePersistent(realPath, kafkaMeta.toProperties());
            }
            if (z) {
                this.partitionMeta.put(Integer.valueOf(kafkaMeta.partition), kafkaMeta);
                break;
            }
            i++;
        }
        if (!z) {
            throw new KafkaException(String.format("Save kafka metadata [%s] to Zookeeper failed after %d retries!", kafkaMeta, Integer.valueOf(this.zkOptRetry)));
        }
    }

    public Map<Integer, KafkaMeta> getAllCommited() {
        int partitionNum = this.kafka.getPartitionNum();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < partitionNum; i++) {
            hashMap.put(Integer.valueOf(i), getLastCommited(i));
        }
        return hashMap;
    }

    public KafkaDataAndMetaContainer getData(int i) {
        boolean z;
        KafkaMeta kafkaMeta;
        List<Object> takeBatch;
        Properties properties;
        KafkaMeta kafkaMeta2 = this.partitionMeta.get(Integer.valueOf(i));
        if (kafkaMeta2 == null) {
            String realPath = getRealPath(String.valueOf(i));
            if (this.client.exists(realPath) && (properties = getProperties(realPath)) != null) {
                kafkaMeta2 = new KafkaMeta(properties);
                this.partitionMeta.put(Integer.valueOf(i), kafkaMeta2);
            }
        }
        long startOffSet = this.kafka.getStartOffSet(i);
        if (kafkaMeta2 != null) {
            long j = kafkaMeta2.beginOffset + kafkaMeta2.toTake;
            if (j >= startOffSet) {
                startOffSet = kafkaMeta2.beginOffset + kafkaMeta2.toTake;
                z = false;
            } else {
                logger.error("Data expired between {} to {}, reset the start index to {}", new Object[]{Long.valueOf(j), Long.valueOf(startOffSet), Long.valueOf(startOffSet)});
                z = true;
            }
        } else {
            z = false;
        }
        long toFetchCount = this.kafka.getToFetchCount(i, startOffSet);
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("[%d] start: %d, num: %d", Integer.valueOf(i), Long.valueOf(startOffSet), Long.valueOf(toFetchCount)));
        }
        KafkaMeta kafkaMeta3 = new KafkaMeta(i, startOffSet, toFetchCount);
        if (toFetchCount == 0) {
            if (!this.partitionMeta.containsKey(Integer.valueOf(i)) || z) {
                KafkaMeta kafkaMeta4 = new KafkaMeta(i, startOffSet, toFetchCount);
                this.partitionMeta.put(Integer.valueOf(i), kafkaMeta4);
                commit(kafkaMeta4);
                kafkaMeta = kafkaMeta4;
                takeBatch = null;
            } else {
                kafkaMeta = kafkaMeta3;
                takeBatch = null;
            }
        } else if (toFetchCount < 0) {
            logger.error("Data in Partition: " + i + " has been reset!");
            KafkaMeta kafkaMeta5 = new KafkaMeta(i, startOffSet + toFetchCount, 0L);
            commit(kafkaMeta5);
            kafkaMeta = kafkaMeta5;
            takeBatch = null;
        } else {
            kafkaMeta = kafkaMeta3;
            takeBatch = this.kafka.takeBatch(i, startOffSet, toFetchCount);
        }
        return new KafkaDataAndMetaContainer(takeBatch, kafkaMeta);
    }

    public Map<Integer, KafkaDataAndMetaContainer> getData(Set<Integer> set) {
        HashMap hashMap = new HashMap();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            hashMap.put(Integer.valueOf(intValue), getData(intValue));
        }
        return hashMap;
    }

    public KafkaMeta getLastCommited(int i) {
        Properties properties;
        String realPath = getRealPath(String.valueOf(i));
        if (!this.client.exists(realPath) || (properties = getProperties(realPath)) == null) {
            return null;
        }
        return new KafkaMeta(properties);
    }

    public long getLastestOffset(int i) {
        return this.kafka.getLatestOffsite(i);
    }

    public long getStartOffset(int i) {
        return this.kafka.getStartOffSet(i);
    }
}
