import { BehaviorSubject, Subject } from 'rxjs';
import { takeUntil, distinctUntilChanged, take } from 'rxjs/operators';

import { SignalingService } from './signaling.service';
import {
  SignalingMessage,
  ConnectionError,
  ClientInfo,
  NetworkInterfaceStats,
  NetworkMessage,
} from './signaling-server-messages';
import { TimeoutError } from '../../../../utils/throw-after';
import { sleep } from '../../../../utils/sleep';
import { Finalizable } from '../../../../utils/finalizable';
import { completeAll } from '../../../../utils/complete-all';
import { resolveWithTimeout } from '../../../../utils/resolve-with-timeout';

const RECONNECT_TIMEOUT_MILLIS = 1500;
const SIGNALLING_CONNECTION_TIMEOUT_MILLIS = 15000;

type SignalingConnectionStatus = 'online' | 'connecting' | 'offline';

export class SignalingConnection extends Finalizable {
  private connectionStatus: SignalingConnectionStatus = 'offline';

  private readonly _signalingMessage$ = new Subject<SignalingMessage>();
  readonly signalingMessage$ = this._signalingMessage$.asObservable();

  private readonly _connected$ = new BehaviorSubject<boolean>(false);
  readonly connected$ = this._connected$
    .asObservable()
    .pipe(distinctUntilChanged());

  private readonly _networkInterfaceStats$ = new Subject<
    NetworkInterfaceStats[]
  >();
  readonly networkInterfaceStats$ = this._networkInterfaceStats$.asObservable();

  private connectedClientDisplayName: string;

  constructor(
    private connectedClientId: string,
    private signalingService: SignalingService,
  ) {
    super();
    this.connectedClientDisplayName = `UNKNOWN (id=${connectedClientId})`;
    this.setupSubscriptions();
  }

  sendSessionDescription(
    sessionDescription: RTCSessionDescription,
    peerConnectionId: number,
    negotiationIndex: number,
    createNewRemotePeerConnection: boolean,
    forceRemoteConnectionSwitch: boolean,
  ) {
    return this.signalingService.sendSignalingMessage({
      connectedClientId: this.connectedClientId,
      sessionDescription,
      peerConnectionId,
      negotiationIndex,
      createNewPeerConnection: createNewRemotePeerConnection,
      forceConnectionSwitch: forceRemoteConnectionSwitch,
    });
  }

  sendIceCandidate(
    iceCandidate: RTCIceCandidate,
    peerConnectionId: number,
    negotiationIndex: number,
  ) {
    return this.signalingService.sendSignalingMessage({
      connectedClientId: this.connectedClientId,
      iceCandidate,
      peerConnectionId,
      negotiationIndex,
    });
  }

  selectNetworkInterface(selectedNetworkInterface: string) {
    return this.signalingService.selectNetworkInterfaceMessage({
      connectedClientId: this.connectedClientId,
      selectedNetworkInterface,
    });
  }

  private setupSubscriptions() {
    this.signalingService.connectedToServer$
      .pipe(takeUntil(this.finalized$))
      .subscribe(async (connected) => {
        if (
          connected &&
          !this.signalingService.isClientConnected(this.connectedClientId)
        ) {
          this.connectionStatus = 'connecting';
          await this.connectWithinTimeout();
        }
      });

    this.signalingService.openedClientConnection$
      .pipe(takeUntil(this.finalized$))
      .subscribe((connectedClientInfo: ClientInfo) => {
        if (connectedClientInfo.id !== this.connectedClientId) {
          return;
        }
        this.connectedClientDisplayName = connectedClientInfo.displayName;
        this._connected$.next(true);
        console.info(`Connected to ${this.connectedClientDisplayName}`);
      });

    this.signalingService.closedClientConnection$
      .pipe(takeUntil(this.finalized$))
      .subscribe(async (connectedClientId: string) => {
        if (connectedClientId !== this.connectedClientId) {
          return;
        }
        this._connected$.next(false);
        console.info(`Disconnected from ${this.connectedClientDisplayName}`);
        await this.reconnect();
      });

    this.signalingService.connectionError$
      .pipe(takeUntil(this.finalized$))
      .subscribe(async (error: ConnectionError) => {
        console.info(
          `'${error.connectedClient.displayName}' connection error: ${error.errorMessage}`,
        );
      });

    this.signalingService.signalingMessage$
      .pipe(takeUntil(this.finalized$))
      .subscribe((message: SignalingMessage) => {
        if (message.connectedClient?.id !== this.connectedClientId) {
          return;
        }
        this._signalingMessage$.next(message);
      });

    this.signalingService.networkMessage$
      .pipe(takeUntil(this.finalized$))
      .subscribe((message: NetworkMessage) => {
        if (
          message.connectedClient?.id !== this.connectedClientId ||
          !message.networkInterfaceStats
        ) {
          return;
        }
        this._networkInterfaceStats$.next(message.networkInterfaceStats);
      });
  }

  async reconnect() {
    try {
      if (this.connectionStatus === 'connecting') {
        return;
      }
      this.connectionStatus = 'connecting';
      this.signalingService.disconnectFromClient(this.connectedClientId);
      await sleep(RECONNECT_TIMEOUT_MILLIS);
      console.info(
        `Trying to reconnect to ${this.connectedClientDisplayName}...`,
      );
      await this.connectWithinTimeout();
    } catch (e) {
      console.error('Failed to reconnect with signaling', e);
      this.connectionStatus = 'offline';
      await sleep(RECONNECT_TIMEOUT_MILLIS);
      await this.reconnect();
    }
  }

  private async connectWithinTimeout() {
    try {
      await resolveWithTimeout(
        this.doConnect(),
        SIGNALLING_CONNECTION_TIMEOUT_MILLIS,
        'signalling connection timeout',
      );
      this.connectionStatus = 'online';
    } catch (e) {
      if (e instanceof TimeoutError) {
        await this.reconnect();
      }
    }
  }

  private async doConnect(): Promise<void> {
    await this.signalingService.connectToClient(this.connectedClientId);
    await this.connected$.pipe(take(1)).toPromise();
  }

  protected async onFinalize(): Promise<void> {
    // Finalizing is irreversible.
    completeAll(
      this._signalingMessage$,
      this._connected$,
      this._networkInterfaceStats$,
    );
  }

  getIceServers(): Promise<RTCIceServer[]> {
    return this.signalingService.getIceServers();
  }
}
