import { identity } from 'ramda';
import { BehaviorSubject, Observable } from 'rxjs';
import { filter, map, skip, takeUntil } from 'rxjs/operators';
import { millisBetween } from '../../../../utils/millis-between';
import { visiblePageTimer } from '../../../../utils/page-visibility';
import { RobotSessionId } from '../../user-session/user-events.service';

/**
 * Callback invoked by {@link LatencyWatchDog} each time the watchdog trips.
 *
 * @param triggersInRowCount - Zero-based count of consecutive triggers as
 *   reported by the watchdog's trigger counter: `0` for a first trigger (or
 *   one arriving after the cool-down elapsed), incremented by one for every
 *   further trigger that follows the previous one within the cool-down.
 * @returns A promise the watchdog awaits; a rejection is caught and logged by
 *   the watchdog rather than propagated.
 */
export type WatchdogTriggerCallback = (
  triggersInRowCount: number,
) => Promise<void>;

/**
 * Tracks how many triggers have happened back-to-back.
 *
 * Two triggers count as "in a row" when the time between them is strictly
 * shorter than the configured cool-down duration.
 */
class SequentialTriggerCounter {
  private lastTriggerTime?: Date;
  private triggersInRowCount = 0;

  /**
   * @param coolDownDurationMillis - Maximum gap (exclusive), in milliseconds,
   *   between two triggers for the second one to extend the current streak.
   */
  constructor(private readonly coolDownDurationMillis: number) {}

  /**
   * Registers a trigger happening now and returns the updated streak length.
   *
   * @returns `0` for the first trigger or when the cool-down has elapsed since
   *   the previous trigger; otherwise the previous count plus one.
   */
  updateTriggerCounter(): number {
    const triggeredAt = new Date();
    const previousTrigger = this.lastTriggerTime;

    const isWithinCoolDown =
      previousTrigger !== undefined &&
      millisBetween(previousTrigger, triggeredAt) <
        this.coolDownDurationMillis;

    // Extend the streak while triggers keep arriving inside the cool-down
    // window; any longer pause starts counting from zero again.
    this.triggersInRowCount = isWithinCoolDown
      ? this.triggersInRowCount + 1
      : 0;
    this.lastTriggerTime = triggeredAt;

    return this.triggersInRowCount;
  }
}

/**
 * Watches a stream of latency measurements and invokes a callback when the
 * connection looks unhealthy.
 *
 * The watchdog trips on either of two conditions:
 *  - a latency sample greater than or equal to `maxLatencyMs`;
 *  - no below-threshold sample for at least `latencyGapTimeoutMs`.
 *
 * Tripping stops the watchdog before the callback runs. It does not restart
 * on its own; `start` has to be called again.
 */
export class LatencyWatchDog {
  // Time of the last below-threshold latency sample; undefined while stopped.
  private lastLatencyCheckInTime?: Date;
  private readonly isStopped$ = new BehaviorSubject(false);

  private readonly sequentialTriggerCounter: SequentialTriggerCounter;

  /**
   * @param robotSession - Session whose `robotId` is attached to the
   *   high-latency warning log.
   * @param latency$ - Stream of latency samples, compared against
   *   `maxLatencyMs`.
   * @param maxLatencyMs - A sample at or above this value trips the watchdog.
   * @param latencyGapTimeoutMs - Longest tolerated time without a
   *   below-threshold sample; also used as the period of the gap check.
   * @param watchdogTriggerCallback - Invoked when the watchdog trips, with
   *   the current consecutive-trigger count.
   * @param coolDownDurationMillis - Cool-down handed to the consecutive
   *   trigger counter.
   */
  constructor(
    private readonly robotSession: RobotSessionId,
    private readonly latency$: Observable<number>,
    private readonly maxLatencyMs: number,
    private readonly latencyGapTimeoutMs: number,
    private readonly watchdogTriggerCallback: WatchdogTriggerCallback,
    private readonly coolDownDurationMillis: number,
  ) {
    // Created here instead of in a field initializer: with standard class
    // field semantics (`useDefineForClassFields`), field initializers run
    // before parameter properties are assigned, so the counter would be
    // handed `undefined` as its cool-down.
    this.sequentialTriggerCounter = new SequentialTriggerCounter(
      coolDownDurationMillis,
    );
  }

  /**
   * (Re)starts the watchdog. Any monitoring from a previous `start` call is
   * stopped first.
   *
   * @param startDelayMillis - Grace period during which latency samples are
   *   ignored; it is also passed as the first argument of the gap-check timer.
   */
  start(startDelayMillis: number): void {
    // Terminates the subscriptions of a previous run, if there is one.
    this.isStopped$.next(true);

    const startAfterMillis = Date.now() + startDelayMillis;
    // A BehaviorSubject replays its current value to every new subscriber;
    // skip(1) drops that replay so only later `true` emissions act as a stop
    // signal.
    const stop$ = this.isStopped$.pipe(skip(1), filter(identity));

    this.latency$
      .pipe(
        takeUntil(stop$),
        // Ignore samples that arrive during the start-up grace period.
        filter(() => Date.now() >= startAfterMillis),
      )
      .subscribe(async (latency) => {
        if (latency >= this.maxLatencyMs) {
          console.warn(
            `Latency of ${latency}ms exceeds max allowed of ${this.maxLatencyMs} => reconnecting`,
            this.robotSession.robotId,
          );
          await this.triggerWatchdog();
        } else {
          this.lastLatencyCheckInTime = new Date();
        }
      });

    // Measure the first gap from the end of the grace period, not from now.
    this.lastLatencyCheckInTime = new Date(startAfterMillis);

    // NOTE(review): visiblePageTimer is assumed to behave like a timer
    // (initial delay, period) that only ticks while the page is visible —
    // confirm against its implementation.
    visiblePageTimer(startDelayMillis, this.latencyGapTimeoutMs)
      .pipe(
        takeUntil(stop$),
        map(() => {
          if (this.lastLatencyCheckInTime === undefined) {
            // It should not happen:
            // lastLatencyCheckInTime should only be undefined for a stopped
            // watchdog. Falls back to 0, since 0 is smaller than
            // latencyGapTimeoutMs.
            return 0;
          }
          const now = new Date();
          return millisBetween(this.lastLatencyCheckInTime, now);
        }),
        filter((ping) => {
          return ping >= this.latencyGapTimeoutMs;
        }),
      )
      .subscribe(async (ping) => {
        console.warn(
          `No ping for ${ping}ms exceeds max allowed of ${this.latencyGapTimeoutMs} => reconnecting`,
        );
        await this.triggerWatchdog();
      });

    this.isStopped$.next(false);
  }

  /**
   * Stops the watchdog and runs the trigger callback, unless the watchdog is
   * already stopped. Errors thrown or rejected by the callback are logged and
   * swallowed.
   */
  private async triggerWatchdog(): Promise<void> {
    if (this.isStopped$.getValue()) {
      return;
    }

    // Stop first so further samples or timer ticks cannot trigger again
    // while the callback is still running.
    this.stop();
    try {
      const triggersInRowCount =
        this.sequentialTriggerCounter.updateTriggerCounter();
      await this.watchdogTriggerCallback(triggersInRowCount);
    } catch (e) {
      console.error('Latency watch dog callback failed', e);
    }
  }

  /** Stops monitoring and clears the last check-in time. */
  stop(): void {
    this.isStopped$.next(true);
    this.lastLatencyCheckInTime = undefined;
  }
}
