// FetchQueue.ts (3.1 KB)

  1. import { Observable, Subject } from 'rxjs';
  2. import { BackendSrvRequest } from '@grafana/runtime';
  /**
   * Bookkeeping map of the queue, keyed by request id.
   *
   * Each entry holds the request's current {@link FetchStatus} together with the
   * `BackendSrvRequest` options it was queued with.
   */
  export interface QueueState
    extends Record<string, { state: FetchStatus; options: BackendSrvRequest }> {}
  /**
   * Lifecycle stage of a queued request.
   *
   * The numeric values are spelled out explicitly; they are the same values the
   * implicit numbering (0, 1, 2) would assign.
   */
  export enum FetchStatus {
    /** The request has been queued but not started. */
    Pending = 0,
    /** The request has been started. */
    InProgress = 1,
    /** The request has finished. */
    Done = 2,
  }
  /** Payload describing the queue: two counters plus the queue state they refer to. */
  export interface FetchQueueUpdate {
    /** Number of entries in `state` whose status is in progress. */
    noOfInProgress: number;
    /** Number of entries in `state` whose status is pending. */
    noOfPending: number;
    /** The queue state, keyed by request id. */
    state: QueueState;
  }
  /** A single status change for one request id. */
  interface QueueStateEntry {
    /** Id of the request the change applies to. */
    id: string;
    /** Request options; optional because a status change may carry none. */
    options?: BackendSrvRequest;
    /** The status the request should move to. */
    state: FetchStatus;
  }
  /**
   * Tracks the status of requests by id and publishes a {@link FetchQueueUpdate}
   * after every status change.
   *
   * Callers report changes through `add`, `setInProgress` and `setDone`, and
   * observe the resulting queue through `getUpdates`.
   *
   * Note: the `state` object carried by each published update is the queue's own
   * internal state object, not a copy, so it reflects later changes as well.
   */
  export class FetchQueue {
    /** Internal queue state, keyed by request id. */
    private state: QueueState = {};
    /** Internal stream of status changes that are to be applied to the queue. */
    private queue: Subject<QueueStateEntry> = new Subject<QueueStateEntry>();
    /** External stream with updates to the queue state. */
    private updates: Subject<FetchQueueUpdate> = new Subject<FetchQueueUpdate>();

    /**
     * @param debug - when true, every published update is also logged to the console.
     */
    constructor(debug = false) {
      // This creates an implicit live subscription for as long as this instance lives.
      // FetchQueue is used by the singleton backendSrv, which lives for as long as the
      // Grafana app lives, so this is acceptable. A disposable pattern could be added
      // later if the need arises.
      this.queue.subscribe(({ id, state, options }) => {
        if (state === FetchStatus.Done) {
          // Finished requests are dropped from the queue; deleting an unknown id is a no-op.
          delete this.state[id];
        } else {
          const existing = this.state[id];
          if (existing) {
            existing.state = state;
            // Keep the previously stored options when the change carries none.
            if (options) {
              existing.options = options;
            }
          } else {
            // First time this id is seen: fall back to empty options when none were given.
            this.state[id] = { state, options: options ? options : ({} as BackendSrvRequest) };
          }
        }

        this.publishUpdate(this.getUpdate(this.state), debug);
      });
    }

    /** Queues the request `id` with its `options` as pending. */
    add = (id: string, options: BackendSrvRequest): void =>
      this.queue.next({ id, options, state: FetchStatus.Pending });

    /** Marks the request `id` as in progress. */
    setInProgress = (id: string): void => this.queue.next({ id, state: FetchStatus.InProgress });

    /** Marks the request `id` as done, which removes it from the queue state. */
    setDone = (id: string): void => this.queue.next({ id, state: FetchStatus.Done });

    /** Returns the stream of updates that is published after every status change. */
    getUpdates = (): Observable<FetchQueueUpdate> => this.updates.asObservable();

    /** Counts pending and in-progress entries of `state` in a single pass. */
    private getUpdate = (state: QueueState): FetchQueueUpdate => {
      let noOfInProgress = 0;
      let noOfPending = 0;

      for (const entry of Object.values(state)) {
        if (entry.state === FetchStatus.InProgress) {
          noOfInProgress++;
        } else if (entry.state === FetchStatus.Pending) {
          noOfPending++;
        }
      }

      return { noOfPending, noOfInProgress, state };
    };

    /** Logs the update (only when `debug` is set) and then emits it to subscribers. */
    private publishUpdate = (update: FetchQueueUpdate, debug: boolean): void => {
      this.printState(update, debug);
      this.updates.next(update);
    };

    /** Logs the counters and the id/status of every entry; request options are left out. */
    private printState = (update: FetchQueueUpdate, debug: boolean): void => {
      if (!debug) {
        return;
      }

      const entriesWithoutOptions = Object.entries(update.state).map(([id, entry]) => ({
        id,
        state: entry.state,
      }));

      console.log('FetchQueue noOfStarted', update.noOfInProgress);
      console.log('FetchQueue noOfNotStarted', update.noOfPending);
      console.log('FetchQueue state', entriesWithoutOptions);
    };
  }