import { Observable, of, OperatorFunction, ReplaySubject, Unsubscribable } from 'rxjs'; import { catchError, map, share } from 'rxjs/operators'; import { v4 as uuidv4 } from 'uuid'; import { dataFrameFromJSON, DataFrameJSON, getDefaultTimeRange, LoadingState, PanelData, rangeUtil, TimeRange, withLoadingIndicator, } from '@grafana/data'; import { FetchResponse, getDataSourceSrv, toDataQueryError } from '@grafana/runtime'; import { BackendSrv, getBackendSrv } from 'app/core/services/backend_srv'; import { isExpressionQuery } from 'app/features/expressions/guards'; import { cancelNetworkRequestsOnUnsubscribe } from 'app/features/query/state/processing/canceler'; import { setStructureRevision } from 'app/features/query/state/processing/revision'; import { preProcessPanelData } from 'app/features/query/state/runRequest'; import { AlertQuery } from 'app/types/unified-alerting-dto'; import { getTimeRangeForExpression } from '../utils/timeRange'; export interface AlertingQueryResult { frames: DataFrameJSON[]; } export interface AlertingQueryResponse { results: Record; } export class AlertingQueryRunner { private subject: ReplaySubject>; private subscription?: Unsubscribable; private lastResult: Record; constructor(private backendSrv = getBackendSrv(), private dataSourceSrv = getDataSourceSrv()) { this.subject = new ReplaySubject(1); this.lastResult = {}; } get(): Observable> { return this.subject.asObservable(); } async run(queries: AlertQuery[]) { if (queries.length === 0) { const empty = initialState(queries, LoadingState.Done); return this.subject.next(empty); } // do not execute if one more of the queries are not runnable, // for example not completely configured for (const query of queries) { if (!isExpressionQuery(query.model)) { const ds = await this.dataSourceSrv.get(query.datasourceUid); if (ds.filterQuery && !ds.filterQuery(query.model)) { const empty = initialState(queries, LoadingState.Done); return this.subject.next(empty); } } } this.subscription = runRequest(this.backendSrv, queries).subscribe({ next: (dataPerQuery) => { const nextResult = applyChange(dataPerQuery, (refId, data) => { const previous = this.lastResult[refId]; const preProcessed = preProcessPanelData(data, previous); return setStructureRevision(preProcessed, previous); }); this.lastResult = nextResult; this.subject.next(this.lastResult); }, error: (error: Error) => { this.lastResult = mapErrorToPanelData(this.lastResult, error); this.subject.next(this.lastResult); }, }); } cancel() { if (!this.subscription) { return; } this.subscription.unsubscribe(); let requestIsRunning = false; const nextResult = applyChange(this.lastResult, (refId, data) => { if (data.state === LoadingState.Loading) { requestIsRunning = true; } return { ...data, state: LoadingState.Done, }; }); if (requestIsRunning) { this.subject.next(nextResult); } } destroy() { if (this.subject) { this.subject.complete(); } this.cancel(); } } const runRequest = (backendSrv: BackendSrv, queries: AlertQuery[]): Observable> => { const initial = initialState(queries, LoadingState.Loading); const request = { data: { data: queries }, url: '/api/v1/eval', method: 'POST', requestId: uuidv4(), }; return withLoadingIndicator({ whileLoading: initial, source: backendSrv.fetch(request).pipe( mapToPanelData(initial), catchError((error) => of(mapErrorToPanelData(initial, error))), cancelNetworkRequestsOnUnsubscribe(backendSrv, request.requestId), share() ), }); }; const initialState = (queries: AlertQuery[], state: LoadingState): Record => { return queries.reduce((dataByQuery: Record, query) => { dataByQuery[query.refId] = { state, series: [], timeRange: getTimeRange(query, queries), }; return dataByQuery; }, {}); }; const getTimeRange = (query: AlertQuery, queries: AlertQuery[]): TimeRange => { if (isExpressionQuery(query.model)) { const relative = getTimeRangeForExpression(query.model, queries); return rangeUtil.relativeToTimeRange(relative); } if (!query.relativeTimeRange) { console.warn(`Query with refId: ${query.refId} did not have any relative time range, using default.`); return getDefaultTimeRange(); } return rangeUtil.relativeToTimeRange(query.relativeTimeRange); }; const mapToPanelData = ( dataByQuery: Record ): OperatorFunction, Record> => { return map((response) => { const { data } = response; const results: Record = {}; for (const [refId, result] of Object.entries(data.results)) { results[refId] = { timeRange: dataByQuery[refId].timeRange, state: LoadingState.Done, series: result.frames.map(dataFrameFromJSON), }; } return results; }); }; const mapErrorToPanelData = (lastResult: Record, error: Error): Record => { const queryError = toDataQueryError(error); return applyChange(lastResult, (refId, data) => { return { ...data, state: LoadingState.Error, error: queryError, }; }); }; const applyChange = ( initial: Record, change: (refId: string, data: PanelData) => PanelData ): Record => { const nextResult: Record = {}; for (const [refId, data] of Object.entries(initial)) { nextResult[refId] = change(refId, data); } return nextResult; };