runRequest.ts 7.2 KB


  1. // Libraries
  2. import { isString, map as isArray } from 'lodash';
  3. import { from, merge, Observable, of, timer } from 'rxjs';
  4. import { catchError, map, mapTo, share, takeUntil, tap } from 'rxjs/operators';
  5. // Utils & Services
  6. // Types
  7. import {
  8. DataFrame,
  9. DataQueryError,
  10. DataQueryRequest,
  11. DataQueryResponse,
  12. DataQueryResponseData,
  13. DataSourceApi,
  14. DataTopic,
  15. dateMath,
  16. guessFieldTypes,
  17. LoadingState,
  18. PanelData,
  19. TimeRange,
  20. toDataFrame,
  21. } from '@grafana/data';
  22. import { toDataQueryError } from '@grafana/runtime';
  23. import { isExpressionReference } from '@grafana/runtime/src/utils/DataSourceWithBackend';
  24. import { backendSrv } from 'app/core/services/backend_srv';
  25. import { dataSource as expressionDatasource } from 'app/features/expressions/ExpressionDatasource';
  26. import { ExpressionQuery } from 'app/features/expressions/types';
  27. import { cancelNetworkRequestsOnUnsubscribe } from './processing/canceler';
  28. import { emitDataRequestEvent } from './queryAnalytics';
  /** Response packets indexed by their packet key. */
  type MapOfResponsePackets = Record<string, DataQueryResponse>;

  /**
   * State carried between response packets of one running request:
   * every packet received so far plus the PanelData composed from them.
   */
  interface RunningQueryState {
    packets: MapOfResponsePackets;
    panelData: PanelData;
  }
  /**
   * Composes a PanelData from multiple responses.
   *
   * The incoming packet is stored under its key (replacing any earlier packet
   * with the same key) and the series, annotations, loading state and error are
   * then rebuilt from every stored packet.
   *
   * @param packet - the response packet that was just received
   * @param state - the state accumulated so far; a new object is returned instead of mutating it
   * @returns the updated packet map together with the recomputed PanelData
   */
  export function processResponsePacket(packet: DataQueryResponse, state: RunningQueryState): RunningQueryState {
    const request = state.panelData.request!;

    // Key resolution order: explicit packet key, refId of the first data item, then 'A'.
    const packetKey = packet.key ?? packet.data?.[0]?.refId ?? 'A';

    // Updates to the same key replace the previous value.
    const packets: MapOfResponsePackets = { ...state.packets, [packetKey]: packet };

    let loadingState = packet.state || LoadingState.Done;
    let error: DataQueryError | undefined = undefined;
    const series: DataQueryResponseData[] = [];
    const annotations: DataQueryResponseData[] = [];

    for (const stored of Object.values(packets)) {
      // Any packet carrying an error forces the Error state; the last such packet supplies the error.
      if (stored.error) {
        loadingState = LoadingState.Error;
        error = stored.error;
      }

      if (!stored.data?.length) {
        continue;
      }

      // Items tagged with the annotations topic are kept apart from regular series.
      for (const item of stored.data) {
        const target = item.meta?.dataTopic === DataTopic.Annotations ? annotations : series;
        target.push(item);
      }
    }

    return {
      packets,
      panelData: {
        state: loadingState,
        series,
        annotations,
        error,
        request,
        timeRange: getRequestTimeRange(request, loadingState),
      },
    };
  }
  /**
   * Picks the time range to report for a response.
   *
   * The request's own range is returned as-is unless the response is streaming
   * and `range.raw.from` is a string; in that case `from` and `to` are re-parsed
   * from the raw values with `dateMath.parse`.
   * NOTE(review): a string `raw.from` is presumably a relative expression such
   * as "now-1h", which is why it is re-evaluated on each streaming packet - confirm.
   *
   * @param request - the request whose `range` is inspected
   * @param loadingState - the loading state computed for the current response
   * @returns the original range, or a copy with freshly parsed `from` / `to`
   */
  function getRequestTimeRange(request: DataQueryRequest, loadingState: LoadingState): TimeRange {
    const { range } = request;

    if (isString(range.raw.from) && loadingState === LoadingState.Streaming) {
      const from = dateMath.parse(range.raw.from, false)!;
      const to = dateMath.parse(range.raw.to, true)!;
      return { ...range, from, to };
    }

    return range;
  }
  /**
   * Executes the request and processes the single or multiple response packets
   * into a combined PanelData response. It will
   *  - merge multiple responses into a single DataFrame array based on the packet key
   *  - emit a loading state if no response has arrived after 200ms
   *  - cancel any still running network requests on unsubscribe (using request.requestId)
   *
   * @param datasource - the datasource the request is sent to
   * @param request - the query request; `request.endTime` is stamped each time a packet arrives
   * @param queryFunction - optional replacement for `datasource.query`
   * @returns an Observable of PanelData; when `request.targets` is empty it emits a
   *          single PanelData in the Done state. Errors raised while querying or
   *          processing are emitted as a PanelData in the Error state rather than
   *          as an Observable error.
   */
  export function runRequest(
    datasource: DataSourceApi,
    request: DataQueryRequest,
    queryFunction?: typeof datasource.query
  ): Observable<PanelData> {
    let state: RunningQueryState = {
      panelData: {
        state: LoadingState.Loading,
        series: [],
        request: request,
        timeRange: request.range,
      },
      packets: {},
    };

    // Return early if there are no queries to run
    if (!request.targets.length) {
      request.endTime = Date.now();
      state.panelData.state = LoadingState.Done;
      return of(state.panelData);
    }

    const dataObservable = callQueryMethod(datasource, request, queryFunction).pipe(
      // Transform response packets into PanelData with merged results
      map((packet: DataQueryResponse) => {
        // Use the built-in check: the `isArray` name imported at the top of this
        // file is an alias of lodash `map`, which always returns a (truthy) array
        // and therefore can never reject a malformed response.
        if (!Array.isArray(packet.data)) {
          throw new Error(`Expected response data to be array, got ${typeof packet.data}.`);
        }

        request.endTime = Date.now();

        state = processResponsePacket(packet, state);

        return state.panelData;
      }),
      // handle errors: convert them into an Error-state PanelData built on the last known state
      catchError((err) => {
        const errLog = typeof err === 'string' ? err : JSON.stringify(err);
        console.error('runRequest.catchError', errLog);
        return of({
          ...state.panelData,
          state: LoadingState.Error,
          error: toDataQueryError(err),
        });
      }),
      tap(emitDataRequestEvent(datasource)),
      // finalize is triggered when subscriber unsubscribes
      // This makes sure any still running network requests are cancelled
      cancelNetworkRequestsOnUnsubscribe(backendSrv, request.requestId),
      // this makes it possible to share this observable in takeUntil
      share()
    );

    // If 200ms pass without a response, emit a loading state
    // mapTo will translate the timer event into state.panelData (which has state set to loading)
    // takeUntil will cancel the timer emit when first response packet is received on the dataObservable
    return merge(timer(200).pipe(mapTo(state.panelData), takeUntil(dataObservable)), dataObservable);
  }
  /**
   * Dispatches the request to the right query implementation.
   *
   * If any target references the expression datasource, the whole request is
   * sent to the expression datasource. Otherwise `queryFunction` is used when
   * supplied, falling back to `datasource.query`; that result is wrapped with
   * rxjs `from`.
   *
   * @param datasource - the datasource to query when no expression is involved
   * @param request - the query request to execute
   * @param queryFunction - optional replacement for `datasource.query`
   * @returns the response stream produced by the selected query implementation
   */
  export function callQueryMethod(
    datasource: DataSourceApi,
    request: DataQueryRequest,
    queryFunction?: typeof datasource.query
  ) {
    // If any query has an expression, use the expression endpoint
    const usesExpression = request.targets.some((target) => isExpressionReference(target.datasource));
    if (usesExpression) {
      return expressionDatasource.query(request as DataQueryRequest<ExpressionQuery>);
    }

    // Otherwise it is a standard datasource request
    if (queryFunction) {
      return from(queryFunction(request));
    }
    return from(datasource.query(request));
  }
  /**
   * Converts one response item into a DataFrame via `toDataFrame` and
   * `guessFieldTypes`, then resets `state` to null on every field of the
   * result (clearing out the cached info).
   *
   * @param data - a single item of query response data
   * @returns the converted frame with each field's `state` set to null
   */
  function getProcessedDataFrame(data: DataQueryResponseData): DataFrame {
    const frame = guessFieldTypes(toDataFrame(data));
    const fields = frame.fields;

    if (!fields?.length) {
      return frame;
    }

    // clear out the cached info
    for (const current of fields) {
      current.state = null;
    }

    return frame;
  }
  /**
   * All panels will be passed tables that have our best guess at column type set
   *
   * This is also used by PanelChrome for snapshot support
   *
   * @param results - raw query response data; may be undefined
   * @returns one processed DataFrame per input item, or an empty array when
   *          `results` is missing or is not an array
   */
  export function getProcessedDataFrames(results?: DataQueryResponseData[]): DataFrame[] {
    // Use the built-in check: the `isArray` name imported at the top of this file
    // is an alias of lodash `map`, whose result is always truthy, so it cannot
    // filter out non-array input.
    if (!Array.isArray(results)) {
      return [];
    }

    return results.map((data) => getProcessedDataFrame(data));
  }
  /**
   * Prepares PanelData for consumption by a panel.
   *
   * While a request is loading and has produced no series yet, the previous
   * result (or `data` itself when there is none) is returned with its state
   * forced to Loading and its request replaced by the current one. Otherwise
   * the series and annotations are converted to processed DataFrames and the
   * time spent doing so is recorded in `timings.dataProcessingTime`.
   *
   * @param data - the PanelData just produced by the running request
   * @param lastResult - the previously delivered PanelData, if any
   * @returns the PanelData to hand to the panel
   */
  export function preProcessPanelData(data: PanelData, lastResult?: PanelData): PanelData {
    const { series, annotations } = data;

    // for loading states with no data, use last result
    const isLoadingWithoutData = data.state === LoadingState.Loading && series.length === 0;
    if (isLoadingWithoutData) {
      const previous = lastResult || data;
      return {
        ...previous,
        state: LoadingState.Loading,
        request: data.request,
      };
    }

    // Make sure the data frames are properly formatted
    const startedAt = performance.now();
    const processedSeries = series.map((frame) => getProcessedDataFrame(frame));
    const processedAnnotations = getProcessedDataFrames(annotations);
    const finishedAt = performance.now();

    return {
      ...data,
      series: processedSeries,
      annotations: processedAnnotations,
      timings: { dataProcessingTime: finishedAt - startedAt },
    };
  }