import { Observable, BehaviorSubject, Subject, of, merge, EMPTY as empty } from 'rxjs';
import {
  map,
  exhaustMap,
  switchMap,
  mergeMap,
  withLatestFrom,
  catchError,
  throttleTime,
  filter, tap, shareReplay, share
} from 'rxjs/operators';

import { ChannelItem, AppLibrary } from '../models';

import { ItemFilter, ChannelQueryResultClass, ChannelQueryResultList, LibraryTagData, AppMetaData, ProgressSubject, StreamProgressEvent, StreamProgressManager } from '../classes';

import { AppMetaDataService } from './app-meta-data.service';
import { TagCacheService } from './tag-cache.service';
import { AbstractChannelItemApiService } from './abstract-channel-item-data.service';
import { Injectable, OnDestroy } from '@angular/core';

/**
 * Base class for caches that query the items of every read channel of the
 * current library and republish the combined result list.
 *
 * - `results$` replays the latest result list (starting with `null` until the
 *   first query has finished).
 * - `progress$` exposes the events pushed to the progress subject that is
 *   handed to the `StreamProgressManager` of each master query.
 *
 * Subclasses must call `destroy()` when the cache is no longer needed so the
 * subscriptions to the upstream services are released.
 */
export abstract class AbstractChannelItemCache<T extends ChannelItem> {
  private readonly resultsSource = new BehaviorSubject<ChannelQueryResultList<T>>(null);
  private readonly updateEventSource = new BehaviorSubject<string>('initial');
  private readonly progressSource: ProgressSubject = new Subject<StreamProgressEvent>();

  /**
   * Handle on the query pipeline created in the constructor. Only the teardown
   * capability is needed, hence the structural type.
   */
  private readonly querySubscription: { unsubscribe(): void };

  public readonly progress$ = this.progressSource.asObservable();
  public readonly results$ = this.resultsSource.asObservable();

  /**
   * Wires the query pipeline and starts it immediately.
   *
   * @param metaService Source of the application meta data (`metaData$`) used to resolve the read channels.
   * @param tagCache Source of the library tag data (`data$`); every emission triggers a new master query.
   * @param dataService API service used to load channel items and to listen for invalidated channels.
   */
  protected constructor(metaService: AppMetaDataService, tagCache: TagCacheService, private dataService: AbstractChannelItemApiService<T>) {

    const name = this.dataService.getName();
    const buildFailureMessage = `${name} : Failed to build master library stream`;

    const masterQueryStream$: Observable<ChannelQueryResultList<T>> = tagCache.data$.pipe(
      withLatestFrom(metaService.metaData$),
      switchMap(([libraryTags, meta]) =>
        // Recover per build: handling the error inside the projection keeps the
        // outer stream alive, so a later tag change can still rebuild the results.
        // Starting from `of(null)` routes synchronous throws of
        // buildMasterQueryStream through the same catchError.
        of(null).pipe(
          switchMap(() => this.buildMasterQueryStream(libraryTags, meta)),
          catchError(err => {
            console.error(buildFailureMessage, err);
            return of(null);
          })
        )
      ),
      // Still guard against failures of the upstream sources themselves so they
      // never reach (and terminate) resultsSource.
      catchError(err => {
        console.error(buildFailureMessage, err);
        return of(null);
      }),
      filter(o => !!o), // remove null results
      share() // prevents the http calls being repeated, but no need to replay as we don't have late subscribers
    );

    // Re-query a single channel when it is invalidated and re-emit the latest
    // master result list (the channel result is updated in place).
    const channelInvalidatedQueryStream$: Observable<ChannelQueryResultList<T>> = dataService.channelInvalidated$.pipe(
      withLatestFrom(masterQueryStream$),
      mergeMap(([channelId, results]) => {
        const channelResult = results && results.getItem(channelId);
        if (!channelResult) {
          // unknown channel: nothing to refresh
          return empty;
        }

        return this.buildChannelUpdateStream(channelResult)
          .pipe(map(() => results));
      })
    );

    const mergedQueryStream$ = merge(masterQueryStream$, channelInvalidatedQueryStream$);

    // NOTE(review): exhaustMap ignores update events while the merged stream is
    // still active, and the merged stream only completes once both upstream
    // sources complete. If those sources are long-lived, refresh() has no effect
    // after the initial event - confirm this is intended.
    this.querySubscription = this.updateEventSource
      .pipe(throttleTime(1000), exhaustMap(() => mergedQueryStream$))
      .subscribe(this.resultsSource);
  }

  /**
   * Releases the query pipeline and completes all subjects owned by the cache.
   * Safe to call more than once.
   */
  protected destroy(): void {
    // Detach from the upstream services first; completing updateEventSource alone
    // would leave the inner subscriptions (and their http calls) running.
    this.querySubscription.unsubscribe();

    this.resultsSource.complete();
    this.updateEventSource.complete();
    this.progressSource.complete();
  }

  /**
   * Pushes a 'refresh' event into the update stream (throttled to one event per second).
   */
  refresh() {
    this.updateEventSource.next('refresh');
  }

  /**
   * Supplies the item filter that is handed to every channel query result
   * created for the given library.
   *
   * @param library Library taken from the current library tag data.
   * @returns The filter passed to each `ChannelQueryResultClass` of that library.
   */
  protected abstract getItemFilter(library: AppLibrary): ItemFilter<T>;

  /**
   * Builds the stream that queries all read channels of the library and emits
   * the combined result list once every channel task has finished.
   *
   * @param libraryTags Current library tag data; provides the library to query.
   * @param meta Current application meta data; provides the read channels.
   * @returns A stream emitting the populated result list, or `null` when either input is missing.
   */
  private buildMasterQueryStream(libraryTags: LibraryTagData, meta: AppMetaData): Observable<ChannelQueryResultList<T>> {

    if (!libraryTags || !meta) {
      return of(null);
    }

    const channels = meta.getReadChannels();
    const library = libraryTags.library;
    const channelCount = channels.length;
    const itemFilter = this.getItemFilter(library);

    const results = new ChannelQueryResultList<T>();
    const taskManager = new StreamProgressManager(this.progressSource);

    // create one result holder and one update task per channel
    for (let i = 0; i < channelCount; i++) {
      const channel = channels[i];
      const result = new ChannelQueryResultClass<T>(library, channel, itemFilter);
      results.items.push(result);

      taskManager.push(this.buildChannelUpdateStream(result));
    }

    // combine the tasks and map to the shared result list
    return taskManager.forkJoin().pipe(
      map(() => results)
    );
  }

  /**
   * Loads the items of a single channel and stores them on the given result.
   * A failed request does not error the stream: the error is recorded on
   * `result.error` and the result is emitted anyway.
   *
   * @param result Channel result to (re)populate; must not be null.
   * @returns A stream emitting the same `result` instance after it was updated.
   * @throws Error when `result` is missing.
   */
  private buildChannelUpdateStream(
    result: ChannelQueryResultClass<T>)
  : Observable<ChannelQueryResultClass<T>> {

    if (!result) {
      throw new Error('invalid channelData');
    }

    return this.dataService.getChannelItemsByToken(result.library, result.channel).pipe(
      map(data => {
        result.setData(data);
        result.error = null;
        return result;
      }),
      catchError(err => {
        // keep the stream alive so one failing channel cannot break the whole query
        result.error = err;
        return of(result);
      })
    );
  }
}
