AnnotationsWorker.ts 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import { cloneDeep } from 'lodash';
  2. import { from, merge, Observable, of } from 'rxjs';
  3. import { catchError, filter, finalize, map, mergeAll, mergeMap, reduce, takeUntil } from 'rxjs/operators';
  4. import { AnnotationQuery, DataSourceApi } from '@grafana/data';
  5. import { getDataSourceSrv } from '@grafana/runtime';
  6. import { AnnotationQueryFinished, AnnotationQueryStarted } from '../../../../types/events';
  7. import { AnnotationsQueryRunner } from './AnnotationsQueryRunner';
  8. import { getDashboardQueryRunner } from './DashboardQueryRunner';
  9. import { LegacyAnnotationQueryRunner } from './LegacyAnnotationQueryRunner';
  10. import {
  11. AnnotationQueryRunner,
  12. DashboardQueryRunnerOptions,
  13. DashboardQueryRunnerWorker,
  14. DashboardQueryRunnerWorkerResult,
  15. } from './types';
  16. import { emptyResult, handleDatasourceSrvError, translateQueryResult } from './utils';
  17. export class AnnotationsWorker implements DashboardQueryRunnerWorker {
  18. constructor(
  19. private readonly runners: AnnotationQueryRunner[] = [
  20. new LegacyAnnotationQueryRunner(),
  21. new AnnotationsQueryRunner(),
  22. ]
  23. ) {}
  24. canWork({ dashboard }: DashboardQueryRunnerOptions): boolean {
  25. const annotations = dashboard.annotations.list.find(AnnotationsWorker.getAnnotationsToProcessFilter);
  26. return Boolean(annotations);
  27. }
  28. work(options: DashboardQueryRunnerOptions): Observable<DashboardQueryRunnerWorkerResult> {
  29. if (!this.canWork(options)) {
  30. return emptyResult();
  31. }
  32. const { dashboard, range } = options;
  33. const annotations = dashboard.annotations.list.filter(AnnotationsWorker.getAnnotationsToProcessFilter);
  34. const observables = annotations.map((annotation) => {
  35. const datasourceObservable = from(getDataSourceSrv().get(annotation.datasource)).pipe(
  36. catchError(handleDatasourceSrvError) // because of the reduce all observables need to be completed, so an erroneous observable wont do
  37. );
  38. return datasourceObservable.pipe(
  39. mergeMap((datasource?: DataSourceApi) => {
  40. const runner = this.runners.find((r) => r.canRun(datasource));
  41. if (!runner) {
  42. return of([]);
  43. }
  44. dashboard.events.publish(new AnnotationQueryStarted(annotation));
  45. return runner.run({ annotation, datasource, dashboard, range }).pipe(
  46. takeUntil(
  47. getDashboardQueryRunner()
  48. .cancellations()
  49. .pipe(filter((a) => a === annotation))
  50. ),
  51. map((results) => {
  52. // store response in annotation object if this is a snapshot call
  53. if (dashboard.snapshot) {
  54. annotation.snapshotData = cloneDeep(results);
  55. }
  56. // translate result
  57. return translateQueryResult(annotation, results);
  58. }),
  59. finalize(() => {
  60. dashboard.events.publish(new AnnotationQueryFinished(annotation));
  61. })
  62. );
  63. })
  64. );
  65. });
  66. return merge(observables).pipe(
  67. mergeAll(),
  68. reduce((acc, value) => {
  69. // should we use scan or reduce here
  70. // reduce will only emit when all observables are completed
  71. // scan will emit when any observable is completed
  72. // choosing reduce to minimize re-renders
  73. return acc.concat(value);
  74. }),
  75. map((annotations) => {
  76. return { annotations, alertStates: [] };
  77. })
  78. );
  79. }
  80. private static getAnnotationsToProcessFilter(annotation: AnnotationQuery): boolean {
  81. return annotation.enable && !Boolean(annotation.snapshotData);
  82. }
  83. }