import {DataSource} from '@angular/cdk/table';
import {CollectionViewer} from '@angular/cdk/collections';
import {BehaviorSubject, Observable, throwError, merge, of, Subject} from 'rxjs';
import {
  map,
  catchError,
  tap,
  distinctUntilChanged,
  shareReplay,
  switchMapTo,
  filter,
  switchMap,
  takeUntil
} from 'rxjs/operators';
import {SearchOptionsResponse} from 'src/app/shared/models/SearchOptionsResponse.model';
import {InvestingEntitySearchOptionsRequest} from './InvstingEntitySearchOptionsRequest.model';
import {GpInvestingEntityDataService} from 'src/app/services/gp/gp-investing-entity-data.service';
import {InvestingEntityReqRes} from 'src/app/dashboard/models/InvestingEntityReqRes.model';
import {InvestingEntityTableService} from './investing-entity-table.service';

/**
 * Table data source that loads pages of investing entities through
 * `GpInvestingEntityDataService.getList`.
 *
 * A page is (re)requested whenever `InvestingEntityTableService` emits search options or a
 * refresh request. Besides the rows, the data source exposes the total row count taken from
 * the latest response and a loading flag.
 */
export class InvestingEntityDataSource extends DataSource<InvestingEntityReqRes> {

  /** Emits once from `disconnect()` to tear down every stream owned by this data source. */
  private readonly destroy$ = new Subject<void>();

  // Triggers for refreshing data
  private readonly searchOptions$ = this.investingEntityTableService.searchOptions$;
  /** Refresh requests; each one first resets the page number to 0 through the table service. */
  private readonly refreshData$ = this.investingEntityTableService.refreshData$.pipe(
    tap(() => this.investingEntityTableService.updatePageNumber(0))
  );

  private readonly _totalRowsCount$ = new BehaviorSubject<number>(0);
  /** `totalCount` of the latest response, with consecutive duplicate values suppressed. */
  public totalRowsCount$ = this._totalRowsCount$.pipe(takeUntil(this.destroy$), distinctUntilChanged(), shareReplay(1));

  // For a loading indicator
  private readonly _loading$ = new BehaviorSubject<boolean>(false);
  /** Loading flag as a stream that completes when the data source is disconnected. */
  public isLoading$ = this._loading$.pipe(takeUntil(this.destroy$), shareReplay(1));

  /**
   * Rows of the most recently loaded page.
   *
   * Every trigger (refresh or new search options) re-subscribes to `searchOptions$` and uses
   * what that stream emits as the request.
   * NOTE(review): this presumably relies on `searchOptions$` replaying its current value on
   * subscription; otherwise a refresh alone would not start a request - confirm against
   * InvestingEntityTableService.
   */
  public pageRows$ = merge(this.refreshData$, this.searchOptions$)
    .pipe(
      switchMapTo(this.searchOptions$),
      filter(searchOptions => searchOptions !== null),
      tap(() => this._loading$.next(true)),
      // switchMap drops the previous request as soon as newer search options arrive.
      switchMap(searchOptions => this.getServerInvestingEntities(searchOptions)),
      tap(response => {
        this._totalRowsCount$.next(response.totalCount);
      }),
      map(response => response.rows),
      tap(() => this._loading$.next(false)),
      // Keep takeUntil after switchMap so that disconnecting also unsubscribes from an
      // in-flight request, and before shareReplay so that the shared source completes.
      takeUntil(this.destroy$),
      shareReplay(1)
    );

  constructor(
    private gpInvestingEntityDataService: GpInvestingEntityDataService,
    private investingEntityTableService: InvestingEntityTableService) {
    super();
  }

  /**
   * Requests one page of investing entities for the given search options.
   *
   * A failed request does not error the returned observable: it emits a `SearchOptionsResponse`
   * constructed without arguments instead, so the outer `pageRows$` subscription stays alive.
   * The error itself is discarded.
   *
   * @param searchOptions request passed unchanged to `getList`.
   * @returns the `getList` observable with errors replaced by an empty response.
   */
  private getServerInvestingEntities(searchOptions: InvestingEntitySearchOptionsRequest) {
    return this.gpInvestingEntityDataService.getList(searchOptions)
      .pipe(catchError(() => of(new SearchOptionsResponse<InvestingEntityReqRes>())));
  }

  /**
   * Raises the loading flag and returns the stream of page rows.
   *
   * The flag is lowered on every emission - including the replayed value a late subscriber
   * receives, for which the tap inside `pageRows$` does not run again - and on error, which
   * is rethrown to the subscriber.
   *
   * @param collectionViewer unused.
   * @returns the rows of the current page, re-emitted whenever a new page is loaded.
   */
  connect(collectionViewer: CollectionViewer = null): Observable<InvestingEntityReqRes[]> {
    this._loading$.next(true);
    return this.pageRows$.pipe(
      tap(() => this._loading$.next(false)),
      catchError((error) => {
        this._loading$.next(false);
        return throwError(error);
      })
    );
  }

  /**
   * Tears down all streams of this data source, cancelling a request that is still in flight.
   *
   * @param collectionViewer unused.
   */
  disconnect(collectionViewer: CollectionViewer = null): void {
    // A cancelled request never reaches the "loading finished" tap in pageRows$, so lower the
    // flag here, before isLoading$ completes, to avoid leaving it stuck on true.
    this._loading$.next(false);
    this.destroy$.next();
    this.destroy$.complete();
  }

  /** Returns the underlying loading subject itself, not the `isLoading$` view of it. */
  getLoader() {
    return this._loading$;
  }
}
