
import { Subject,  Observable,  forkJoin } from 'rxjs';
import { tap } from 'rxjs/operators';

export type ProgressSubject = Subject<StreamProgressEvent>;

export interface IStreamProgressEventInit extends ProgressEventInit {
  error?: string;
  message?: string;
  complete: boolean;
}

export class StreamProgressEvent {

  public readonly loaded: number;
  public readonly total: number;
  public readonly complete: boolean;
  public readonly error: string;
  public readonly message: string;

  constructor(args: IStreamProgressEventInit) {
    this.loaded = args.loaded || 0;
    this.total = args.total || 1;
    this.complete = args.complete;
    this.error = args.error;
    this.message = args.message;
  }
}

export class StreamProgressManager {

  public loaded: number = 0;

  public tasks: Observable<any>[] = [];

  constructor(private subject: ProgressSubject) {
  }

  /**
   * Add task to reporter query
   * @param {Observable<any>} task
   */
  public push(task: Observable<any>) {

    // push task onto queue with added 'do' handler
    this.tasks.push(task.pipe(tap(res => this.next())));
  }

  public get total(): number {
    return this.tasks.length;
  }

  public get isComplete() {
    return this.loaded === this.tasks.length;
  }

  private next() {
    if (this.loaded >= this.total) {
      return;
    }

    this.loaded++;
    this.emit();
  }

  public error(error: string) {
    this.emit(null, error);
  }

  private emit(message: string = null, error: string = null) {

    const args: IStreamProgressEventInit = {
      loaded: this.loaded,
      total: this.total,
      error: error,
      message: message,
      complete: this.isComplete
    };

    this.subject.next(new StreamProgressEvent(args));
  }

  public forkJoin(): Observable<any> {

    // initial event
    this.emit(null, null);

    // run all tasks and return the generated data at the end
    return forkJoin(this.tasks).pipe(
      tap(
        res => {},
        err => this.error(err)
      )
    );
  }
}
