package com.nfl.mobile.service.websocket;

import com.nfl.mobile.rx.Errors;
import com.nfl.mobile.service.GameStreamServiceBaseImpl;
import com.nfl.mobile.service.shieldapi.ShieldApiService;
import com.nfl.mobile.shieldmodels.game.DrivesContentPreparingMap;
import com.nfl.mobile.shieldmodels.game.Game;
import com.nfl.mobile.shieldmodels.game.NatsPayloadMessagesToGameMaps;
import com.nfl.mobile.shieldmodels.pagers.DrivePager;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscription;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import timber.log.Timber;

/* loaded from: classes2.dex */
/**
 * Game stream service that receives game updates from a {@link NatsMessageService} and reloads
 * games flagged as stale through the {@link ShieldApiService}.
 *
 * <p>Incoming messages are buffered into time windows, converted to {@code Game} lists and then
 * either applied to the cache directly or, for games flagged as stale, queued as diffs until a
 * full reload of that game has completed.
 *
 * <p>The cache helpers used here ({@code isInCache}, {@code replaceGameInCache},
 * {@code refreshGameInCache}, {@code exposeGamesCache}, {@code getStreamedGameIds}) are not defined
 * in this file; presumably they are inherited from {@link GameStreamServiceBaseImpl}.
 *
 * <p>The {@code lambda$...} methods and the {@code GameStreamServiceNatsImpl$$Lambda$N} references
 * are compiler-generated lambda plumbing; their names and signatures are kept unchanged because
 * the generated classes live outside this file.
 */
public class GameStreamServiceNatsImpl extends GameStreamServiceBaseImpl {
    /** Length, in seconds, of the window used to batch incoming NATS messages. */
    private static final long NATS_BUFFER_SECONDS = 5L;

    private final NatsMessageService natsMessageService;
    private Subscription natsMessageSubscription;
    private final ShieldApiService shieldApiService;
    DrivesContentPreparingMap drivesContentPreparingMap = new DrivesContentPreparingMap();
    /** Diffs received for a stale game while its full reload is pending, keyed by game id. */
    private final ConcurrentHashMap<String, Deque<Game>> cachedGameDiffs = new ConcurrentHashMap<>();
    /** Reload subscriptions keyed by the id of the game being reloaded. */
    private final ConcurrentHashMap<String, Subscription> cacheGameSubscriptions = new ConcurrentHashMap<>();
    /** Ids of games flagged as stale; only touched from {@code synchronized} methods. */
    private final HashSet<String> staleGameIds = new HashSet<>();

    public GameStreamServiceNatsImpl(NatsMessageService natsMessageService, ShieldApiService shieldApiService) {
        this.shieldApiService = shieldApiService;
        this.natsMessageService = natsMessageService;
    }

    /**
     * Stores a freshly reloaded game: drains the diffs queued for it in arrival order, merges each
     * into {@code game}, replaces the cached entry and clears the game's stale flag.
     *
     * @param game the fully loaded game to put into the cache
     */
    private synchronized void cacheGame(Game game) {
        Deque<Game> pendingDiffs = this.cachedGameDiffs.get(game.id);
        if (pendingDiffs != null) {
            Game diff;
            while ((diff = pendingDiffs.poll()) != null) {
                // Queued diffs were stored as received, so their drives still need preparing.
                if (diff.drives != null && diff.drives.data != null) {
                    diff.drives = this.drivesContentPreparingMap.call(diff.drives);
                }
                game.merge(diff);
            }
        }
        replaceGameInCache(game);
        this.cachedGameDiffs.remove(game.id);
        this.staleGameIds.remove(game.id);
    }

    /**
     * Appends a diff to the queue of pending diffs for its game, creating the queue on first use.
     *
     * @param game the diff to queue
     */
    private synchronized void cacheGameDiff(Game game) {
        Deque<Game> pendingDiffs = this.cachedGameDiffs.get(game.id);
        if (pendingDiffs == null) {
            pendingDiffs = new LinkedList<>();
            this.cachedGameDiffs.put(game.id, pendingDiffs);
        }
        pendingDiffs.add(game);
    }

    /**
     * Tells whether the given game is cached and currently flagged as stale.
     *
     * <p>The previous body returned a local that was never assigned when the game was not cached,
     * which does not compile. A game that is not in the cache is reported as not stale here.
     * NOTE(review): this reading is inferred from the shape of the decompiled code (a
     * short-circuit {@code &&}); confirm against the original build if available.
     *
     * @param game the game to check
     * @return {@code true} only if the game is in the cache and its id is in the stale set
     */
    private synchronized boolean isStale(Game game) {
        return isInCache(game.id) && this.staleGameIds.contains(game.id);
    }

    /** Generated lambda body: returns whether the given list has at least one element. */
    public static /* synthetic */ Boolean lambda$connect$721(List list) {
        return Boolean.valueOf(!list.isEmpty());
    }

    /** Generated lambda body: attaches the drives to the game and returns the game. */
    public static /* synthetic */ Game lambda$onGamesReceived$722(Game game, DrivePager drivePager) {
        game.drives = drivePager;
        return game;
    }

    /**
     * Generated lambda body for a successful reload: caches the reloaded game, releases the reload
     * subscription registered under the requested game's id and publishes the cache.
     *
     * @param game the game whose reload was requested
     * @param game2 the reloaded game
     */
    public /* synthetic */ void lambda$onGamesReceived$723(Game game, Game game2) {
        cacheGame(game2);
        Subscription reload = this.cacheGameSubscriptions.get(game.id);
        if (reload != null && !reload.isUnsubscribed()) {
            reload.unsubscribe();
            this.cacheGameSubscriptions.remove(game.id);
        }
        exposeGamesCache();
    }

    /** Generated lambda body for a failed reload: flags the requested game as stale. */
    public /* synthetic */ void lambda$onGamesReceived$724(Game game, Throwable th) {
        staleGame(game.id);
    }

    /** Replaces the stale set with the ids returned by {@code getStreamedGameIds()}. */
    private synchronized void staleCache() {
        this.staleGameIds.clear();
        this.staleGameIds.addAll(getStreamedGameIds());
        Timber.w("mark all games in cache as stale! games: %d", Integer.valueOf(this.staleGameIds.size()));
    }

    /**
     * Flags a single game as stale.
     *
     * @param str id of the game to flag
     */
    private synchronized void staleGame(String str) {
        this.staleGameIds.add(str);
        Timber.w("mark game %s in cache as stale!", str);
    }

    /**
     * Subscribes to the buffered game stream, skipping empty batches and delivering the rest on
     * the IO scheduler; stream errors go to {@code Errors.log()}.
     */
    @Override // com.nfl.mobile.service.GameStreamService
    public void connect() {
        Func1<? super List<Game>, Boolean> nonEmptyBatch = GameStreamServiceNatsImpl$$Lambda$1.instance;
        this.natsMessageSubscription = getGamesObservable()
                .filter(nonEmptyBatch)
                .observeOn(Schedulers.io())
                .subscribe(GameStreamServiceNatsImpl$$Lambda$2.lambdaFactory$(this), Errors.log());
    }

    /**
     * Unsubscribes from the game stream if a live subscription exists, then flags the streamed
     * games as stale.
     */
    @Override // com.nfl.mobile.service.GameStreamService
    public void disconnect() {
        if (this.natsMessageSubscription != null && !this.natsMessageSubscription.isUnsubscribed()) {
            this.natsMessageSubscription.unsubscribe();
            this.natsMessageSubscription = null;
        }
        staleCache();
    }

    /**
     * Builds the stream of game batches: NATS messages are collected into fixed time windows and
     * each window is mapped through {@link NatsPayloadMessagesToGameMaps}.
     *
     * @return an observable emitting one list of games per buffer window
     */
    public Observable<List<Game>> getGamesObservable() {
        return this.natsMessageService.getNatsMessagesObservable()
                .buffer(NATS_BUFFER_SECONDS, TimeUnit.SECONDS)
                .map(new NatsPayloadMessagesToGameMaps());
    }

    /**
     * Applies a batch of received games. A stale game is queued as a diff and a reload is started
     * unless one is already running; any other game has its drives prepared and is refreshed in
     * the cache. The cache is published once after the whole batch.
     *
     * @param list the received games
     */
    public void onGamesReceived(List<Game> list) {
        for (Game game : list) {
            if (isStale(game)) {
                Timber.w("Game %s is stale! reloading from shield api!", game.id);
                cacheGameDiff(game);
                Subscription reload = this.cacheGameSubscriptions.get(game.id);
                if (reload == null || reload.isUnsubscribed()) {
                    this.cacheGameSubscriptions.put(game.id, reloadFromShieldApi(game));
                }
            } else {
                if (game.drives != null && game.drives.data != null) {
                    game.drives = this.drivesContentPreparingMap.call(game.drives);
                }
                refreshGameInCache(game);
            }
        }
        exposeGamesCache();
    }

    /**
     * Starts a reload of the given game: the game and its prepared drives are requested from the
     * shield API, combined, and handed to the generated success/error callbacks on the IO scheduler.
     *
     * @param game the stale game to reload
     * @return the subscription of the started reload
     */
    private Subscription reloadFromShieldApi(Game game) {
        // Kept as a raw type: the generic signature of the generated lambda class is not visible here.
        Func2 attachDrives = GameStreamServiceNatsImpl$$Lambda$3.instance;
        // The mapped drives observable is passed inline so that no element type has to be spelled
        // out for it (the previous declaration used an undeclared type variable).
        return Observable.zip(
                        this.shieldApiService.getGame(game.id),
                        this.shieldApiService.getDrives(game.id).map(new DrivesContentPreparingMap()),
                        attachDrives)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(
                        GameStreamServiceNatsImpl$$Lambda$4.lambdaFactory$(this, game),
                        GameStreamServiceNatsImpl$$Lambda$5.lambdaFactory$(this, game));
    }
}
