import { Injectable, OnDestroy } from '@angular/core';
import { XpoLtlChangeFeed, XpoLtlChangeFeedService } from '@xpo-ltl/change-feed';
import { ConfigManagerService } from '@xpo-ltl/config-manager';
import { XpoLtlAuthenticationService } from '@xpo-ltl/ngx-auth';
import { Unsubscriber } from '@xpo-ltl/ngx-ltl';
import { XrtChangedDocument, XrtChangedDocuments } from '@xpo/ngx-xrt';
import { XrtFireMessagingService } from '@xpo/ngx-xrt-firebase';
import { ConfigManagerProperties } from 'core/enums';
import { concat as _concat, invoke as _invoke, isEqualWith as _isEqualWith, uniqWith as _uniqWith } from 'lodash';
import { BehaviorSubject, Observable, of, ReplaySubject, Subscription } from 'rxjs';
import { catchError, filter, map, skipWhile, switchMap, take, takeUntil } from 'rxjs/operators';

// Provide generic interfaces that cover all required common properties for these requests/responses
/**
 * Minimal shape of a register-filter request as consumed by
 * AutoRefreshBaseService.performRegisterRequest.
 */
export interface RegisterFilterRqst {
  // overwritten by performRegisterRequest with the value emitted by getMessagingToken$()
  connectionToken: string;
  // compared (via JSON.stringify) against the previously registered filter to avoid re-registering
  filter?: object; // this should always exist, but we don't know the type
}

/**
 * Minimal shape of a register-filter response as consumed by
 * AutoRefreshBaseService.performRegisterRequest.
 */
export interface RegisterFilterResp {
  // stored by performRegisterRequest as the id of the currently registered filter
  filterHash: string;
}

/**
 * Minimal shape of an unregister-filter request as consumed by
 * AutoRefreshBaseService.performUnregisterRequest, which fills in both fields
 * before the request is executed.
 */
export interface UnregisterFilterRqst {
  // overwritten with the value emitted by getMessagingToken$()
  connectionToken: string;
  // overwritten with the service's current filterHash (left undefined for the initial unregister)
  filterHash: string;
}

/**
 * An XrtChangedDocument that may additionally carry the changed record itself.
 * This is the element type emitted by AutoRefreshBaseService.changedDocuments$.
 */
export interface PndXrtChangedDocument extends XrtChangedDocument {
  // tslint:disable-next-line: no-any
  record?: any; // the changed document associated with the update from the change feed
}

// Sentinel used as the initial value of AutoRefreshBaseService.filterHash.
// It is truthy, so the first performUnregisterRequest call still runs its pipeline,
// but it is never sent to the API as a filterHash (it is mapped to undefined instead).
const INITIAL_UNREGISTER = 'initial-unregister';

/**
 * Base class for managing registration for Xrt Document change notifications.
 *
 * Subscribe to changedDocuments$ to receive notifications when documents are updated.
 * Subclasses supply the API calls (register/unregister), the messaging token, and the
 * handling of change-feed payloads.
 */
@Injectable()
export abstract class AutoRefreshBaseService<_SeachCriteriaType, _ChangeDocumenType> implements OnDestroy {
  /**
   * Gate for starting auto-refresh. Shared by all instances; replays its latest
   * emission to late subscribers.
   */
  static readonly beginAutoRefresh$ = new ReplaySubject<void>(1);

  protected unsubscriber = new Unsubscriber();
  private channel: BroadcastChannel;

  protected filterHash: string = INITIAL_UNREGISTER; // id of the currently registered filter criteria
  protected previousFilter: object;

  private changedDocumentsSubject = new BehaviorSubject<PndXrtChangedDocument[]>([]);
  readonly changedDocuments$ = this.changedDocumentsSubject.asObservable();

  private changeFeedSubscription: Subscription;
  private beginAutoRefreshSubscription: Subscription;
  private fireMessagingSubscription: Subscription;
  private lastRegisteredTimestamp: Date;

  constructor(
    protected xpoLtlChangeFeedService: XpoLtlChangeFeedService,
    protected configManagerService: ConfigManagerService,
    protected xpoLtlAuthenticationService: XpoLtlAuthenticationService,
    protected xrtFireMessagingService: XrtFireMessagingService
  ) {}

  /**
   * Unregister any old filters and subscribe to auto refresh.
   *
   * Safe to call more than once: the subscription created by a previous call is
   * released first, and the subscription is torn down when the service is destroyed.
   */
  onInit() {
    this.changeFeedSubscription?.unsubscribe();
    // drop the handler from any earlier onInit() so they don't stack up
    this.beginAutoRefreshSubscription?.unsubscribe();

    // Wait until we are allowed to begin auto-refresh, usually after log in occurs
    this.beginAutoRefreshSubscription = AutoRefreshBaseService.beginAutoRefresh$
      .pipe(
        switchMap(() => this.unregisterFilter()),
        // beginAutoRefresh$ is static and outlives this instance, so detach on destroy
        takeUntil(this.unsubscriber.done$)
      )
      .subscribe(() => {
        this.subscribeToAutoRefresh();
      });
  }

  /**
   * Release all subscriptions, unregister the current filter and close the
   * broadcast channel (if one was opened).
   */
  ngOnDestroy(): void {
    this.unsubscriber.complete();
    this.beginAutoRefreshSubscription?.unsubscribe();

    // The observable returned by unregisterFilter() does nothing until subscribed.
    // take(1) releases the subscription as soon as the unregister has been attempted.
    this.unregisterFilter()
      .pipe(take(1))
      .subscribe();

    _invoke(this.channel, 'close');
  }

  /**
   * Clear the list of changed documents
   */
  clearChangedDocuments() {
    this.changedDocumentsSubject.next([]);
  }

  /**
   * Return a valid connection token
   */
  protected abstract getMessagingToken$(): Observable<string>;

  /**
   * Begin the process of registering a filter to start receiving notifications from the backend.
   */
  protected abstract subscribeToAutoRefresh();

  /**
   * Implementations of this build the correct RegisterFilterRqst to then pass along to
   * the `performRegisterRequest` function.
   *
   * Ex:
   *    protected registerFilter(criteria: UnassignedDeliveriesSearchCriteria): Observable<string> {
   *      return this.pndStore$.select(GlobalFilterStoreSelectors.globalFilterPlanDate).pipe(
   *        take(1),
   *        switchMap((planDate: Date) => this.unassignedDeliveriesCriteriaService.filterFromCriteria(criteria, planDate)),
   *        switchMap((filter) => {
   *          const request = new RegisterFilterUnassignedDeliveriesRqst();
   *          request.filter = filter;
   *          return this.performRegisterRequest(request);
   *        })
   *      );
   *    }
   */
  protected abstract registerFilter(criteria: _SeachCriteriaType): Observable<string>;

  /**
   * Implement this method with API call to register a filter
   */
  protected abstract executeRegisterFilter(request: RegisterFilterRqst): Observable<RegisterFilterResp>;

  /**
   * Must always implement this method to unregister filter
   * Ex:
   *   protected unregisterFilter(): Observable<void> {
   *     const request = new UnregisterFilterUnassignedDeliveriesRqst();
   *     return this.performUnregisterRequest(request);
   *   }
   */
  protected abstract unregisterFilter(): Observable<void>;

  /**
   * Implement this method with API call to unregister a filter
   */
  protected abstract executeUnregisterFilter(request: UnregisterFilterRqst): Observable<void>;

  /**
   * Listen for remote updates notifying us of document changes using the FireMessaging service
   * and the 'pnd-notification-broadcast-channel' BroadcastChannel.
   *
   * Calling this again replaces the previous listeners instead of adding to them.
   * @note we should be using the new XpoLtlChangeFeedService when available
   */
  protected subscribeToFireMessagingAutoRefresh() {
    // release listeners from an earlier call so notifications aren't processed twice
    this.fireMessagingSubscription?.unsubscribe();
    _invoke(this.channel, 'close');

    // listen for document change message from service
    this.fireMessagingSubscription = this.xrtFireMessagingService
      .changedDocument$()
      .pipe(takeUntil(this.unsubscriber.done$))
      .subscribe((changedDocument: XrtChangedDocuments) => {
        this.updateChangedDocuments({
          filterHash: changedDocument.filterHash,
          documents: JSON.parse(changedDocument.documents.toString()),
        });
      });

    // listen for document change message from webworker
    this.channel = new BroadcastChannel('pnd-notification-broadcast-channel');
    this.channel.onmessage = (event: MessageEvent) => {
      const payload = event.data?.data;
      if (!payload) {
        // not shaped like a change notification; ignore rather than throw inside the handler
        return;
      }

      this.updateChangedDocuments({
        filterHash: payload.filterHash,
        documents: JSON.parse(payload.changedDocumentsJson),
      });
    };
  }

  /**
   * Process updated document notification.  We need to keep a running list of all changed documents
   * until the user refreshes the data.
   *
   * Notifications whose filterHash differs from the currently registered one are ignored.
   * New changes are merged ahead of the existing ones and de-duplicated by documentKey,
   * so the newest entry for a given key wins.
   *
   * NOTE: We don't get actual documents unless we have updated to the new change feed,
   * just notifications that documents have changed.
   */
  protected updateChangedDocuments(changedDocuments: XrtChangedDocuments) {
    if (this.filterHash === changedDocuments.filterHash) {
      const currentChanges = this.changedDocumentsSubject.value;
      // a notification without documents must not inject an `undefined` entry into the list
      const newChanges = changedDocuments.documents ?? [];

      const allChanges: PndXrtChangedDocument[] = _uniqWith(
        _concat(newChanges, currentChanges),
        (a: PndXrtChangedDocument, b: PndXrtChangedDocument) => a?.documentKey === b?.documentKey
      );

      this.changedDocumentsSubject.next(allChanges);
    }
  }

  /**
   * Register Filter to begin receiving updates for matching files that change.
   *
   * If a filter is already registered and the requested filter serializes to the same JSON
   * as the previous one, the existing filterHash is returned without calling the API.
   * Otherwise the existing filter is unregistered and the new one registered.
   *
   * @param request the original registration request; its connectionToken is overwritten
   * @return the registered filterHash value for the requested filter, or undefined if
   * registration failed
   */
  protected performRegisterRequest(request: RegisterFilterRqst): Observable<string> {
    // The initial sentinel is truthy but is not a real registration, so it must never be
    // handed back to the caller as a filterHash.
    const hasRegisteredFilter = !!this.filterHash && this.filterHash !== INITIAL_UNREGISTER;

    // Only register a new filter if there was not a previous one, or the new filter is
    // different from the previous
    if (
      hasRegisteredFilter &&
      _isEqualWith(request.filter, this.previousFilter, (a: object, b: object) => {
        return JSON.stringify(a) === JSON.stringify(b);
      })
    ) {
      return of(this.filterHash);
    } else {
      // unregister the existing filter and register a new filter, returning the filterHash
      return this.unregisterFilter().pipe(
        switchMap(() => this.getMessagingToken$()),
        switchMap((token: string) => {
          request.connectionToken = token;
          return this.executeRegisterFilter(request);
        }),
        // treat any failure as "nothing registered" so the stream stays alive
        catchError(() => {
          return of(undefined);
        }),
        map((response: RegisterFilterResp): string => {
          this.filterHash = response?.filterHash; // store the new filterHash
          this.previousFilter = request.filter; // store the filter criteria
          return this.filterHash;
        })
      );
    }
  }

  /**
   * Unregister filter to stop receiving changed document updates.
   *
   * Stops the change-feed subscription and clears the remembered filter immediately.
   * The returned observable waits for the user to be authorized, fetches a messaging
   * token and, when a real filterHash is registered, calls executeUnregisterFilter.
   * The stored filterHash is cleared afterwards, even if the call failed.
   *
   * @param request the original unregistration request; its connectionToken and
   * filterHash are overwritten
   */
  protected performUnregisterRequest(request: UnregisterFilterRqst): Observable<void> {
    this.changeFeedSubscription?.unsubscribe();
    this.previousFilter = undefined; // always clear out any previous filter

    if (!this.filterHash) {
      // there was no previous filter and we aren't doing our initial unregister, so don't do anything
      return of(undefined);
    } else {
      return this.xpoLtlAuthenticationService.isAuthorized().pipe(
        skipWhile((authorized) => !authorized),
        switchMap(() => this.getMessagingToken$()),
        switchMap((token: string) => {
          request.connectionToken = token;

          // the initial sentinel is not a real hash, so there is nothing to unregister for it
          request.filterHash = this.filterHash !== INITIAL_UNREGISTER ? this.filterHash : undefined;
          if (request.filterHash) {
            return this.executeUnregisterFilter(request);
          } else {
            return of(undefined);
          }
        }),
        catchError(() => {
          return of(undefined);
        }),
        map(() => {
          // clear the filterHash now that it is unregistered
          this.filterHash = undefined;
        })
      );
    }
  }

  /**
   * Register to listen for changes from the given filter hash.
   *
   * Any previous change-feed subscription is dropped first. Changes whose timestamp is
   * not later than the moment of this call are ignored.
   *
   * @param component app component to listen to
   * @param filterHash filter hash to listen to for change updates; when falsy, no new
   * listener is created
   */
  protected registerForChangeFeed(component: string, filterHash: string) {
    // stop listening to old change feed
    this.changeFeedSubscription?.unsubscribe();

    // if there is no filterHash, then we have no registered filters to listen
    if (!filterHash) {
      return;
    }

    let region = this.configManagerService.getSetting<string>(ConfigManagerProperties.region);
    if (this.configManagerService.getSetting<boolean>(ConfigManagerProperties.production)) {
      region = 'prod'; // Firestore sends messages with region prod instead of wave.
    }

    this.lastRegisteredTimestamp = new Date();

    this.changeFeedSubscription = this.xpoLtlChangeFeedService
      .listenChanges(component, region, filterHash)
      .pipe(
        filter((xpoLtlChangeFeed: XpoLtlChangeFeed) => {
          return xpoLtlChangeFeed?.timestamp?.toDate() > this.lastRegisteredTimestamp; // ignore changes from old registrations
        }),
        takeUntil(this.unsubscriber.done$)
      )
      .subscribe((xpoLtlChangeFeed: XpoLtlChangeFeed) => {
        if (xpoLtlChangeFeed?.changedDocumentsJson) {
          const changeFeedData: _ChangeDocumenType[] = JSON.parse(
            xpoLtlChangeFeed.changedDocumentsJson
          ) as _ChangeDocumenType[];
          this.updateFromChangeFeed(changeFeedData);
        }
      });
  }

  /**
   * Apply the changes received from the change feed.
   * @param changeData changed documents parsed from the change feed payload
   */
  protected abstract updateFromChangeFeed(changeData: _ChangeDocumenType[]): void;
}
