QueryRunner.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. import { cloneDeep } from 'lodash';
  2. import { from, Observable, ReplaySubject, Unsubscribable } from 'rxjs';
  3. import { first } from 'rxjs/operators';
  4. import {
  5. CoreApp,
  6. DataQueryRequest,
  7. DataSourceApi,
  8. PanelData,
  9. rangeUtil,
  10. ScopedVars,
  11. QueryRunnerOptions,
  12. QueryRunner as QueryRunnerSrv,
  13. LoadingState,
  14. DataSourceRef,
  15. } from '@grafana/data';
  16. import { getTemplateSrv } from '@grafana/runtime';
  17. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  18. import { getNextRequestId } from './PanelQueryRunner';
  19. import { setStructureRevision } from './processing/revision';
  20. import { preProcessPanelData, runRequest } from './runRequest';
  21. export class QueryRunner implements QueryRunnerSrv {
  22. private subject: ReplaySubject<PanelData>;
  23. private subscription?: Unsubscribable;
  24. private lastResult?: PanelData;
  25. constructor() {
  26. this.subject = new ReplaySubject(1);
  27. }
  28. get(): Observable<PanelData> {
  29. return this.subject.asObservable();
  30. }
  31. run(options: QueryRunnerOptions): void {
  32. const {
  33. queries,
  34. timezone,
  35. datasource,
  36. panelId,
  37. app,
  38. dashboardId,
  39. timeRange,
  40. timeInfo,
  41. cacheTimeout,
  42. maxDataPoints,
  43. scopedVars,
  44. minInterval,
  45. } = options;
  46. if (this.subscription) {
  47. this.subscription.unsubscribe();
  48. }
  49. const request: DataQueryRequest = {
  50. app: app ?? CoreApp.Unknown,
  51. requestId: getNextRequestId(),
  52. timezone,
  53. panelId,
  54. dashboardId,
  55. range: timeRange,
  56. timeInfo,
  57. interval: '',
  58. intervalMs: 0,
  59. targets: cloneDeep(queries),
  60. maxDataPoints: maxDataPoints,
  61. scopedVars: scopedVars || {},
  62. cacheTimeout,
  63. startTime: Date.now(),
  64. };
  65. // Add deprecated property
  66. request.rangeRaw = timeRange.raw;
  67. from(getDataSource(datasource, request.scopedVars))
  68. .pipe(first())
  69. .subscribe({
  70. next: (ds) => {
  71. // Attach the datasource name to each query
  72. request.targets = request.targets.map((query) => {
  73. if (!query.datasource) {
  74. query.datasource = ds.getRef();
  75. }
  76. return query;
  77. });
  78. const lowerIntervalLimit = minInterval
  79. ? getTemplateSrv().replace(minInterval, request.scopedVars)
  80. : ds.interval;
  81. const norm = rangeUtil.calculateInterval(timeRange, maxDataPoints, lowerIntervalLimit);
  82. // make shallow copy of scoped vars,
  83. // and add built in variables interval and interval_ms
  84. request.scopedVars = Object.assign({}, request.scopedVars, {
  85. __interval: { text: norm.interval, value: norm.interval },
  86. __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
  87. });
  88. request.interval = norm.interval;
  89. request.intervalMs = norm.intervalMs;
  90. this.subscription = runRequest(ds, request).subscribe({
  91. next: (data) => {
  92. const results = preProcessPanelData(data, this.lastResult);
  93. this.lastResult = setStructureRevision(results, this.lastResult);
  94. // Store preprocessed query results for applying overrides later on in the pipeline
  95. this.subject.next(this.lastResult);
  96. },
  97. });
  98. },
  99. error: (error) => console.error('PanelQueryRunner Error', error),
  100. });
  101. }
  102. cancel(): void {
  103. if (!this.subscription) {
  104. return;
  105. }
  106. this.subscription.unsubscribe();
  107. // If we have an old result with loading state, send it with done state
  108. if (this.lastResult && this.lastResult.state === LoadingState.Loading) {
  109. this.subject.next({
  110. ...this.lastResult,
  111. state: LoadingState.Done,
  112. });
  113. }
  114. }
  115. destroy(): void {
  116. // Tell anyone listening that we are done
  117. if (this.subject) {
  118. this.subject.complete();
  119. }
  120. if (this.subscription) {
  121. this.subscription.unsubscribe();
  122. }
  123. }
  124. }
  125. async function getDataSource(
  126. datasource: DataSourceRef | DataSourceApi | null,
  127. scopedVars: ScopedVars
  128. ): Promise<DataSourceApi> {
  129. if (datasource && 'query' in datasource) {
  130. return datasource;
  131. }
  132. return getDatasourceSrv().get(datasource, scopedVars);
  133. }