package awe;

import com.uber.streaming.ramen.Msg;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;

/* loaded from: classes2.dex */
public class ad extends q {
    public ad(aoo.a aVar, aah.a aVar2, awg.b bVar, int i2) {
        super(i2, aVar, aVar2, bVar);
    }

    @Override // awe.q
    protected <T> Observable<qt.b<T>> a(final qp.m<T> mVar, final String str) {
        Observable<Msg> filter = this.f14209d.filter(new Predicate<Msg>() { // from class: awe.ad.1
            @Override // io.reactivex.functions.Predicate
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public boolean test(Msg msg) {
                return msg.getType().equals(mVar.getMessageType());
            }
        });
        if (this.f14206a != null) {
            filter = filter.observeOn(Schedulers.a(this.f14206a));
        }
        Observable<qt.b<T>> observable = (Observable<qt.b<T>>) filter.doOnSubscribe(new Consumer<Disposable>() { // from class: awe.ad.4
            @Override // io.reactivex.functions.Consumer
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public void accept(Disposable disposable) throws Exception {
                ad.this.a(mVar.getMessageType(), str);
            }
        }).doOnDispose(new Action() { // from class: awe.ad.3
            @Override // io.reactivex.functions.Action
            public void run() throws Exception {
                ad.this.b(mVar.getMessageType(), str);
            }
        }).flatMap(new Function<Msg, ObservableSource<qt.b<T>>>() { // from class: awe.ad.2
            @Override // io.reactivex.functions.Function
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public ObservableSource<qt.b<T>> apply(Msg msg) {
                ad.this.a(msg, mVar.getMessageType());
                ad.this.f14214i.a(msg, ad.this.b(mVar.getMessageType()));
                return Observable.just(ad.this.a(msg, mVar));
            }
        });
        Msg msg = this.f14207b.get(mVar.getMessageType());
        if (msg == null) {
            return observable;
        }
        a(mVar.getMessageType());
        return Observable.merge(observable, Observable.just(a(msg, mVar)));
    }
}
