import { MonoTypeOperatorFunction, Operator, Subscriber, TeardownLogic, Observable, Subject } from 'rxjs';

import { switchMap } from 'rxjs/operators';

/**
 * Predicate used by `delayWhen` to decide, for a single source value, whether
 * that value should be emitted after the configured delay (`true`) or with a
 * zero-length timer (`false`).
 */
export type delayCondition<T> = (value: T) => boolean;

/**
 * Creates an operator that re-emits each source value after a timer: values
 * for which `predicate` returns `true` wait `delayInterval` milliseconds, all
 * other values wait on a zero-length timer (so emission is always
 * asynchronous).
 *
 * Values are routed through `switchMap`, so a newer source value discards any
 * older value that is still waiting on its timer. The pending timer is
 * cancelled when that happens, and also when the consumer unsubscribes, so no
 * timers are left running. Each inner observable completes after emitting,
 * which lets the resulting observable complete once the source has completed
 * and the last pending value has been delivered.
 *
 * @param predicate Decides, per value, whether the delay applies.
 * @param delayInterval Delay in milliseconds for values matching `predicate`.
 * @returns An operator function suitable for `Observable.pipe`.
 */
export function delayWhen<T>(
  predicate: delayCondition<T>,
  delayInterval: number,
): (source: Observable<T>) => Observable<T> {
  return (source: Observable<T>): Observable<T> =>
    source.pipe(switchMap((x: T) => {
      // Evaluated eagerly in the projection so a throwing predicate surfaces
      // as an error notification through switchMap.
      const interval = predicate(x) ? delayInterval : 0;

      return new Observable<T>((subscriber) => {
        const timer = setTimeout(() => {
          subscriber.next(x);
          subscriber.complete();
        }, interval);

        // Teardown: runs on unsubscribe and when switchMap moves to a newer value.
        return () => clearTimeout(timer);
      });
    }));
}

/**
 * Creates an operator that maps each source value to a promise via
 * `promiseFactory` and emits the value that promise resolves with.
 *
 * The promise is handed directly to `switchMap`, which accepts promises as
 * inner sources: the resolved value is emitted and the inner source then
 * completes, and a rejection is forwarded as an error notification. Because
 * of `switchMap`, the result of an older promise is ignored once a newer
 * source value has arrived (the older promise itself is not cancelled).
 * Completing the inner source lets the resulting observable complete once the
 * source has completed and the last promise has settled.
 *
 * @param promiseFactory Called once per source value to start the async work.
 * @returns An operator function suitable for `Observable.pipe`.
 */
export function fromPromise<T, P>(
  promiseFactory: (x: T) => Promise<P>,
): (source: Observable<T>) => Observable<P> {
  return (source: Observable<T>): Observable<P> =>
    // Wrap in a lambda so the factory only ever receives the value, not
    // switchMap's extra index argument.
    source.pipe(switchMap((x: T) => promiseFactory(x)));
}

/**
 * Predicate signature accepted by `takeWhileInclusive`: receives the emitted
 * value and its zero-based position in the stream, and returns whether the
 * stream should keep going after this value.
 */
export type predicateType<T> = (value: T, index: number) => boolean;

/**
 * Subscriber that backs `takeWhileInclusive`.
 *
 * Every incoming value is tested with the predicate and then forwarded
 * downstream; once the predicate has returned `false` for a value, that value
 * is still forwarded and the downstream subscriber is completed right after.
 * If the predicate throws, the error is forwarded downstream and the value is
 * not emitted.
 */
class TakeWhileInclusiveSubscriber<T> extends Subscriber<T> {
  /** Zero-based position of the next value, passed to the predicate. */
  private index = 0;

  constructor(
    protected readonly destination: Subscriber<T>,
    private readonly predicate: predicateType<T>,
  ) {
    super(destination);
  }

  protected _next(value: T): void {
    const downstream = this.destination;

    // Advance the counter before invoking the predicate, so the position is
    // consumed even when the predicate throws.
    const position = this.index;
    this.index = position + 1;

    let keepGoing: boolean;
    try {
      keepGoing = this.predicate(value, position);
    } catch (err) {
      downstream.error(err);
      return;
    }

    // The value is always forwarded, including the one that ends the stream.
    downstream.next(value);

    if (keepGoing) {
      return;
    }
    downstream.complete();
  }
}

/**
 * Operator object handed to `Observable.lift`: when called, it subscribes to
 * the source through a `TakeWhileInclusiveSubscriber` that wraps the
 * downstream subscriber and applies the stored predicate.
 */
// tslint:disable-next-line:max-classes-per-file
class TakeWhileInclusiveOperator<T> implements Operator<T, T> {

  constructor(private readonly predicate: predicateType<T>) {}

  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const inclusiveSubscriber = new TakeWhileInclusiveSubscriber(subscriber, this.predicate);
    return source.subscribe(inclusiveSubscriber);
  }
}

/**
 * Mirrors the source Observable up to and including the first value for which
 * `predicate` returns `false`, then completes.
 *
 * For each source value the `predicate` is evaluated first and the value is
 * then emitted regardless of the result. While the `predicate` keeps returning
 * `true` the output simply continues mirroring the source. As soon as it
 * returns `false`, the value that was just tested is still emitted and the
 * output Observable completes immediately afterwards. If the `predicate`
 * throws, the error is sent to the output Observable and the value that was
 * being tested is not emitted.
 *
 * @param predicate A function that evaluates a value emitted by the source
 * Observable and returns a boolean. It also receives the zero-based index of
 * the value as its second argument.
 * @returns An operator function producing an Observable that emits the source
 * values and completes after emitting the first value that does not satisfy
 * the `predicate`.
 */
export function takeWhileInclusive<T>(predicate: predicateType<T>): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>): Observable<T> => {
    const operator = new TakeWhileInclusiveOperator(predicate);
    return source.lift(operator);
  };
}
