import { Observable, Subject } from 'rxjs'; import { filter } from 'rxjs/operators'; import { BackendSrvRequest, FetchResponse } from '@grafana/runtime'; import { FetchQueue } from './FetchQueue'; interface FetchWorkEntry { id: string; options: BackendSrvRequest; } interface FetchResponsesEntry { id: string; observable: Observable>; } export class ResponseQueue { private queue: Subject = new Subject(); // internal stream for requests that are to be executed private responses: Subject> = new Subject>(); // external stream with responses from fetch constructor(fetchQueue: FetchQueue, fetch: (options: BackendSrvRequest) => Observable>) { // This will create an implicit live subscription for as long as this class lives. // But as FetchQueue is used by the singleton backendSrv that also lives for as long as Grafana app lives // I think this ok. We could add some disposable pattern later if the need arises. this.queue.subscribe((entry) => { const { id, options } = entry; // Let the fetchQueue know that this id has started data fetching. fetchQueue.setInProgress(id); this.responses.next({ id, observable: fetch(options) }); }); } add = (id: string, options: BackendSrvRequest): void => { this.queue.next({ id, options }); }; getResponses = (id: string): Observable> => this.responses.asObservable().pipe(filter((entry) => entry.id === id)); }