import { Injectable } from '@angular/core';
import { delay, interval, Observable, retryWhen, share, Subject, Subscription, switchMap, tap } from 'rxjs';
import { Project } from './domain/project';
import { QueueingSubject } from 'queueing-subject';
import makeWebSocketObservable, { GetWebSocketResponses, WebSocketOptions } from 'rxjs-websockets';
import { AuthenticationService } from '../auth/services/authentication.service';
import { PathwayConfigurationService } from '../auth/services/pathway-configuration.service';
import { ICredentials } from 'aws-amplify/lib/Common/types/types';

/**
 * Application-wide service that opens a signed WebSocket for a project channel
 * and exposes two in-process notification streams (project updates and
 * audit-log updates) that other parts of the app can push to and listen on.
 */
@Injectable({
  providedIn: 'root'
})
export class ProjectUpdateService {
  /** Milliseconds each socket error is held back before it triggers a retry. */
  private errorRetryMillis = 10000;
  /** Milliseconds between successive 'PING' messages pushed onto `input$`. */
  private keepAliveIntervalMillis = 60000;
  /** Subscription of the running keep-alive timer; unset until one is started. */
  private keepConnectionAliveSubscription: Subscription;
  private projectUpdatesSubject = new Subject<Project>();
  private auditLogUpdatesSubject = new Subject<boolean>();

  /** Outgoing messages; handed to the socket's response function once it is emitted. */
  input$ = new QueueingSubject<string>();
  /** Most recently created socket observable; `undefined` until one has been made. */
  socket$: Observable<GetWebSocketResponses<any>> = undefined;

  /** Read-only view of the projects pushed through `nextProjectUpdates`. */
  public projectUpdates: Observable<Project> = this.projectUpdatesSubject.asObservable();
  /** Read-only view of the signals pushed through `nextAuditLogUpdates`. */
  public auditLogUpdates: Observable<boolean> = this.auditLogUpdatesSubject.asObservable();

  constructor(
    private pathwayConfiguration: PathwayConfigurationService,
    private authenticationService: AuthenticationService
  ) {
    // Every sign-out notification stops the keep-alive timer.
    this.authenticationService.signOutObservable.subscribe(() => this.cancelKeepConnectionAlive());
  }

  /**
   * Builds the options object passed to `makeWebSocketObservable`.
   *
   * @param credentials Credentials used to sign the URL each time a socket is created.
   * @returns Options whose `makeWebSocket` factory opens a `WebSocket` on the signed URL.
   */
  async getWebSocketOptions(credentials: ICredentials): Promise<WebSocketOptions> {
    const makeWebSocket = (url: string, protocols?: string | string[]) =>
      new WebSocket(this.getSignedUrl(credentials, url), protocols);

    return { makeWebSocket, protocols: undefined };
  }

  /** Publishes `project` to everyone subscribed to `projectUpdates`. */
  nextProjectUpdates(project: Project): void {
    this.projectUpdatesSubject.next(project);
  }

  /** Publishes `true` to everyone subscribed to `auditLogUpdates`. */
  nextAuditLogUpdates(): void {
    this.auditLogUpdatesSubject.next(true);
  }

  /**
   * Creates the socket observable for a project channel.
   *
   * After the current credentials have been fetched, a channel-subscription
   * message is queued and the keep-alive timer is (re)started; only then is
   * the socket observable created and stored in `socket$`.
   *
   * @param projectId Sent as the `channelId` of the subscription message.
   * @param tenant Sent as the `tenant` of the subscription message.
   * @param url Optional socket URL; when falsy the configured websocket URL is signed instead.
   * @returns A shared observable of the socket's responses. Errors are not
   *          propagated: each one is delayed by `errorRetryMillis`, a new
   *          subscription message is queued, and the socket is retried.
   */
  async makeJsonWebSocketObservable(projectId: string, tenant: string, url?: string): Promise<Observable<unknown>> {
    const credentials = await this.authenticationService.getCurrentCredentials();

    this.queueConnectionMessage(projectId, tenant);
    this.keepConnectionAlive();

    const socketOptions = await this.getWebSocketOptions(credentials);
    this.socket$ = makeWebSocketObservable(url, socketOptions);

    // Feeds the outgoing queue to the socket and yields whatever it answers.
    const exchangeMessages = (getResponses: GetWebSocketResponses<string>) => getResponses(this.input$);

    // Retry notifier: wait, then queue the channel subscription again before the retry.
    const retryAfterDelay = (errors: Observable<unknown>) =>
      errors.pipe(
        delay(this.errorRetryMillis),
        tap(() => this.queueConnectionMessage(projectId, tenant))
      );

    return this.socket$.pipe(switchMap(exchangeMessages), retryWhen(retryAfterDelay), share());
  }

  /** Queues the JSON-encoded 'subscribeChannel' request for the given project and tenant. */
  private queueConnectionMessage(projectId: string, tenant: string): void {
    this.queueMessage(JSON.stringify({ action: 'subscribeChannel', channelId: projectId, tenant }));
  }

  /** Signs `url`, or the configured websocket URL when `url` is falsy, with `credentials`. */
  private getSignedUrl(credentials: ICredentials, url: string) {
    const target = url || this.pathwayConfiguration.websocket;
    return this.authenticationService.signUrl(target, credentials);
  }

  /** Pushes a raw message onto the outgoing queue (`input$`). */
  queueMessage(message: string): void {
    this.input$.next(message);
  }

  /** Replaces any running keep-alive timer with a fresh one that queues 'PING' on every tick. */
  private keepConnectionAlive(): void {
    this.cancelKeepConnectionAlive();

    const ticks$ = interval(this.keepAliveIntervalMillis);
    this.keepConnectionAliveSubscription = ticks$.subscribe(() => this.queueMessage('PING'));
  }

  /** Stops the keep-alive timer; does nothing when none has been started. */
  private cancelKeepConnectionAlive(): void {
    this.keepConnectionAliveSubscription?.unsubscribe();
  }

  /** Stops the keep-alive timer. The socket observable itself is not touched here. */
  close(): void {
    this.cancelKeepConnectionAlive();
  }
}
