import { DataSource } from '@angular/cdk/table';
import { CollectionViewer } from '@angular/cdk/collections';
import { BehaviorSubject, Observable, throwError, merge, of, Subject } from 'rxjs';
import { map, catchError, tap, distinctUntilChanged, shareReplay, switchMapTo, filter, switchMap, takeUntil } from 'rxjs/operators';

import { ContactDataService } from 'src/app/services/gp/contact-data.service';
import { InvestorContactReqRes } from '../../models/investorContactReqRes.model';
import { InvestorContactsTableService } from './investor-contacts-table.service';
import { SearchOptionsResponse } from 'src/app/shared/models/SearchOptionsResponse.model';
import { ContactSearchOptionsRequest } from '../../../../shared/models/contactSearchOptionsRequest.model';

/**
 * Server-backed CDK table data source for investor contacts.
 *
 * Rows are (re)loaded through `ContactDataService.getList` whenever the table
 * service emits new search options or a refresh request. The data source also
 * exposes the total row count reported by the server and a loading flag.
 *
 * All public streams are terminated by `disconnect()`; after that the instance
 * cannot load new data, so create a new data source instead of reconnecting.
 */
export class InvestorContactDataSource extends DataSource<InvestorContactReqRes> {

  /** Emits once from `disconnect()` and tears down every public stream. */
  private readonly destroy$ = new Subject<void>();

  // Triggers for refreshing data.
  // NOTE: these initializers read the `contactsTableService` parameter property, so they rely on
  // TypeScript assigning parameter properties before field initializers run (the legacy
  // class-field emit, i.e. `useDefineForClassFields: false`).
  private readonly searchOptions$ = this.contactsTableService.searchOptions$;
  // A refresh request resets the page number to 0 before the data is reloaded.
  private readonly refreshData$ = this.contactsTableService.refreshData$.pipe(
    tap(() => this.contactsTableService.updatePageNumber(0))
  );

  /** Total number of rows reported by the last server response (starts at 0). */
  private readonly _totalRowsCount$ = new BehaviorSubject<number>(0);
  public totalRowsCount$ = this._totalRowsCount$.pipe(
    distinctUntilChanged(),
    takeUntil(this.destroy$),
    shareReplay(1)
  );

  // For a loading indicator
  private readonly _loading$ = new BehaviorSubject<boolean>(false);
  public isLoading$ = this._loading$.pipe(takeUntil(this.destroy$), shareReplay(1));

  /**
   * Rows of the current page.
   *
   * Every trigger (refresh or new search options) switches to `searchOptions$` to pick up the
   * options to query with; presumably `searchOptions$` replays its latest value, otherwise a
   * refresh alone would not cause a reload (verify against the table service).
   */
  public pageRows$ = merge(this.refreshData$, this.searchOptions$)
    .pipe(
      switchMapTo(this.searchOptions$),
      // Nothing to query until search options have been provided.
      filter(searchOptions => searchOptions !== null),
      tap(() => this._loading$.next(true)),
      switchMap(searchOptions => this.getServerContacts(searchOptions)),
      // The fallback response used on request failure may not have its fields populated,
      // so guard against forwarding `undefined` to consumers.
      tap(response => this._totalRowsCount$.next(response.totalCount || 0)),
      map(response => response.rows || []),
      tap(() => this._loading$.next(false)),
      // Keep takeUntil after switchMap so that an in-flight request is cancelled on disconnect
      // instead of pushing a late response into the subjects after teardown.
      takeUntil(this.destroy$),
      shareReplay(1)
    );

  constructor(private contactDataService: ContactDataService, private contactsTableService: InvestorContactsTableService) {
    super();
  }

  /**
   * Requests one page of contacts from the server.
   *
   * Errors are handled on this inner observable so that a failed request does not terminate
   * the outer `pageRows$` stream; the failure is logged and an empty response is emitted instead.
   *
   * @param searchOptions Search options passed to `ContactDataService.getList`.
   * @returns The server response, or an empty `SearchOptionsResponse` if the request fails.
   */
  private getServerContacts(searchOptions: ContactSearchOptionsRequest) {
    return this.contactDataService.getList(searchOptions)
      .pipe(catchError(error => {
        // Best effort: keep the table alive, but do not hide the failure completely.
        console.error('Failed to load investor contacts', error);
        return of(new SearchOptionsResponse<InvestorContactReqRes>());
      }));
  }

  /**
   * Called by the table to start receiving rows.
   *
   * @param collectionViewer Unused; accepted to satisfy the `DataSource` contract.
   * @returns Stream of the rows to render.
   */
  connect(collectionViewer: CollectionViewer = null): Observable<InvestorContactReqRes[]> {
    // Show the indicator until the first rows (possibly replayed) arrive.
    this._loading$.next(true);
    return this.pageRows$.pipe(
      // A value replayed by shareReplay does not re-run the taps inside pageRows$,
      // so the flag raised above has to be cleared here as well.
      tap(() => this._loading$.next(false)),
      catchError((error) => {
        // Errors from the trigger streams still propagate, but must not leave the indicator on.
        this._loading$.next(false);
        return throwError(error);
      })
    );
  }

  /**
   * Called by the table when it stops using this data source.
   * Completes `pageRows$`, `totalRowsCount$` and `isLoading$`.
   *
   * @param collectionViewer Unused; accepted to satisfy the `DataSource` contract.
   */
  disconnect(collectionViewer: CollectionViewer = null): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
