import { Observable, Subject } from 'rxjs';
import { filter, take, takeUntil } from 'rxjs/operators';

import { SignalingConnection } from './signaling-connection';
import { SignalingMessage } from './signaling-server-messages';
import { Finalizable } from '../../../../utils/finalizable';
import { WebRtcPeerConnection } from './webrtc-peer-connection';
import { sleep } from '../../../../utils/sleep';
import { completeAll } from '../../../../utils/complete-all';

const RETRY_RECOVER_CONNECTION_DELAY = 1000;

export class WebRtcManager extends Finalizable {
  private readonly webRtcPeerConnection: WebRtcPeerConnection;
  private _sessionDescriptionOffered$ = new Subject();
  private _iceCandidateOffered$ = new Subject();

  // Observables.
  readonly dataChannel$: Observable<RTCDataChannel>;
  readonly videoStream$: Observable<MediaStreamTrack>;
  readonly stats$: Observable<RTCStatsReport>;
  readonly robotAudioTrack$: Observable<MediaStreamTrack>;
  readonly operatorAudioTrack$: Observable<MediaStreamTrack>;

  constructor(private readonly signalingConnection: SignalingConnection) {
    super();
    this.webRtcPeerConnection = new WebRtcPeerConnection(
      this.signalingConnection,
    );

    this.dataChannel$ = this.webRtcPeerConnection.dataChannel$;
    this.videoStream$ = this.webRtcPeerConnection.videoStream$;
    this.stats$ = this.webRtcPeerConnection.stats$;
    this.robotAudioTrack$ = this.webRtcPeerConnection.robotAudioTrack$;
    this.operatorAudioTrack$ = this.webRtcPeerConnection.operatorAudioTrack$;

    this.setupSubscriptions();
  }

  async connect(forceConnectionSwitch = false) {
    await this.waitSignalingConnected();
    const isNewConnection =
      await this.webRtcPeerConnection.initPeerConnection();
    await this.webRtcPeerConnection.startSignaling(
      forceConnectionSwitch,
      isNewConnection,
    );
  }

  close() {
    this.webRtcPeerConnection.close();
  }

  private setupSubscriptions() {
    // This connection becomes useless once the signaling connection is finalized, so finalize this, too.
    this.signalingConnection.finalized$
      .pipe(takeUntil(this.finalized$))
      .subscribe((_) => {
        this.finalize();
      });

    this.signalingConnection.signalingMessage$
      .pipe(takeUntil(this.finalized$))
      .subscribe((message: SignalingMessage) => {
        this.onSignalingMessage(message);
      });
  }

  async waitCompleteSignalingOffer() {
    await Promise.all([
      this._sessionDescriptionOffered$.pipe(take(1)).toPromise(),
      this._iceCandidateOffered$.pipe(take(1)).toPromise(),
    ]);
  }

  private async waitSignalingConnected() {
    await this.signalingConnection.connected$
      .pipe(filter(Boolean), take(1))
      .toPromise();
  }

  private async onSignalingMessage(message: SignalingMessage) {
    try {
      if (message.createNewPeerConnection && message.sessionDescription) {
        if (
          message.sessionDescription.type === 'answer' ||
          message.sessionDescription.type === 'pranswer'
        ) {
          console.log(
            'Reconnect with new peer connection, as requested by remote peer',
          );
          this.close();
          await this.connect();
          return;
        } else {
          console.error(
            'Got request to create new peer connection without appropriate session description',
          );
        }
      }
    } catch (e) {
      console.error('Failed handle to signalling message', e);
    }

    if (!this.webRtcPeerConnection.isPeerConnectionMatch(message)) {
      console.warn(
        'Received signaling message from non-matching peer connection',
      );
      return;
    }

    if (!this.webRtcPeerConnection.isNegotiationIndexMatch(message)) {
      console.log('Dropping signaling message from old negotiation');
      return;
    }
    try {
      if (message.sessionDescription) {
        await this.webRtcPeerConnection.setRemoteSessionDescription(
          message.sessionDescription,
        );
        this._sessionDescriptionOffered$.next(undefined);
      }

      if (message.iceCandidate) {
        await this.webRtcPeerConnection.addIceCandidate(message.iceCandidate);
        this._iceCandidateOffered$.next(undefined);
      }
    } catch (e) {
      console.error('Failed to update peer connection state', e);
      await this.recoverConnection();
    }
  }

  private async recoverConnection() {
    try {
      this.webRtcPeerConnection.close();
      await this.connect();
    } catch (e) {
      console.error('Failed to recover connection state', e);
      await sleep(RETRY_RECOVER_CONNECTION_DELAY);
      await this.recoverConnection();
    }
  }

  protected async onFinalize(): Promise<void> {
    completeAll(this._sessionDescriptionOffered$);
    this.close();
    await this.webRtcPeerConnection.finalize();
  }
}
