// VariableQueryRunner.ts (6.8 KB)
import { EMPTY, merge, Observable, of, Subject, throwError, Unsubscribable } from 'rxjs';
import { catchError, filter, finalize, mergeMap, take, takeUntil } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';

import {
  CoreApp,
  DataQuery,
  DataQueryRequest,
  DataSourceApi,
  getDefaultTimeRange,
  LoadingState,
  PanelData,
  ScopedVars,
} from '@grafana/data';

import { dispatch, getState } from '../../../store/store';
import { StoreState, ThunkDispatch } from '../../../types';
import { getTimeSrv } from '../../dashboard/services/TimeSrv';
import { runRequest } from '../../query/state/runRequest';
import { getLastKey, getVariable } from '../state/selectors';
import { KeyedVariableIdentifier } from '../state/types';
import { QueryVariableModel, VariableRefresh } from '../types';
import { getTemplatedRegex } from '../utils';

import { toMetricFindValues, updateOptionsState, validateVariableSelection } from './operators';
import { QueryRunners } from './queryRunners';
  24. interface UpdateOptionsArgs {
  25. identifier: KeyedVariableIdentifier;
  26. datasource: DataSourceApi;
  27. searchFilter?: string;
  28. }
  29. export interface UpdateOptionsResults {
  30. state: LoadingState;
  31. identifier: KeyedVariableIdentifier;
  32. error?: any;
  33. cancelled?: boolean;
  34. }
  35. interface VariableQueryRunnerArgs {
  36. dispatch: ThunkDispatch;
  37. getState: () => StoreState;
  38. getVariable: typeof getVariable;
  39. getTemplatedRegex: typeof getTemplatedRegex;
  40. getTimeSrv: typeof getTimeSrv;
  41. queryRunners: QueryRunners;
  42. runRequest: typeof runRequest;
  43. }
  44. export class VariableQueryRunner {
  45. private readonly updateOptionsRequests: Subject<UpdateOptionsArgs>;
  46. private readonly updateOptionsResults: Subject<UpdateOptionsResults>;
  47. private readonly cancelRequests: Subject<{ identifier: KeyedVariableIdentifier }>;
  48. private readonly subscription: Unsubscribable;
  49. constructor(
  50. private dependencies: VariableQueryRunnerArgs = {
  51. dispatch,
  52. getState,
  53. getVariable,
  54. getTemplatedRegex,
  55. getTimeSrv,
  56. queryRunners: new QueryRunners(),
  57. runRequest,
  58. }
  59. ) {
  60. this.updateOptionsRequests = new Subject<UpdateOptionsArgs>();
  61. this.updateOptionsResults = new Subject<UpdateOptionsResults>();
  62. this.cancelRequests = new Subject<{ identifier: KeyedVariableIdentifier }>();
  63. this.onNewRequest = this.onNewRequest.bind(this);
  64. this.subscription = this.updateOptionsRequests.subscribe(this.onNewRequest);
  65. }
  66. queueRequest(args: UpdateOptionsArgs): void {
  67. this.updateOptionsRequests.next(args);
  68. }
  69. getResponse(identifier: KeyedVariableIdentifier): Observable<UpdateOptionsResults> {
  70. return this.updateOptionsResults.asObservable().pipe(filter((result) => result.identifier === identifier));
  71. }
  72. cancelRequest(identifier: KeyedVariableIdentifier): void {
  73. this.cancelRequests.next({ identifier });
  74. }
  75. destroy(): void {
  76. this.subscription.unsubscribe();
  77. }
  78. private onNewRequest(args: UpdateOptionsArgs): void {
  79. const { datasource, identifier, searchFilter } = args;
  80. try {
  81. const {
  82. dispatch,
  83. runRequest,
  84. getTemplatedRegex: getTemplatedRegexFunc,
  85. getVariable,
  86. queryRunners,
  87. getTimeSrv,
  88. getState,
  89. } = this.dependencies;
  90. const beforeKey = getLastKey(getState());
  91. this.updateOptionsResults.next({ identifier, state: LoadingState.Loading });
  92. const variable = getVariable<QueryVariableModel>(identifier, getState());
  93. const timeSrv = getTimeSrv();
  94. const runnerArgs = { variable, datasource, searchFilter, timeSrv, runRequest };
  95. const runner = queryRunners.getRunnerForDatasource(datasource);
  96. const target = runner.getTarget({ datasource, variable });
  97. const request = this.getRequest(variable, args, target);
  98. runner
  99. .runRequest(runnerArgs, request)
  100. .pipe(
  101. filter(() => {
  102. // Lets check if we started another batch during the execution of the observable. If so we just want to abort the rest.
  103. const afterKey = getLastKey(getState());
  104. return beforeKey === afterKey;
  105. }),
  106. filter((data) => data.state === LoadingState.Done || data.state === LoadingState.Error), // we only care about done or error for now
  107. take(1), // take the first result, using first caused a bug where it in some situations throw an uncaught error because of no results had been received yet
  108. mergeMap((data: PanelData) => {
  109. if (data.state === LoadingState.Error) {
  110. return throwError(() => data.error);
  111. }
  112. return of(data);
  113. }),
  114. toMetricFindValues(),
  115. updateOptionsState({ variable, dispatch, getTemplatedRegexFunc }),
  116. validateVariableSelection({ variable, dispatch, searchFilter }),
  117. takeUntil(
  118. merge(this.updateOptionsRequests, this.cancelRequests).pipe(
  119. filter((args) => {
  120. let cancelRequest = false;
  121. if (args.identifier.id === identifier.id) {
  122. cancelRequest = true;
  123. this.updateOptionsResults.next({ identifier, state: LoadingState.Loading, cancelled: cancelRequest });
  124. }
  125. return cancelRequest;
  126. })
  127. )
  128. ),
  129. catchError((error) => {
  130. if (error.cancelled) {
  131. return of({});
  132. }
  133. this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
  134. return throwError(() => error);
  135. }),
  136. finalize(() => {
  137. this.updateOptionsResults.next({ identifier, state: LoadingState.Done });
  138. })
  139. )
  140. .subscribe();
  141. } catch (error) {
  142. this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
  143. }
  144. }
  145. private getRequest(variable: QueryVariableModel, args: UpdateOptionsArgs, target: DataQuery) {
  146. const { searchFilter } = args;
  147. const variableAsVars = { variable: { text: variable.current.text, value: variable.current.value } };
  148. const searchFilterScope = { searchFilter: { text: searchFilter, value: searchFilter } };
  149. const searchFilterAsVars = searchFilter ? searchFilterScope : {};
  150. const scopedVars = { ...searchFilterAsVars, ...variableAsVars } as ScopedVars;
  151. const range =
  152. variable.refresh === VariableRefresh.onTimeRangeChanged
  153. ? this.dependencies.getTimeSrv().timeRange()
  154. : getDefaultTimeRange();
  155. const request: DataQueryRequest = {
  156. app: CoreApp.Dashboard,
  157. requestId: uuidv4(),
  158. timezone: '',
  159. range,
  160. interval: '',
  161. intervalMs: 0,
  162. targets: [target],
  163. scopedVars,
  164. startTime: Date.now(),
  165. };
  166. return request;
  167. }
  168. }
  169. let singleton: VariableQueryRunner;
  170. export function setVariableQueryRunner(runner: VariableQueryRunner): void {
  171. singleton = runner;
  172. }
  173. export function getVariableQueryRunner(): VariableQueryRunner {
  174. return singleton;
  175. }