import { chunk } from '@gonfalon/collections';
import { of, Subject } from 'rxjs';
import { bufferTime, filter, mergeMap, share, switchMap } from 'rxjs/operators';

import { createDeferred } from './createDeferred';

/**
 * Options accepted by `createTaskRunner`.
 *
 * - `Task` is the input submitted for a single task.
 * - `Data` is the per-task result handed back to the caller.
 * - `BatchedData` is what one `runner` invocation resolves with for a whole
 *   group of tasks.
 */
export interface TaskRunnerConfiguration<Task, Data, BatchedData> {
  /**
   * Name exposed on the returned task runner. Defaults to `'task-runner'`.
   */
  name?: string;

  /**
   * Run a batch of tasks
   *
   * Without `batching`, this is called once with every task collected in a
   * buffer window. With `batching`, it is called once per batch.
   * @param {Task[]} inputs
   * @returns {Promise<BatchedData>} a promise that resolves with the batched results
   */
  runner: (inputs: Task[]) => Promise<BatchedData>;
  /**
   * Read the result of a single task from the batched task results
   * @param {BatchedData} data
   * @param {Task} input
   * @returns {Data} the data corresponding to `input`
   */
  resolver: (data: BatchedData, input: Task) => Data;
  /**
   * How long to buffer tasks before running them.
   *
   * The value is passed to RxJS `bufferTime`, which interprets it as
   * milliseconds. Defaults to `1`.
   */
  bufferDuration?: number;
  /**
   * Batcher configuration
   *
   * Provide either `maxBatchSize` or `batcher` (not both), together with
   * `resultMerger`. When omitted, all buffered tasks go to `runner` in one call.
   */
  batching?: (
    | {
        /**
         * Number of tasks per batch
         *
         * Buffered inputs are split by passing this value to `chunk` as the batch size.
         */
        maxBatchSize: number;
        batcher?: undefined;
      }
    | {
        /**
         * Split the inputs into batches
         *
         * Called with all buffered inputs; should return an array of batches.
         */
        batcher: (inputs: Task[]) => Task[][];
        maxBatchSize?: undefined;
      }
  ) & {
    /**
     * Merge the results of multiple batched tasks into a single batched result
     * @param {BatchedData[]} data array of batched results
     * @returns {BatchedData} single batched result for all tasks
     */
    resultMerger: (data: BatchedData[]) => BatchedData;
  };
}

/**
 * Shape of the object returned by `createTaskRunner` for the given type parameters.
 */
export type TaskRunner<Task, Data, BatchedData> = ReturnType<typeof createTaskRunner<Task, Data, BatchedData>>;

/**
 * Batch tasks and run them, ensuring that each task resolves with its own data.
 *
 * Tasks submitted through `run` are collected with RxJS `bufferTime`. Each
 * non-empty buffer is handed to `runner` (optionally split into batches whose
 * results are merged with `batching.resultMerger`), and every caller receives
 * `resolver(batchedData, input)` for its own input.
 *
 * Failures in `runner`, `batcher` or `resultMerger` (whether thrown
 * synchronously or delivered as a rejection) reject the promises of the tasks
 * in the affected buffer; they do not break the runner for later tasks.
 *
 * @param options runner configuration
 * @returns an object exposing the runner's `name` and the `run` function
 */
export function createTaskRunner<Task, Data, BatchedData>({
  name = 'task-runner',
  runner,
  resolver,
  batching,
  bufferDuration = 1, // milliseconds, passed to `bufferTime`
}: TaskRunnerConfiguration<Task, Data, BatchedData>) {
  const subject = new Subject<Task>();

  // Runs every batch and merges the results. Declared `async` so that a
  // synchronous throw from `runner` surfaces as a rejection of the returned
  // promise instead of an exception inside the observable pipeline.
  async function runBatches(
    batches: Task[][],
    resultMerger: (data: BatchedData[]) => BatchedData,
  ): Promise<BatchedData> {
    // Wrap `runner` so it is not handed `map`'s extra index/array arguments.
    const results = await Promise.all(batches.map((batch) => runner(batch)));
    return resultMerger(results);
  }

  // The stream carries the batch *promise* itself (not its settled value), so a
  // rejected batch never errors the shared stream.
  const worker = subject.pipe(
    bufferTime(bufferDuration),
    filter((inputs) => inputs.length > 0),

    switchMap((inputs) => {
      try {
        if (batching) {
          const { maxBatchSize, batcher, resultMerger } = batching;
          const batches = maxBatchSize !== undefined ? chunk(inputs, maxBatchSize) : batcher(inputs);
          return of(batches).pipe(mergeMap((allBatches) => of(runBatches(allBatches, resultMerger))));
        }

        return of(runner(inputs));
      } catch (error) {
        // A synchronous failure while splitting or starting the work would
        // otherwise error the stream; subscribers have no error handler, so
        // their tasks would never settle. Deliver it as a rejected batch.
        return of(Promise.reject<BatchedData>(error));
      }
    }),
    share(),
  );

  /**
   * Submit a single task and wait for its result.
   * @param input the task to run
   * @returns the data `resolver` extracts for `input` from the batched result
   */
  async function run(input: Task): Promise<Data> {
    const task = createDeferred<BatchedData>();
    // Subscribe before pushing the input so this task observes the emission
    // for the buffer that contains it.
    const subscription = worker.subscribe((batchResult) => task.resolve(batchResult));
    subject.next(input);
    return task.promise.then((data) => resolver(data, input)).finally(() => subscription.unsubscribe());
  }

  return {
    name,
    run,
  };
}
