AlertingQueryRunner.ts 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import { Observable, of, OperatorFunction, ReplaySubject, Unsubscribable } from 'rxjs';
  2. import { catchError, map, share } from 'rxjs/operators';
  3. import { v4 as uuidv4 } from 'uuid';
  4. import {
  5. dataFrameFromJSON,
  6. DataFrameJSON,
  7. getDefaultTimeRange,
  8. LoadingState,
  9. PanelData,
  10. rangeUtil,
  11. TimeRange,
  12. withLoadingIndicator,
  13. } from '@grafana/data';
  14. import { FetchResponse, getDataSourceSrv, toDataQueryError } from '@grafana/runtime';
  15. import { BackendSrv, getBackendSrv } from 'app/core/services/backend_srv';
  16. import { isExpressionQuery } from 'app/features/expressions/guards';
  17. import { cancelNetworkRequestsOnUnsubscribe } from 'app/features/query/state/processing/canceler';
  18. import { setStructureRevision } from 'app/features/query/state/processing/revision';
  19. import { preProcessPanelData } from 'app/features/query/state/runRequest';
  20. import { AlertQuery } from 'app/types/unified-alerting-dto';
  21. import { getTimeRangeForExpression } from '../utils/timeRange';
  /**
   * Evaluation result for a single query as delivered by the backend:
   * the data frames in their JSON wire format.
   */
  export interface AlertingQueryResult {
    /** Frames for this query, still serialized as `DataFrameJSON`. */
    frames: DataFrameJSON[];
  }
  /**
   * Body of the evaluation response: one `AlertingQueryResult` per query,
   * keyed by the query's refId.
   */
  export interface AlertingQueryResponse {
    /** Results indexed by refId. */
    results: Record<string, AlertingQueryResult>;
  }
  /**
   * Evaluates alerting queries via `runRequest` and publishes the outcome,
   * keyed by refId, on a replaying subject that consumers read through `get()`.
   *
   * At most one request is tracked at a time: starting a new run, or publishing
   * a fresh "nothing to run" state, first stops the request that is still in
   * flight so its late results cannot overwrite newer state.
   */
  export class AlertingQueryRunner {
    /** Replays the most recent result map to every new subscriber. */
    private subject: ReplaySubject<Record<string, PanelData>>;
    /** Handle of the request currently in flight, if any. */
    private subscription?: Unsubscribable;
    /** Last result map published by a running request, an error, or a cancel. */
    private lastResult: Record<string, PanelData>;

    /**
     * @param backendSrv    service used to issue the evaluation request
     * @param dataSourceSrv service used to look up each query's data source
     */
    constructor(private backendSrv = getBackendSrv(), private dataSourceSrv = getDataSourceSrv()) {
      this.subject = new ReplaySubject(1);
      this.lastResult = {};
    }

    /** Returns the stream of result maps (refId -> PanelData). */
    get(): Observable<Record<string, PanelData>> {
      return this.subject.asObservable();
    }

    /**
     * Runs the given queries and publishes their results on the subject.
     *
     * If there are no queries, or a data source rejects one of them through
     * `filterQuery`, nothing is executed and an empty `Done` state is published
     * for every query instead.
     *
     * The returned promise rejects if a data source lookup rejects; in that case
     * nothing is published and a request that is still running is left untouched.
     *
     * @param queries the alert queries (data queries and expressions) to evaluate
     */
    async run(queries: AlertQuery[]): Promise<void> {
      if (queries.length === 0) {
        this.stopActiveRequest();
        this.subject.next(initialState(queries, LoadingState.Done));
        return;
      }

      // do not execute if one or more of the queries are not runnable,
      // for example not completely configured
      for (const query of queries) {
        if (isExpressionQuery(query.model)) {
          continue;
        }

        const ds = await this.dataSourceSrv.get(query.datasourceUid);
        if (ds.filterQuery && !ds.filterQuery(query.model)) {
          this.stopActiveRequest();
          this.subject.next(initialState(queries, LoadingState.Done));
          return;
        }
      }

      // Drop the previous request (or one started by an overlapping run() while
      // we were awaiting the data sources) before its handle is overwritten;
      // otherwise it could never be cancelled and would keep publishing results.
      this.stopActiveRequest();

      this.subscription = runRequest(this.backendSrv, queries).subscribe({
        next: (dataPerQuery) => {
          const nextResult = applyChange(dataPerQuery, (refId, data) => {
            const previous = this.lastResult[refId];
            const preProcessed = preProcessPanelData(data, previous);
            return setStructureRevision(preProcessed, previous);
          });

          this.lastResult = nextResult;
          this.subject.next(this.lastResult);
        },
        error: (error: Error) => {
          this.lastResult = mapErrorToPanelData(this.lastResult, error);
          this.subject.next(this.lastResult);
        },
      });
    }

    /**
     * Stops the request in flight, if any. When the last published state still
     * had a query in `Loading`, every query is switched to `Done` and that state
     * is stored and published. Calling it again without a new run is a no-op.
     */
    cancel() {
      if (!this.subscription) {
        return;
      }

      this.stopActiveRequest();

      let requestIsRunning = false;
      const nextResult = applyChange(this.lastResult, (refId, data) => {
        if (data.state === LoadingState.Loading) {
          requestIsRunning = true;
        }
        return {
          ...data,
          state: LoadingState.Done,
        };
      });

      if (requestIsRunning) {
        // Keep the cached state in sync with what subscribers were told, the
        // same way the next/error handlers in run() do.
        this.lastResult = nextResult;
        this.subject.next(this.lastResult);
      }
    }

    /**
     * Completes the result stream and stops any request in flight.
     *
     * The subject is completed before `cancel()` runs, so a state that `cancel()`
     * publishes at this point is not delivered to subscribers (a completed RxJS
     * subject ignores further `next` calls).
     */
    destroy() {
      if (this.subject) {
        this.subject.complete();
      }

      this.cancel();
    }

    /** Unsubscribes from the tracked request and forgets its handle. */
    private stopActiveRequest(): void {
      this.subscription?.unsubscribe();
      this.subscription = undefined;
    }
  }
  /**
   * Issues a single POST to `/api/v1/eval` carrying all queries and returns the
   * result map (refId -> PanelData) as an observable.
   *
   * A failed request does not error the stream: the error is folded into the
   * initial per-query state via `mapErrorToPanelData` and emitted as a value.
   * The initial `Loading` state is handed to `withLoadingIndicator` as the value
   * to use while the request is pending.
   *
   * @param backendSrv service used to perform (and cancel) the request
   * @param queries    alert queries to evaluate
   */
  const runRequest = (backendSrv: BackendSrv, queries: AlertQuery[]): Observable<Record<string, PanelData>> => {
    const loadingState = initialState(queries, LoadingState.Loading);
    // The id ties the fetch to the cancel operator so unsubscribing aborts it.
    const requestId = uuidv4();

    const evaluation$ = backendSrv
      .fetch<AlertingQueryResponse>({
        data: { data: queries },
        url: '/api/v1/eval',
        method: 'POST',
        requestId,
      })
      .pipe(
        mapToPanelData(loadingState),
        catchError((error) => of(mapErrorToPanelData(loadingState, error))),
        cancelNetworkRequestsOnUnsubscribe(backendSrv, requestId),
        share()
      );

    return withLoadingIndicator({
      whileLoading: loadingState,
      source: evaluation$,
    });
  };
  /**
   * Builds a result map with one empty `PanelData` per query, all in the given
   * loading state and each carrying that query's resolved time range.
   *
   * @param queries queries to create entries for; keyed by `refId`
   * @param state   loading state assigned to every entry
   */
  const initialState = (queries: AlertQuery[], state: LoadingState): Record<string, PanelData> => {
    const byRefId: Record<string, PanelData> = {};

    for (const query of queries) {
      byRefId[query.refId] = {
        state,
        series: [],
        timeRange: getTimeRange(query, queries),
      };
    }

    return byRefId;
  };
  /**
   * Resolves the absolute time range for a query.
   *
   * Expression queries get their range from `getTimeRangeForExpression`, which
   * is given the full query list. Data queries use their own relative time
   * range; when that is missing a warning is logged and the default range is
   * used.
   *
   * @param query   query to resolve the range for
   * @param queries all queries of the rule, passed through for expressions
   */
  const getTimeRange = (query: AlertQuery, queries: AlertQuery[]): TimeRange => {
    if (isExpressionQuery(query.model)) {
      return rangeUtil.relativeToTimeRange(getTimeRangeForExpression(query.model, queries));
    }

    if (query.relativeTimeRange) {
      return rangeUtil.relativeToTimeRange(query.relativeTimeRange);
    }

    console.warn(`Query with refId: ${query.refId} did not have any relative time range, using default.`);
    return getDefaultTimeRange();
  };
  /**
   * Creates an operator that converts the evaluation response into a result map
   * (refId -> PanelData) in state `Done`.
   *
   * The time range of each entry is taken from the matching entry in
   * `dataByQuery`. A result without `frames` yields an empty series list, and a
   * refId that has no entry in `dataByQuery` falls back to the default time
   * range, so one incomplete result no longer fails the whole response.
   *
   * @param dataByQuery initial per-query state providing the time ranges
   */
  const mapToPanelData = (
    dataByQuery: Record<string, PanelData>
  ): OperatorFunction<FetchResponse<AlertingQueryResponse>, Record<string, PanelData>> => {
    return map((response) => {
      const { data } = response;
      const results: Record<string, PanelData> = {};

      for (const [refId, result] of Object.entries(data.results)) {
        // The declared type says frames is always present, but a response can
        // omit it; treat that as "no data" instead of throwing.
        const frames = result.frames ?? [];

        results[refId] = {
          timeRange: dataByQuery[refId]?.timeRange ?? getDefaultTimeRange(),
          state: LoadingState.Done,
          series: frames.map((frame) => dataFrameFromJSON(frame)),
        };
      }

      return results;
    });
  };
  /**
   * Returns a copy of the result map in which every entry is switched to the
   * `Error` state and carries the given error, converted once with
   * `toDataQueryError` and shared by all entries.
   *
   * @param lastResult result map to derive the error state from
   * @param error      the failure to attach to each entry
   */
  const mapErrorToPanelData = (lastResult: Record<string, PanelData>, error: Error): Record<string, PanelData> => {
    const dataQueryError = toDataQueryError(error);

    const markAsFailed = (_refId: string, panelData: PanelData): PanelData => ({
      ...panelData,
      state: LoadingState.Error,
      error: dataQueryError,
    });

    return applyChange(lastResult, markAsFailed);
  };
  /**
   * Produces a new result map by passing every entry of `initial` through
   * `change`. The input map itself is not modified; entries are visited in
   * `Object.entries` order.
   *
   * @param initial result map to transform
   * @param change  callback receiving the refId and its data, returning the replacement
   */
  const applyChange = (
    initial: Record<string, PanelData>,
    change: (refId: string, data: PanelData) => PanelData
  ): Record<string, PanelData> =>
    Object.entries(initial).reduce((changed: Record<string, PanelData>, [refId, data]) => {
      changed[refId] = change(refId, data);
      return changed;
    }, {});