ResponseQueue.ts 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. import { Observable, Subject } from 'rxjs';
  2. import { filter } from 'rxjs/operators';
  3. import { BackendSrvRequest, FetchResponse } from '@grafana/runtime';
  4. import { FetchQueue } from './FetchQueue';
  5. interface FetchWorkEntry {
  6. id: string;
  7. options: BackendSrvRequest;
  8. }
  9. interface FetchResponsesEntry<T> {
  10. id: string;
  11. observable: Observable<FetchResponse<T>>;
  12. }
  13. export class ResponseQueue {
  14. private queue: Subject<FetchWorkEntry> = new Subject<FetchWorkEntry>(); // internal stream for requests that are to be executed
  15. private responses: Subject<FetchResponsesEntry<any>> = new Subject<FetchResponsesEntry<any>>(); // external stream with responses from fetch
  16. constructor(fetchQueue: FetchQueue, fetch: <T>(options: BackendSrvRequest) => Observable<FetchResponse<T>>) {
  17. // This will create an implicit live subscription for as long as this class lives.
  18. // But as FetchQueue is used by the singleton backendSrv that also lives for as long as Grafana app lives
  19. // I think this ok. We could add some disposable pattern later if the need arises.
  20. this.queue.subscribe((entry) => {
  21. const { id, options } = entry;
  22. // Let the fetchQueue know that this id has started data fetching.
  23. fetchQueue.setInProgress(id);
  24. this.responses.next({ id, observable: fetch(options) });
  25. });
  26. }
  27. add = (id: string, options: BackendSrvRequest): void => {
  28. this.queue.next({ id, options });
  29. };
  30. getResponses = <T>(id: string): Observable<FetchResponsesEntry<T>> =>
  31. this.responses.asObservable().pipe(filter((entry) => entry.id === id));
  32. }