import { DataSource } from '@angular/cdk/table';
import { CollectionViewer } from '@angular/cdk/collections';
import { BehaviorSubject, Observable, throwError, merge, of, Subject } from 'rxjs';
import { map, catchError, tap, distinctUntilChanged, shareReplay, switchMapTo, filter, switchMap, takeUntil } from 'rxjs/operators';

import {InvestmentReqRes} from '../../../../../../models/investment.model';
import {EngagementTableService} from './engagement-table.service';
import {SearchOptionsResponse} from '../../../../../../../shared/models/SearchOptionsResponse.model';
import {EngagementSearchRequest} from './engagementSearchRequest.model';
import {GpInvestmentDataService} from '../../../../../../../services/gp/gp-investment-data.service';


/**
 * CDK table data source that streams pages of investments for the engagement table.
 *
 * A page is (re)loaded whenever the table service emits on `searchOptions$` or
 * `refreshData$`. Alongside the rows, the data source exposes the total row count
 * reported by the backend and a loading flag.
 *
 * Every stream owned by this instance is terminated by `disconnect()`. Because the
 * teardown subject is completed there, an instance is not meant to be connected
 * again after it has been disconnected.
 */
export class EngagementDataSource extends DataSource<InvestmentReqRes> {
  /** Emits once (from `disconnect()`) to tear down every stream owned by this data source. */
  private readonly destroy$ = new Subject<void>();

  // Triggers for refreshing data.
  // NOTE: these initializers read the `engagementTableService` constructor parameter
  // property, so they rely on parameter properties being assigned before field
  // initializers run (TypeScript's legacy class-field emit order).
  private searchOptions$ = this.engagementTableService.searchOptions$;
  // A manual refresh first resets the page number to 0 through the table service.
  private refreshData$ = this.engagementTableService.refreshData$.pipe(
    tap(() => this.engagementTableService.updatePageNumber(0))
  );

  private readonly _totalRowsCount$ = new BehaviorSubject<number>(0);
  /** Total row count taken from the most recent response; consecutive duplicates are suppressed. */
  public totalRowsCount$ = this._totalRowsCount$.pipe(
    takeUntil(this.destroy$),
    distinctUntilChanged(),
    shareReplay(1)
  );

  private readonly _isLoading$ = new BehaviorSubject<boolean>(false);
  /** `true` from the moment a page request is started until its rows have been emitted. */
  public isLoading$ = this._isLoading$.pipe(takeUntil(this.destroy$), shareReplay(1));

  /**
   * Rows of the current page.
   *
   * Each trigger emission re-subscribes to `searchOptions$` and requests a page for
   * every non-null options value it emits; `switchMap` drops a still-pending request
   * when newer options arrive.
   * NOTE(review): a refresh only results in a request if `searchOptions$` emits on
   * subscription (i.e. replays its latest value) — confirm in EngagementTableService.
   */
  public pageRows$ = merge(this.refreshData$, this.searchOptions$)
    .pipe(
      switchMapTo(this.searchOptions$),
      filter(searchOptions => searchOptions !== null),
      tap(() => this._isLoading$.next(true)),
      switchMap(searchOptions => this.getInvestmentsWithOfferingDeck(searchOptions)),
      tap(response => this._totalRowsCount$.next(response.totalCount)),
      map(response => response.rows),
      tap(() => this._isLoading$.next(false)),
      // takeUntil must come after switchMap: placed before it, teardown would only stop
      // the outer trigger stream while the in-flight inner request stayed subscribed and
      // could still push into the subjects after disconnect().
      takeUntil(this.destroy$),
      shareReplay(1)
    );

  constructor(
    private engagementTableService: EngagementTableService,
    private investmentDataService: GpInvestmentDataService) {
    super();
  }

  /**
   * Called by the table to start receiving rows.
   *
   * @param collectionViewer The connecting table (not used).
   * @returns The shared stream of page rows. The loading flag is raised on connect and
   *          cleared on every emission or error of that stream.
   */
  connect(collectionViewer: CollectionViewer): Observable<InvestmentReqRes[]> {
    this._isLoading$.next(true);
    return this.pageRows$.pipe(
      // Also needed here: a replayed (cached) page does not re-run the taps inside pageRows$.
      tap(() => this._isLoading$.next(false)),
      catchError((error) => {
        this._isLoading$.next(false);
        return throwError(error);
      })
    );
  }

  /**
   * Called by the table when it stops using this data source. Terminates all streams,
   * which also unsubscribes from any in-flight page request.
   *
   * @param collectionViewer The disconnecting table (not used).
   */
  disconnect(collectionViewer: CollectionViewer): void {
    // A request cancelled by the teardown never reaches the tap that clears the flag,
    // so reset it while isLoading$ can still deliver the value.
    this._isLoading$.next(false);
    this.destroy$.next();
    this.destroy$.complete();
  }

  /**
   * Requests one page of investments for the given search options.
   *
   * The error is handled on the inner observable, so a failed request does not error
   * (and thereby terminate) the `pageRows$` subscription. This is a deliberate
   * best-effort: the failure is swallowed and a default-constructed
   * `SearchOptionsResponse` is emitted in its place.
   */
  private getInvestmentsWithOfferingDeck(searchOptions: EngagementSearchRequest) {
    return this.investmentDataService.getInvestmentsWithOfferingDeck(searchOptions)
      .pipe(catchError(() => of(new SearchOptionsResponse<InvestmentReqRes>())));
  }
}
