PanelQueryRunner.ts 11 KB

  1. import { cloneDeep } from 'lodash';
  2. import { MonoTypeOperatorFunction, Observable, of, ReplaySubject, Unsubscribable } from 'rxjs';
  3. import { map, mergeMap } from 'rxjs/operators';
  4. import {
  5. applyFieldOverrides,
  6. compareArrayValues,
  7. compareDataFrameStructures,
  8. CoreApp,
  9. DataConfigSource,
  10. DataFrame,
  11. DataQuery,
  12. DataQueryRequest,
  13. DataSourceApi,
  14. DataSourceJsonData,
  15. DataSourceRef,
  16. DataTransformerConfig,
  17. getDefaultTimeRange,
  18. LoadingState,
  19. PanelData,
  20. rangeUtil,
  21. ScopedVars,
  22. TimeRange,
  23. TimeZone,
  24. toDataFrame,
  25. transformDataFrame,
  26. } from '@grafana/data';
  27. import { getTemplateSrv } from '@grafana/runtime';
  28. import { ExpressionDatasourceRef } from '@grafana/runtime/src/utils/DataSourceWithBackend';
  29. import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
  30. import { isStreamingDataFrame } from 'app/features/live/data/utils';
  31. import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
  32. import { isSharedDashboardQuery, runSharedRequest } from '../../../plugins/datasource/dashboard';
  33. import { PanelModel } from '../../dashboard/state';
  34. import { getDashboardQueryRunner } from './DashboardQueryRunner/DashboardQueryRunner';
  35. import { mergePanelAndDashData } from './mergePanelAndDashData';
  36. import { preProcessPanelData, runRequest } from './runRequest';
  /**
   * Options accepted by {@link PanelQueryRunner.run}.
   *
   * @typeParam TQuery - query model of the data source
   * @typeParam TOptions - JSON settings model of the data source
   */
  export interface QueryRunnerOptions<
    TQuery extends DataQuery = DataQuery,
    TOptions extends DataSourceJsonData = DataSourceJsonData
  > {
    /** Data source reference or an already resolved data source instance. */
    datasource: DataSourceRef | DataSourceApi<TQuery, TOptions> | null;
    /** Queries to execute; they are deep-cloned before being placed on the request. */
    queries: TQuery[];
    panelId?: number;
    dashboardId?: number;
    timezone: TimeZone;
    timeRange: TimeRange;
    /** String description of time range for display */
    timeInfo?: string;
    /** Passed to the interval calculation and forwarded on the request. */
    maxDataPoints: number;
    /** Lower interval limit; template variables in it are interpolated before use. */
    minInterval: string | undefined | null;
    scopedVars?: ScopedVars;
    cacheTimeout?: string | null;
    transformations?: DataTransformerConfig[];
  }
  /** Module-level sequence backing {@link getNextRequestId}; starts at 100. */
  let counter = 100;

  /**
   * Returns a new request id of the form `Q<n>`, where `n` increases by one
   * on every call (`Q100`, `Q101`, ...).
   */
  export function getNextRequestId(): string {
    return 'Q' + counter++;
  }
  /** Options controlling which processing steps `PanelQueryRunner.getData` applies. */
  export interface GetDataOptions {
    /** When true, the configured transformations are applied to the series. */
    withTransforms: boolean;
    /** When true, field defaults and overrides are applied to the series. */
    withFieldConfig: boolean;
  }
  /**
   * Executes the queries of a panel and publishes the results through a shared
   * replay subject, so that every subscriber of {@link PanelQueryRunner.getData}
   * receives the most recent result.
   */
  export class PanelQueryRunner {
    private subject: ReplaySubject<PanelData>;
    private subscription?: Unsubscribable;
    private lastResult?: PanelData;
    private dataConfigSource: DataConfigSource;
    private lastRequest?: DataQueryRequest;

    constructor(dataConfigSource: DataConfigSource) {
      // Buffer size 1: late subscribers immediately get the latest result.
      this.subject = new ReplaySubject(1);
      this.dataConfigSource = dataConfigSource;
    }

    /**
     * Returns an observable that subscribes to the shared multi-cast subject (that replays the last result).
     *
     * @param options - selects whether transformations and field config are applied
     * @returns a stream of panel data; each emission carries a `structureRev` that is
     *          incremented whenever the frame structure differs from the previous emission
     */
    getData(options: GetDataOptions): Observable<PanelData> {
      const { withFieldConfig, withTransforms } = options;
      let structureRev = 1;
      let lastData: DataFrame[] = [];
      let isFirstPacket = true;
      let lastConfigRev = -1;

      if (this.dataConfigSource.snapshotData) {
        const snapshotPanelData: PanelData = {
          state: LoadingState.Done,
          series: this.dataConfigSource.snapshotData.map((v) => toDataFrame(v)),
          timeRange: getDefaultTimeRange(), // Don't need real time range for snapshots
        };
        return of(snapshotPanelData);
      }

      return this.subject.pipe(
        this.getTransformationsStream(withTransforms),
        map((data: PanelData) => {
          let processedData = data;
          let streamingPacketWithSameSchema = false;

          if (withFieldConfig && data.series?.length) {
            if (lastConfigRev === this.dataConfigSource.configRev) {
              const streamingDataFrame = data.series.find((data) => isStreamingDataFrame(data)) as
                | StreamingDataFrame
                | undefined;

              if (
                streamingDataFrame &&
                !streamingDataFrame.packetInfo.schemaChanged &&
                // TODO: remove the condition below after fixing the underlying issue
                // (the reference link is missing from this copy of the file).
                // Optional chaining: lastData is empty after a packet without series.
                lastData[0]?.fields.length === streamingDataFrame.fields.length
              ) {
                // Same schema as before: reuse the previously processed frames and only
                // swap in the new values, clearing the cached calcs and range.
                processedData = {
                  ...processedData,
                  series: lastData.map((frame, frameIndex) => ({
                    ...frame,
                    length: data.series[frameIndex].length,
                    fields: frame.fields.map((field, fieldIndex) => ({
                      ...field,
                      values: data.series[frameIndex].fields[fieldIndex].values,
                      state: {
                        ...field.state,
                        calcs: undefined,
                        range: undefined,
                      },
                    })),
                  })),
                };
                streamingPacketWithSameSchema = true;
              }
            }

            // Apply field defaults and overrides
            const fieldConfig = this.dataConfigSource.getFieldOverrideOptions();
            if (fieldConfig != null && (isFirstPacket || !streamingPacketWithSameSchema)) {
              lastConfigRev = this.dataConfigSource.configRev!;
              processedData = {
                ...processedData,
                series: applyFieldOverrides({
                  timeZone: data.request?.timezone ?? 'browser',
                  data: processedData.series,
                  ...fieldConfig,
                }),
              };
              isFirstPacket = false;
            }
          }

          if (
            !streamingPacketWithSameSchema &&
            !compareArrayValues(lastData, processedData.series, compareDataFrameStructures)
          ) {
            structureRev++;
          }

          lastData = processedData.series;
          return { ...processedData, structureRev };
        })
      );
    }

    /**
     * Builds an operator that applies the configured transformations to each
     * emitted result. The data passes through unchanged when `withTransforms`
     * is false or when no transformations are configured.
     */
    private getTransformationsStream = (withTransforms: boolean): MonoTypeOperatorFunction<PanelData> => {
      return (inputStream) =>
        inputStream.pipe(
          mergeMap((data) => {
            if (!withTransforms) {
              return of(data);
            }

            const transformations = this.dataConfigSource.getTransformations();
            if (!transformations || transformations.length === 0) {
              return of(data);
            }

            // Interpolates template variables using the scoped vars of the originating request.
            const replace = (option: string): string => {
              return getTemplateSrv().replace(option, data?.request?.scopedVars);
            };

            transformations.forEach((transform: any) => {
              transform.replace = replace;
            });

            return transformDataFrame(transformations, data.series).pipe(map((series) => ({ ...data, series })));
          })
        );
    };

    /**
     * Builds a query request from the given options, resolves the data source
     * and pipes the query results into the shared subject.
     *
     * Shared dashboard queries are delegated to `runSharedRequest`. Errors thrown
     * while resolving the data source or preparing the request are logged to the
     * console and not rethrown.
     */
    async run(options: QueryRunnerOptions) {
      const {
        queries,
        timezone,
        datasource,
        panelId,
        dashboardId,
        timeRange,
        timeInfo,
        cacheTimeout,
        maxDataPoints,
        scopedVars,
        minInterval,
      } = options;

      if (isSharedDashboardQuery(datasource)) {
        this.pipeToSubject(runSharedRequest(options), panelId);
        return;
      }

      const request: DataQueryRequest = {
        app: CoreApp.Dashboard,
        requestId: getNextRequestId(),
        timezone,
        panelId,
        dashboardId,
        range: timeRange,
        timeInfo,
        interval: '',
        intervalMs: 0,
        targets: cloneDeep(queries),
        maxDataPoints: maxDataPoints,
        scopedVars: scopedVars || {},
        cacheTimeout,
        startTime: Date.now(),
      };

      // Add deprecated property
      (request as any).rangeRaw = timeRange.raw;

      try {
        const ds = await getDataSource(datasource, request.scopedVars);
        const isMixedDS = ds.meta?.mixed;

        // Attach the data source to each query
        request.targets = request.targets.map((query) => {
          const isExpressionQuery = query.datasource?.type === ExpressionDatasourceRef.type;
          // When using a data source variable, the panel might have the incorrect datasource
          // stored, so when running the query make sure it is done with the correct one
          if (!query.datasource || (query.datasource.uid !== ds.uid && !isMixedDS && !isExpressionQuery)) {
            query.datasource = ds.getRef();
          }
          return query;
        });

        const lowerIntervalLimit = minInterval ? getTemplateSrv().replace(minInterval, request.scopedVars) : ds.interval;
        const norm = rangeUtil.calculateInterval(timeRange, maxDataPoints, lowerIntervalLimit);

        // make shallow copy of scoped vars,
        // and add built in variables interval and interval_ms
        request.scopedVars = Object.assign({}, request.scopedVars, {
          __interval: { text: norm.interval, value: norm.interval },
          __interval_ms: { text: norm.intervalMs.toString(), value: norm.intervalMs },
        });

        request.interval = norm.interval;
        request.intervalMs = norm.intervalMs;

        this.lastRequest = request;

        this.pipeToSubject(runRequest(ds, request), panelId);
      } catch (err) {
        console.error('PanelQueryRunner Error', err);
      }
    }

    /**
     * Replaces the current subscription with one on `observable` and forwards
     * every pre-processed result to the shared subject. When the config source
     * supports alert states or annotations, the results are first merged with
     * the dashboard query runner's result for the panel.
     */
    private pipeToSubject(observable: Observable<PanelData>, panelId?: number) {
      if (this.subscription) {
        this.subscription.unsubscribe();
      }

      let panelData = observable;
      const dataSupport = this.dataConfigSource.getDataSupport();

      if (dataSupport.alertStates || dataSupport.annotations) {
        const panel = this.dataConfigSource as unknown as PanelModel;
        panelData = mergePanelAndDashData(observable, getDashboardQueryRunner().getResult(panel.id));
      }

      this.subscription = panelData.subscribe({
        next: (data) => {
          this.lastResult = preProcessPanelData(data, this.lastResult);
          // Store preprocessed query results for applying overrides later on in the pipeline
          this.subject.next(this.lastResult);
        },
      });
    }

    /** Cancels the running query, if there is an active subscription. */
    cancelQuery() {
      if (!this.subscription) {
        return;
      }

      this.subscription.unsubscribe();

      // If we have an old result with loading state, send it with done state
      if (this.lastResult && this.lastResult.state === LoadingState.Loading) {
        this.subject.next({
          ...this.lastResult,
          state: LoadingState.Done,
        });
      }
    }

    /** Re-emits the last result, if any, to all subscribers. */
    resendLastResult = () => {
      if (this.lastResult) {
        this.subject.next(this.lastResult);
      }
    };

    /** Forgets the last result so that new subscribers do not receive it. */
    clearLastResult() {
      this.lastResult = undefined;
      // A new subject is also needed since it's a replay subject that remembers/sends last value
      this.subject = new ReplaySubject(1);
    }

    /**
     * Called when the panel is closed
     */
    destroy() {
      // Tell anyone listening that we are done
      if (this.subject) {
        this.subject.complete();
      }

      if (this.subscription) {
        this.subscription.unsubscribe();
      }
    }

    /** Adopts the last result of another runner and publishes it on this runner's subject. */
    useLastResultFrom(runner: PanelQueryRunner) {
      this.lastResult = runner.getLastResult();

      if (this.lastResult) {
        // The subject is a replay subject so anyone subscribing will get this last result
        this.subject.next(this.lastResult);
      }
    }

    /** Returns the most recent pre-processed result, if any. */
    getLastResult(): PanelData | undefined {
      return this.lastResult;
    }

    /** Returns the request built by the most recent non-shared `run` call, if any. */
    getLastRequest(): DataQueryRequest | undefined {
      return this.lastRequest;
    }
  }
  /**
   * Resolves a data source instance.
   *
   * @param datasource - an already resolved data source (anything exposing a truthy
   *                     `query` member), or a reference/name to look up
   * @param scopedVars - passed to the data source service for the lookup
   * @returns the given instance when it is already a data source, otherwise the
   *          result of `getDatasourceSrv().get(...)`
   */
  async function getDataSource(
    datasource: DataSourceRef | string | DataSourceApi | null,
    scopedVars: ScopedVars
  ): Promise<DataSourceApi> {
    // An object with a truthy `query` member is treated as a resolved instance.
    if (datasource && typeof datasource !== 'string' && 'query' in datasource && datasource.query) {
      return datasource;
    }
    return await getDatasourceSrv().get(datasource as string, scopedVars);
  }