import {
  BehaviorSubject,
  Observable,
  Subject,
  distinct,
  EMPTY,
  takeUntil,
} from 'rxjs';
import { User } from '../user';
import gitStatus from '../../../git-version.json';
/**
 * Shape naming the two payload directions of a WebSocket exchange:
 * `sendData` and `receiveData`. Both are `unknown` here.
 *
 * NOTE(review): nothing else visible in this file references this type;
 * presumably other modules narrow or consume it. Confirm against callers.
 */
export type WebSocketData = {
  sendData: unknown;
  receiveData: unknown;
};

// Sent as the `version` field of the handshake message that
// WebSocketConnection.start() writes once the socket opens.
const CONNECTION_VERSION = 2;

// readyState values for which WebSocketConnection.start() treats an existing
// socket as unusable and opens a fresh one instead of returning early.
const WEBSOCKET_OFF_STATES: number[] = [WebSocket.CLOSED, WebSocket.CLOSING];

/**
 * Thin wrapper around a browser `WebSocket` that exposes its events as RxJS
 * streams and queues outgoing messages while the socket is not open.
 *
 * Lifecycle as implemented here:
 * - `start()` opens the socket and, once it is open, sends a handshake message
 *   (access token, session id, protocol version, user id, commit hash).
 * - The connection is flagged as connected when the first message is received;
 *   at that point any queued outgoing messages are flushed.
 * - `close()` is the intentional shutdown path. It clears the connected flag
 *   before closing, so the resulting close event is not reported on
 *   `unexpectedClose$`.
 * - An unexpected close does NOT reset the connected flag; only `close()` does.
 *
 * @typeParam REQ - Type of the parsed messages emitted on `message$`
 *   (no runtime validation is performed on the parsed JSON).
 * @typeParam RES - Type of the messages accepted by `send()`.
 */
export class WebSocketConnection<REQ = unknown, RES = unknown> {
  /** Outgoing messages accepted by `send()` while the socket was not OPEN. */
  private undeliveredMessages: RES[] = [];

  private readonly _message$ = new Subject<REQ>();
  /** Every incoming frame, parsed with `JSON.parse`. */
  readonly message$ = this._message$.asObservable();

  private readonly _unexpectedClose$ = new Subject();
  /** Emits when the current socket closes while flagged as connected. */
  readonly unexpectedClose$ = this._unexpectedClose$.asObservable();

  private readonly _error$ = new Subject();
  /**
   * Socket error events raised while flagged as connected. Each subscription
   * ends at the next `unexpectedClose$` emission.
   */
  readonly error$ = this._error$.pipe(takeUntil(this._unexpectedClose$));

  private readonly _isConnected = new BehaviorSubject(false);
  /**
   * Current connection flag followed by every subsequent change.
   *
   * Duplicate values are filtered at the source (see `setConnected`). The
   * previous `distinct()` pipe remembered every value it had ever emitted, so
   * after the first `false -> true` transition subscribers never saw the
   * connection drop or come back.
   */
  readonly isConnected$ = this._isConnected.asObservable();

  private webSocket: WebSocket | null = null;

  /**
   * @param url - WebSocket endpoint passed to the `WebSocket` constructor.
   * @param accessToken - Resolves to the token sent in the handshake message.
   * @param user - Resolves to the user whose `uid` is sent in the handshake.
   * @param sessionId - Session identifier sent in the handshake message.
   */
  constructor(
    private url: string,
    private accessToken: Promise<string>,
    private user: Promise<User>,
    readonly sessionId: string,
  ) {}

  /**
   * Opens a new socket unless one already exists that is neither closed nor
   * closing.
   *
   * @returns `EMPTY` when a usable socket already exists; otherwise an
   *   observable that emits once and completes when the first message is
   *   received on the new socket.
   */
  start(): Observable<void> {
    if (
      this.webSocket !== null &&
      !WEBSOCKET_OFF_STATES.includes(this.webSocket.readyState)
    ) {
      return EMPTY;
    }

    const socket = new WebSocket(this.url);
    this.webSocket = socket;

    // Handlers stay attached to `socket` after close()/start() replace
    // `this.webSocket`. Events from a superseded socket must not touch the
    // shared state that now belongs to its successor.
    const isCurrent = (): boolean => this.webSocket === socket;

    socket.onopen = async () => {
      try {
        const user = await this.user;
        const accessToken = await this.accessToken;

        // The awaits yield: by now the socket may have been closed or replaced.
        // Sending through `this.webSocket` could hit a newer socket that is
        // still CONNECTING, which throws.
        if (!isCurrent() || socket.readyState !== WebSocket.OPEN) {
          return;
        }

        socket.send(
          JSON.stringify({
            accessToken,
            sessionId: this.sessionId,
            version: CONNECTION_VERSION,
            userId: user.uid,
            commitHash: gitStatus.revision,
          }),
        );
      } catch (e: unknown) {
        // Nothing awaits this handler, so a rejected user/token promise would
        // otherwise surface only as an unhandled rejection.
        console.error('WebSocket handshake failed', e);
      }
    };

    socket.onerror = (e) => {
      if (isCurrent() && this._isConnected.value) {
        console.error('WebSocket error', e);
        this._error$.next(e);
      }
    };

    socket.onclose = () => {
      // close() clears the flag first, so only closes that were not requested
      // through close() are reported.
      if (isCurrent() && this._isConnected.value) {
        this._unexpectedClose$.next(undefined);
      }
    };

    const onOpen = new Subject<void>();
    // Tracked locally instead of reading `onOpen.closed`.
    // NOTE(review): in RxJS 7 `Subject.closed` appears to be set only by
    // unsubscribe(), not by complete() - confirm against the installed version.
    let firstMessageSeen = false;

    socket.onmessage = (message) => {
      if (!isCurrent()) {
        return;
      }

      this.setConnected(true);

      if (!firstMessageSeen) {
        firstMessageSeen = true;
        onOpen.next();
        onOpen.complete();

        this.sendUndeliveredMessages();
      }

      const jsonMessage = JSON.parse(message.data);
      this._message$.next(jsonMessage);
    };

    return onOpen.asObservable();
  }

  /** Updates the connection flag, notifying subscribers only on a change. */
  private setConnected(connected: boolean): void {
    if (this._isConnected.value !== connected) {
      this._isConnected.next(connected);
    }
  }

  /**
   * Re-sends everything queued by `send()` while the socket was not open.
   *
   * The queue is detached before iterating: `send()` pushes back onto
   * `undeliveredMessages` when the socket is not OPEN, which would otherwise
   * grow the array being iterated (never terminating) and then be discarded
   * by the reset.
   */
  private sendUndeliveredMessages(): void {
    const pending = this.undeliveredMessages;
    this.undeliveredMessages = [];

    for (const message of pending) {
      this.send(message);
    }
  }

  /**
   * Intentionally shuts the connection down. The connected flag is cleared
   * before the socket is closed so the close is not reported as unexpected.
   */
  close(): void {
    this.setConnected(false);
    this.webSocket?.close();
    this.webSocket = null;
  }

  /**
   * Sends `data` as JSON when the socket is OPEN; otherwise queues it until
   * the first message is received on a (re)started socket.
   */
  send(data: RES): void {
    if (this.webSocket?.readyState === WebSocket.OPEN) {
      this.webSocket.send(JSON.stringify(data));
    } else {
      this.undeliveredMessages.push(data);
    }
  }
}
