import { BlockingQueue } from 'rxjs-blocking-queue';

import { Observable, of, SubscriptionLike } from 'rxjs';
import { mergeMap, tap, take, reduce } from 'rxjs/operators';

export interface IBlockingArrayRunner {
  jobs: Array<any>;
  jobRunner: Function;
  progressNotifier?: Function;
  resultReceiver?: Function;
  finalLength?: number;
}

export const blockingArrayRunner$ = (props: IBlockingArrayRunner) => {
  // prettier-ignore
  var {  jobs, jobRunner, progressNotifier, resultReceiver, finalLength } = props;

  // console.log('blockingArrayRunner$ - setting queue');
  var bQ: BlockingQueue<any> = new BlockingQueue();
  // indicates initial queue length
  var arrLen,
    // counter for potential later queue elements to run
    curCount = 0,
    // flag for initial queue push
    bInitQ = true,
    // subscription for extra safety
    // eslint-disable-next-line @typescript-eslint/no-unused-vars
    bqSub: SubscriptionLike,
    resultReceivers: Array<Function> = [];

  /**
   * runs next job, verifies if the queue is complete and notifies about progress
   */
  const queueNext = () => {
    // console.log('blockingArrayRunner$ curCount: ' + curCount + ' arrLen: ' + arrLen);
    curCount++;
    if (curCount === arrLen) {
      bQ.complete();
    } else {
      bQ.next();
    }
    if (typeof progressNotifier === 'function') {
      var pct = Math.round((curCount / arrLen) * 100);
      progressNotifier(pct);
    }
  };

  /**
   * pushes result receivers for each job in sequence
   * @param receiver
   */
  const setResultReceiver = (receiver: Function) => {
    if (typeof receiver === 'function') {
      resultReceivers.push(receiver);
    }
  };

  /**
   * allows early completion, e.g. if errors occurred
   */
  const completer = () => {
    // console.log('blockingArrayRunner$ completer ');
    bQ.complete();
    if (typeof progressNotifier === 'function') {
      var pct = 100;
      progressNotifier(pct);
    }
  };

  // will collect all results, if we don't use notifier instead
  const queueReducer$ = (obs: Observable<any>) => {
    return obs.pipe(
      reduce(
        (acc, val) => {
          // @ts-ignore
          var { data, messages } = val;
          if (Array.isArray(data)) {
            acc.data = [...acc.data, ...data];
          } else {
            if (data) {
              acc.data.push(data);
            }
          }
          if (Array.isArray(messages)) {
            acc.data = [...acc.messages, ...messages];
          } else {
            if (messages) {
              acc.messages.push(messages);
            }
          }
          return acc;
        },
        {
          data: [] as any,
          messages: [] as any
        }
      ),
      take(1)
      // tap((el) => {
      //   console.log(JSON.stringify(el));
      //   if (typeof resultReceiver === 'function') {
      //     resultReceiver(el);
      //   }
      // })
    );
  };

  // run all jobs sequentially
  const queueRunner$ = () => {
    return (bQ.element.pipe(
      // @ts-ignore
      tap((el) => {
        // console.log(JSON.stringify(el));
      }),
      mergeMap((dArr) => {
        return jobRunner(dArr);
      }),
      tap((el) => {
        // console.log(' queueNext ' + JSON.stringify(el));
        // allow next run as we have the current one completed
        queueNext();
      }),
      mergeMap((res) => {
        var myReceiver;
        if (resultReceivers.length > 0) {
          myReceiver = resultReceivers.pop();
          // if notifier is defined, send result to it
          if (typeof myReceiver === 'function') {
            // console.log('sending result to notifier ' + JSON.stringify(res));
            myReceiver(res);
            return of({ data: [], messages: [] });
          }
        }
        // console.log('running reducer ' + JSON.stringify(res));
        return queueReducer$(of(res));
      })
    ) as unknown) as Observable<any>;
  };

  /**
   * pushes new job and it's result receiver
   * @param runArr
   * @param receiver
   */
  const queuePusher = (runArr: Array<any>, receiver: Function) => {
    // console.log(
    //   'blocking arrayRunner - queuePusher: ' + JSON.stringify(runArr)
    // );
    if (Array.isArray(runArr)) {
      if (bInitQ) {
        bInitQ = false;
      } else {
        if (!finalLength || finalLength < arrLen) {
          console.error(
            'queuePusher - finalLength not set correctly - exiting'
          );
          return;
        }
      }
      runArr.forEach((el) => {
        bQ.push(el);
      });
      if (runArr.length > 0) {
        setResultReceiver(receiver);
      }
    } else {
      bQ.push(runArr);
      setResultReceiver(receiver);
    }
  };

  if (Array.isArray(jobs)) {
    arrLen = jobs.length;
  } else {
    if (jobs) {
      arrLen = 1;
    } else {
      return {
        runner: of({
          data: [],
          messages: ['invalid number of Arrayrunner elements ']
        }),
        pusher: () => {}
      };
    }
  }
  // finalLength will be set if more jobs follow later
  if (finalLength && finalLength > arrLen) {
    arrLen = finalLength;
  }
  if (typeof resultReceiver === 'function') {
    // ensure we have at least one subscription
    bqSub = queueRunner$().subscribe((res) => {
      // console.log('bqSub :' + JSON.stringify(res));
    });
  }
  // push jobs if we have them already
  queuePusher(jobs, resultReceiver as Function);

  // return runner and pusher for new jobs as well as completer for errors
  return { runner: queueRunner$(), pusher: queuePusher, completer };
};
