import * as _ from 'lodash';
import { BehaviorSubject, Observable } from 'rxjs';
import { debounceTime, skip } from 'rxjs';
import { RequestBusItem } from 'app/core/models/request-bus-item';

export interface RequestBusConfig {
  maxConcurrency: number;
}

export class RequestBus<T> {

  private requests = new BehaviorSubject<RequestBusItem<T>[]>([]);

  constructor(
    private config: RequestBusConfig
  ) {

    this.requests.pipe(
      debounceTime(250)
    )
    .subscribe(() => {
      this.executeSubs();
    });

  }

  private executeSubs() {

    const requests = this.requests.value;
    const idle = _.filter(requests, r => !r.subscribed && !r.results);
    const outstanding = _.filter(requests, r => r.subscribed && !r.results);
    const toExecute = _.take(idle, this.config.maxConcurrency - outstanding.length);

    _.forEach(toExecute, (r: RequestBusItem<T>) => {
      r.execute().subscribe((() => {
        this.requests.next(this.requests.value);
      }));
    });

  }

  public request(id: string, value: Observable<T>): Observable<T> {

    const requests = this.requests.value;
    const existing = _.find(requests, (item: RequestBusItem<T>) => { return item.id === id });

    if (existing) {
      return existing.shared;
    }

    const rbItem = new RequestBusItem<T>(id, value);
    this.requests.next(requests.concat([rbItem]));
    return rbItem.shared.pipe(
      skip(1)
    );

  }

}
