package cz.seznam.mapy.logger;

import cz.seznam.kommons.rx.IRxSchedulers;
import cz.seznam.mapy.logger.writer.ILogWriter;
import cz.seznam.mapy.utils.Log;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;

/* compiled from: AbstractLogger.kt */
/* loaded from: classes.dex */
public abstract class AbstractLogger implements ILogger {
    public static final Companion Companion = new Companion(null);
    public static final String TAG = "AbstractLogger";
    private Disposable disposable;
    private final LinkedBlockingQueue<String> messageQueue;
    private final IRxSchedulers schedulers;
    private final FlowableOnSubscribe<String> source;

    /* compiled from: AbstractLogger.kt */
    /* loaded from: classes.dex */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public AbstractLogger(IRxSchedulers schedulers) {
        Intrinsics.checkNotNullParameter(schedulers, "schedulers");
        this.schedulers = schedulers;
        this.messageQueue = new LinkedBlockingQueue<>();
        this.source = new FlowableOnSubscribe<String>() { // from class: cz.seznam.mapy.logger.AbstractLogger$source$1
            @Override // io.reactivex.FlowableOnSubscribe
            public final void subscribe(FlowableEmitter<String> it) {
                LinkedBlockingQueue linkedBlockingQueue;
                Intrinsics.checkNotNullParameter(it, "it");
                while (true) {
                    linkedBlockingQueue = AbstractLogger.this.messageQueue;
                    String str = (String) linkedBlockingQueue.poll(1L, TimeUnit.SECONDS);
                    if (str == null) {
                        it.onComplete();
                        return;
                    }
                    it.onNext(str);
                }
            }
        };
    }

    private final void processQueue() {
        Disposable disposable = this.disposable;
        if (disposable == null || disposable.isDisposed()) {
            this.disposable = Flowable.create(this.source, BackpressureStrategy.BUFFER).doOnNext(new Consumer<String>() { // from class: cz.seznam.mapy.logger.AbstractLogger$processQueue$1
                @Override // io.reactivex.functions.Consumer
                public final void accept(String it) {
                    ILogWriter logWriter = AbstractLogger.this.getLogWriter();
                    Intrinsics.checkNotNullExpressionValue(it, "it");
                    logWriter.writeLine(it);
                }
            }).doFinally(new Action() { // from class: cz.seznam.mapy.logger.AbstractLogger$processQueue$2
                @Override // io.reactivex.functions.Action
                public final void run() {
                    AbstractLogger.this.disposable = null;
                    AbstractLogger.this.getLogWriter().close();
                }
            }).subscribeOn(this.schedulers.io()).subscribe(new Consumer<String>() { // from class: cz.seznam.mapy.logger.AbstractLogger$processQueue$3
                @Override // io.reactivex.functions.Consumer
                public final void accept(String str) {
                }
            }, new Consumer<Throwable>() { // from class: cz.seznam.mapy.logger.AbstractLogger$processQueue$4
                @Override // io.reactivex.functions.Consumer
                public final void accept(Throwable th) {
                    Log.e(AbstractLogger.TAG, "Queue processing error.", th);
                }
            }, new Action() { // from class: cz.seznam.mapy.logger.AbstractLogger$processQueue$5
                @Override // io.reactivex.functions.Action
                public final void run() {
                    Log.d(AbstractLogger.TAG, "Queue processing complete.");
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract ILogWriter getLogWriter();

    protected abstract Function1<String, String> getMessageDecorator();

    @Override // cz.seznam.mapy.logger.ILogger
    public void log(String message) {
        String invoke;
        Intrinsics.checkNotNullParameter(message, "message");
        LinkedBlockingQueue<String> linkedBlockingQueue = this.messageQueue;
        Function1<String, String> messageDecorator = getMessageDecorator();
        if (messageDecorator != null && (invoke = messageDecorator.invoke(message)) != null) {
            message = invoke;
        }
        linkedBlockingQueue.add(message);
        processQueue();
    }

    protected abstract void setMessageDecorator(Function1<? super String, String> function1);
}
