query.ts 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. import { AnyAction, createAction, PayloadAction } from '@reduxjs/toolkit';
  2. import deepEqual from 'fast-deep-equal';
  3. import { identity, Observable, of, SubscriptionLike, Unsubscribable } from 'rxjs';
  4. import { mergeMap, throttleTime } from 'rxjs/operators';
  5. import {
  6. AbsoluteTimeRange,
  7. DataQuery,
  8. DataQueryErrorType,
  9. DataQueryResponse,
  10. DataSourceApi,
  11. hasLogsVolumeSupport,
  12. hasQueryExportSupport,
  13. hasQueryImportSupport,
  14. HistoryItem,
  15. LoadingState,
  16. PanelData,
  17. PanelEvents,
  18. QueryFixAction,
  19. toLegacyResponseData,
  20. } from '@grafana/data';
  21. import { config, reportInteraction } from '@grafana/runtime';
  22. import {
  23. buildQueryTransaction,
  24. ensureQueries,
  25. generateEmptyQuery,
  26. generateNewKeyAndAddRefIdIfMissing,
  27. getQueryKeys,
  28. hasNonEmptyQuery,
  29. stopQueryState,
  30. updateHistory,
  31. } from 'app/core/utils/explore';
  32. import { getShiftedTimeRange } from 'app/core/utils/timePicker';
  33. import { getTimeZone } from 'app/features/profile/state/selectors';
  34. import { ExploreItemState, ExplorePanelData, ThunkDispatch, ThunkResult } from 'app/types';
  35. import { ExploreId, ExploreState, QueryOptions } from 'app/types/explore';
  36. import { notifyApp } from '../../../core/actions';
  37. import { createErrorNotification } from '../../../core/copy/appNotification';
  38. import { runRequest } from '../../query/state/runRequest';
  39. import { decorateData } from '../utils/decorators';
  40. import { addHistoryItem, historyUpdatedAction, loadRichHistory } from './history';
  41. import { stateSave } from './main';
  42. import { updateTime } from './time';
  43. import { createCacheKey, getResultsFromCache } from './utils';
  44. //
  45. // Actions and Payloads
  46. //
  /**
   * Payload for `addQueryRowAction`.
   * `query` is inserted into the pane's query list directly after the row at `index`.
   */
  export interface AddQueryRowPayload {
    exploreId: ExploreId;
    index: number;
    query: DataQuery;
  }

  export const addQueryRowAction = createAction<AddQueryRowPayload>('explore/addQueryRow');
  /**
   * Payload for `changeQueriesAction`.
   * The reducer replaces the pane's `queries` with the given list as-is; query keys are not recomputed.
   */
  export interface ChangeQueriesPayload {
    exploreId: ExploreId;
    queries: DataQuery[];
  }

  export const changeQueriesAction = createAction<ChangeQueriesPayload>('explore/changeQueries');
  /**
   * Payload for `cancelQueriesAction`, which cancels the running queries of one Explore pane.
   */
  export interface CancelQueriesPayload {
    exploreId: ExploreId;
  }

  export const cancelQueriesAction = createAction<CancelQueriesPayload>('explore/cancelQueries');
  /**
   * Payload for `queriesImportedAction`: the queries to store in the pane once an import has finished.
   */
  export interface QueriesImportedPayload {
    exploreId: ExploreId;
    queries: DataQuery[];
  }

  export const queriesImportedAction = createAction<QueriesImportedPayload>('explore/queriesImported');
  /**
   * Payload for `modifyQueriesAction`, which modifies queries through a datasource-specific modifier.
   *
   * - `exploreId`: Explore area.
   * - `modification`: action object with a type, e.g., ADD_FILTER.
   * - `index`: optional query row index. If omitted, the modification is applied to all query rows.
   * - `modifier`: function that executes the modification, typically `datasourceInstance.modifyQueries`.
   */
  export interface ModifyQueriesPayload {
    exploreId: ExploreId;
    modification: QueryFixAction;
    index?: number;
    modifier: (query: DataQuery, modification: QueryFixAction) => DataQuery;
  }

  export const modifyQueriesAction = createAction<ModifyQueriesPayload>('explore/modifyQueries');
  /**
   * Payload for `queryStoreSubscriptionAction`: the subscription of the current query run,
   * kept in the pane state so that it can be stopped later.
   */
  export interface QueryStoreSubscriptionPayload {
    exploreId: ExploreId;
    querySubscription: Unsubscribable;
  }

  export const queryStoreSubscriptionAction =
    createAction<QueryStoreSubscriptionPayload>('explore/queryStoreSubscription');
  /**
   * Payload for `storeLogsVolumeDataProviderAction`.
   * Passing `undefined` as the provider clears the one currently stored.
   */
  export interface StoreLogsVolumeDataProvider {
    exploreId: ExploreId;
    logsVolumeDataProvider?: Observable<DataQueryResponse>;
  }

  /**
   * Stores the logs volume provider that is available after running the query.
   * Used internally by runQueries().
   */
  export const storeLogsVolumeDataProviderAction = createAction<StoreLogsVolumeDataProvider>(
    'explore/storeLogsVolumeDataProviderAction'
  );

  /**
   * Drops the logs volume data stored for the given pane.
   */
  export const cleanLogsVolumeAction = createAction<{ exploreId: ExploreId }>('explore/cleanLogsVolumeAction');
  /**
   * Payload for `storeLogsVolumeDataSubscriptionAction`.
   */
  export interface StoreLogsVolumeDataSubscriptionPayload {
    exploreId: ExploreId;
    logsVolumeDataSubscription?: SubscriptionLike;
  }

  /**
   * Keeps track of the active logs volume subscription of the given Explore pane.
   */
  const storeLogsVolumeDataSubscriptionAction = createAction<StoreLogsVolumeDataSubscriptionPayload>(
    'explore/storeLogsVolumeDataSubscriptionAction'
  );
  /**
   * Stores a value emitted by the logs volume provider. Used internally by loadLogsVolumeData().
   */
  const updateLogsVolumeDataAction = createAction<{
    exploreId: ExploreId;
    logsVolumeData: DataQueryResponse;
  }>('explore/updateLogsVolumeDataAction');
  /**
   * Payload for `queryStreamUpdatedAction`: one decorated response emitted by a query run.
   */
  export interface QueryEndedPayload {
    exploreId: ExploreId;
    response: ExplorePanelData;
  }

  export const queryStreamUpdatedAction = createAction<QueryEndedPayload>('explore/queryStreamUpdated');
  /**
   * Payload for `setQueriesAction`.
   * Resets the pane's queries to the given ones, discarding any modifications.
   * The `setQueries` thunk dispatches this action and then triggers a query run.
   */
  export interface SetQueriesPayload {
    exploreId: ExploreId;
    queries: DataQuery[];
  }

  export const setQueriesAction = createAction<SetQueriesPayload>('explore/setQueries');
  /**
   * Payload for `changeLoadingStateAction`: the loading state to set on the pane's query response.
   */
  export interface ChangeLoadingStatePayload {
    exploreId: ExploreId;
    loadingState: LoadingState;
  }

  // NOTE(review): unlike the sibling actions, this type string carries no `explore/` prefix.
  // It is kept unchanged because the string is part of the runtime behaviour.
  export const changeLoadingStateAction = createAction<ChangeLoadingStatePayload>('changeLoadingState');
  /**
   * Payload for `setPausedStateAction`: the new value of the pane's `isPaused` flag.
   */
  export interface SetPausedStatePayload {
    exploreId: ExploreId;
    isPaused: boolean;
  }

  export const setPausedStateAction = createAction<SetPausedStatePayload>('explore/setPausedState');
  /**
   * Payload for `scanStartAction`, which marks the given Explore pane as scanning for more results.
   */
  export interface ScanStartPayload {
    exploreId: ExploreId;
  }

  export const scanStartAction = createAction<ScanStartPayload>('explore/scanStart');
  /**
   * Payload for `scanStopAction`, which stops any scanning for more results in the given pane.
   */
  export interface ScanStopPayload {
    exploreId: ExploreId;
  }

  export const scanStopAction = createAction<ScanStopPayload>('explore/scanStop');
  /**
   * Payload for `addResultsToCacheAction`, which adds query results to the pane's cache.
   * This is currently used to cache the last 5 query results for log queries run from
   * logs navigation (pagination).
   */
  export interface AddResultsToCachePayload {
    exploreId: ExploreId;
    cacheKey: string;
    queryResponse: ExplorePanelData;
  }

  export const addResultsToCacheAction = createAction<AddResultsToCachePayload>('explore/addResultsToCache');
  /**
   * Payload for `clearCacheAction`, which empties the result cache of the given pane.
   */
  export interface ClearCachePayload {
    exploreId: ExploreId;
  }

  export const clearCacheAction = createAction<ClearCachePayload>('explore/clearCache');
  183. //
  184. // Action creators
  185. //
  /**
   * Thunk that generates an empty query and adds it as a new row after the row at `index`.
   *
   * @param exploreId Explore pane to add the row to
   * @param index Index of the row the new row is placed after
   */
  export function addQueryRow(exploreId: ExploreId, index: number): ThunkResult<void> {
    return (dispatch, getState) => {
      const existingQueries = getState().explore[exploreId]!.queries;
      dispatch(
        addQueryRowAction({
          exploreId,
          index,
          query: generateEmptyQuery(existingQueries, index),
        })
      );
    };
  }
  /**
   * Thunk that cancels the running queries of a pane: stops scanning, cancels the queries,
   * clears the logs volume provider and finally saves the state.
   *
   * @param exploreId Explore pane whose queries are cancelled
   */
  export function cancelQueries(exploreId: ExploreId): ThunkResult<void> {
    return (dispatch, getState) => {
      dispatch(scanStopAction({ exploreId }));
      dispatch(cancelQueriesAction({ exploreId }));
      dispatch(storeLogsVolumeDataProviderAction({ exploreId, logsVolumeDataProvider: undefined }));

      // Logs volume data that has not reached the Done state is incomplete, so it is dropped.
      const logsVolumeState = getState().explore[exploreId]!.logsVolumeData?.state;
      if (logsVolumeState !== LoadingState.Done) {
        dispatch(cleanLogsVolumeAction({ exploreId }));
      }

      dispatch(stateSave());
    };
  }
  /**
   * Imports queries from the previously selected datasource where possible, e.g. Loki and Prometheus
   * have a similar query language so the labels part can be reused to get similar data.
   *
   * Strategies, tried in order:
   * 1. same datasource type: keep the queries but drop their `datasource` property;
   * 2. source supports query export and target supports query import: convert via abstract queries;
   * 3. target defines `importQueries`: use that datasource-specific importer;
   * 4. otherwise: start from blank queries.
   *
   * @param exploreId Explore pane receiving the queries
   * @param queries Queries written for the source datasource
   * @param sourceDataSource Previously selected datasource; nullish when Explore is not initialized yet
   * @param targetDataSource Newly selected datasource
   */
  export const importQueries = (
    exploreId: ExploreId,
    queries: DataQuery[],
    sourceDataSource: DataSourceApi | undefined | null,
    targetDataSource: DataSourceApi
  ): ThunkResult<void> => {
    return async (dispatch) => {
      // Explore is not initialized: pass the queries through untouched.
      if (!sourceDataSource) {
        dispatch(queriesImportedAction({ exploreId, queries }));
        return;
      }

      let imported: DataQuery[];
      if (sourceDataSource.meta?.id === targetDataSource.meta?.id) {
        // Same type of datasource: the `datasource` property is removed to prevent a mismatch
        // between the new and the old datasource instance.
        imported = queries.map((original) => {
          const { datasource, ...withoutDatasource } = original;
          return withoutDatasource;
        });
      } else if (hasQueryExportSupport(sourceDataSource) && hasQueryImportSupport(targetDataSource)) {
        const abstractQueries = await sourceDataSource.exportToAbstractQueries(queries);
        imported = await targetDataSource.importFromAbstractQueries(abstractQueries);
      } else if (targetDataSource.importQueries) {
        // Datasource-specific importer
        imported = await targetDataSource.importQueries(queries, sourceDataSource);
      } else {
        // Nothing can be reused, fall back to blank queries
        imported = ensureQueries();
      }

      dispatch(queriesImportedAction({ exploreId, queries: ensureQueries(imported) }));
    };
  };
  /**
   * Thunk that applies a datasource-specific modification to the queries and, unless the
   * modification sets `preventSubmit`, runs the queries afterwards.
   *
   * @param exploreId Explore area
   * @param modification Action object with a type, e.g., ADD_FILTER
   * @param modifier Function that executes the modification, typically `datasourceInstance.modifyQueries`
   * @param index Optional query row index. If omitted, the modification is applied to all query rows.
   */
  export function modifyQueries(
    exploreId: ExploreId,
    modification: QueryFixAction,
    modifier: any,
    index?: number
  ): ThunkResult<void> {
    return (dispatch) => {
      dispatch(modifyQueriesAction({ exploreId, modification, index, modifier }));

      if (modification.preventSubmit) {
        return;
      }
      dispatch(runQueries(exploreId));
    };
  }
  /**
   * Records the given queries in the query history and refreshes the rich history of both panes.
   *
   * @param dispatch Store dispatch function
   * @param state Explore state; not read by this function
   * @param history Current history items of the pane
   * @param datasource Datasource the queries were run against
   * @param queries Queries to record
   * @param exploreId Pane whose history is updated
   */
  async function handleHistory(
    dispatch: ThunkDispatch,
    state: ExploreState,
    history: Array<HistoryItem<DataQuery>>,
    datasource: DataSourceApi,
    queries: DataQuery[],
    exploreId: ExploreId
  ) {
    const updatedHistory = updateHistory(history, datasource.meta.id, queries);
    dispatch(historyUpdatedAction({ exploreId, history: updatedHistory }));
    dispatch(addHistoryItem(datasource.uid, datasource.name, queries));

    // Because filtering happens in the backend we cannot add a new entry without checking if it matches
    // currently used filters. Instead, the query history list of each pane is refreshed, one after another.
    // TODO: run only if Query History list is opened (#47252)
    for (const pane of [ExploreId.left, ExploreId.right]) {
      await dispatch(loadRichHistory(pane));
    }
  }
  /**
   * Main action to run queries; dispatches sub-actions based on which result viewers are active.
   *
   * Flow:
   * 1. Updates the time, clears the result cache (unless `options.preserveCache` is `true`),
   *    records the queries in the history and saves the state.
   * 2. If the cache holds a value for the current absolute range, that value is decorated again
   *    and published instead of running the queries.
   * 3. Otherwise the previous query subscription is stopped, the request is run against the
   *    datasource and every decorated response is published. While scanning, an empty `Done`
   *    response shifts the time range and runs the queries again.
   * 4. The logs volume provider is stored or cleared depending on live mode and datasource support.
   * 5. The new subscription is stored in the pane state.
   *
   * The thunk returns early, without storing a subscription, when there is no cached value and
   * either all queries are empty or no datasource instance is set.
   *
   * @param exploreId Explore pane to run the queries for
   * @param options `replaceUrl` is forwarded to `stateSave`; `preserveCache` keeps the result cache
   */
  export const runQueries = (
    exploreId: ExploreId,
    options?: { replaceUrl?: boolean; preserveCache?: boolean }
  ): ThunkResult<void> => {
    return (dispatch, getState) => {
      dispatch(updateTime({ exploreId }));

      // We always want to clear the cache unless preserveCache is passed explicitly
      if (options?.preserveCache !== true) {
        dispatch(clearCache(exploreId));
      }

      const exploreItemState = getState().explore[exploreId]!;
      const {
        datasourceInstance,
        containerWidth,
        isLive: live,
        range,
        scanning,
        queryResponse,
        querySubscription,
        refreshInterval,
        absoluteRange,
        cache,
      } = exploreItemState;
      let newQuerySub;

      // Queries without an explicit datasource fall back to the pane's datasource
      const queries = exploreItemState.queries.map((query) => ({
        ...query,
        datasource: query.datasource || datasourceInstance?.getRef(),
      }));

      // Shared projection for the cached and the live pipeline. Logs volume support is checked on
      // every emission, exactly as both pipelines did before they shared this function.
      const decorate = (data: PanelData) =>
        decorateData(
          data,
          queryResponse,
          absoluteRange,
          refreshInterval,
          queries,
          datasourceInstance != null && hasLogsVolumeSupport(datasourceInstance)
        );

      const clearLogsVolumeProvider = () =>
        dispatch(
          storeLogsVolumeDataProviderAction({
            exploreId,
            logsVolumeDataProvider: undefined,
          })
        );

      if (datasourceInstance != null) {
        // History is updated in the background and must not block the query run. The rejection
        // handler keeps a failing history refresh from surfacing as an unhandled promise rejection.
        handleHistory(
          dispatch,
          getState().explore,
          exploreItemState.history,
          datasourceInstance,
          queries,
          exploreId
        ).catch((error) => {
          console.error(error);
        });
      }

      dispatch(stateSave({ replace: options?.replaceUrl }));

      const cachedValue = getResultsFromCache(cache, absoluteRange);

      if (cachedValue) {
        // Results saved in the cache are used instead of running the queries
        newQuerySub = of(cachedValue)
          .pipe(mergeMap(decorate))
          .subscribe((data) => {
            if (!data.error) {
              dispatch(stateSave());
            }
            dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
          });
      } else {
        // No results saved in the cache, run new queries
        if (!hasNonEmptyQuery(queries)) {
          dispatch(stateSave({ replace: options?.replaceUrl })); // Remember to save to state and update location
          return;
        }

        if (!datasourceInstance) {
          return;
        }

        // Some datasource's query builders allow per-query interval limits,
        // but we're using the datasource interval limit for now
        const minInterval = datasourceInstance?.interval;

        stopQueryState(querySubscription);

        const queryOptions: QueryOptions = {
          minInterval,
          // maxDataPoints is used in:
          // Loki - used for logs streaming for buffer size, with undefined it falls back to datasource config if it supports that.
          // Elastic - limits the number of datapoints for the counts query and for logs it has hardcoded limit.
          // Influx - used to correctly display logs in graph
          // TODO:unification
          // maxDataPoints: mode === ExploreMode.Logs && datasourceId === 'loki' ? undefined : containerWidth,
          maxDataPoints: containerWidth,
          liveStreaming: live,
        };

        const timeZone = getTimeZone(getState().user);
        const transaction = buildQueryTransaction(exploreId, queries, queryOptions, range, scanning, timeZone);

        dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Loading }));

        newQuerySub = runRequest(datasourceInstance, transaction.request)
          .pipe(
            // Simple throttle for live tailing, in case of > 1000 rows per interval we spend about 200ms on processing and
            // rendering. In case this is optimized this can be tweaked, but also it should be only as fast as user
            // actually can see what is happening.
            live ? throttleTime(500) : identity,
            mergeMap(decorate)
          )
          .subscribe({
            next(data) {
              if (data.logsResult !== null) {
                reportInteraction('grafana_explore_logs_result_displayed', {
                  datasourceType: datasourceInstance.type,
                });
              }
              dispatch(queryStreamUpdatedAction({ exploreId, response: data }));

              // Keep scanning for results if this was the last scanning transaction
              if (getState().explore[exploreId]!.scanning) {
                if (data.state === LoadingState.Done && data.series.length === 0) {
                  const shiftedRange = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
                  dispatch(updateTime({ exploreId, absoluteRange: shiftedRange }));
                  dispatch(runQueries(exploreId));
                } else {
                  // We can stop scanning if we have a result
                  dispatch(scanStopAction({ exploreId }));
                }
              }
            },
            error(error) {
              dispatch(notifyApp(createErrorNotification('Query processing error', error)));
              dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
              console.error(error);
            },
            complete() {
              // In case we don't get any response at all but the observable completed, make sure we stop loading state.
              // This is for cases when some queries are noop like running first query after load but we don't have any
              // actual query input.
              if (getState().explore[exploreId]!.queryResponse.state === LoadingState.Loading) {
                dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Done }));
              }
            },
          });

        if (live) {
          // Live mode: the provider is cleared and any stored logs volume data is dropped
          clearLogsVolumeProvider();
          dispatch(cleanLogsVolumeAction({ exploreId }));
        } else if (hasLogsVolumeSupport(datasourceInstance)) {
          const sourceRequest = {
            ...transaction.request,
            requestId: transaction.request.requestId + '_log_volume',
          };
          const logsVolumeDataProvider = datasourceInstance.getLogsVolumeDataProvider(sourceRequest);
          dispatch(
            storeLogsVolumeDataProviderAction({
              exploreId,
              logsVolumeDataProvider,
            })
          );

          // Read the range from the store again rather than reusing the value captured at the top
          const { logsVolumeData, absoluteRange: currentAbsoluteRange } = getState().explore[exploreId]!;
          if (!canReuseLogsVolumeData(logsVolumeData, queries, currentAbsoluteRange)) {
            dispatch(cleanLogsVolumeAction({ exploreId }));
            dispatch(loadLogsVolumeData(exploreId));
          }
        } else {
          clearLogsVolumeProvider();
        }
      }

      dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySub }));
    };
  };
  /**
   * Checks whether the existing logs volume data can still be shown after the time range changed.
   * This is the case when the data was loaded for the same queries and the selected time range
   * lies within the time range of the loaded data.
   *
   * @param logsVolumeData Logs volume data currently stored, if any
   * @param queries Queries about to be run
   * @param selectedTimeRange Newly selected absolute time range
   * @returns `true` when the stored data can be reused, `false` otherwise
   */
  function canReuseLogsVolumeData(
    logsVolumeData: DataQueryResponse | undefined,
    queries: DataQuery[],
    selectedTimeRange: AbsoluteTimeRange
  ): boolean {
    const firstFrame = logsVolumeData?.data[0];
    if (!firstFrame) {
      return false;
    }

    // The first frame's custom meta holds the queries and the range the data was loaded for
    const custom = firstFrame.meta?.custom;
    if (!deepEqual(custom?.targets, queries)) {
      return false;
    }

    const dataRange = custom?.absoluteRange;
    // The selected range has to be within the loaded logs volume
    return Boolean(dataRange && dataRange.from <= selectedTimeRange.from && selectedTimeRange.to <= dataRange.to);
  }
  /**
   * Resets the queries of a pane to the given queries and triggers a query run.
   * Any modifications are discarded. Use this action for clicks on query examples.
   *
   * @param exploreId Explore pane whose queries are replaced
   * @param rawQueries Queries to set
   */
  export function setQueries(exploreId: ExploreId, rawQueries: DataQuery[]): ThunkResult<void> {
    return (dispatch, getState) => {
      const currentQueries = getState().explore[exploreId]!.queries;

      // Inject react keys into the query objects
      const keyedQueries = rawQueries.map((rawQuery, position) =>
        generateNewKeyAndAddRefIdIfMissing(rawQuery, currentQueries, position)
      );

      dispatch(setQueriesAction({ exploreId, queries: keyedQueries }));
      dispatch(runQueries(exploreId));
    };
  }
  /**
   * Starts a scan for more results: marks the pane as scanning, shifts the time range by
   * `getShiftedTimeRange(-1, …)` and runs the queries for the new range.
   *
   * @param exploreId Explore area
   */
  export function scanStart(exploreId: ExploreId): ThunkResult<void> {
    return (dispatch, getState) => {
      // Register the scan
      dispatch(scanStartAction({ exploreId }));

      // Compute the shifted range, display it and query it
      const currentRange = getState().explore[exploreId]!.range;
      const shiftedRange = getShiftedTimeRange(-1, currentRange);
      dispatch(updateTime({ exploreId, absoluteRange: shiftedRange }));
      dispatch(runQueries(exploreId));
    };
  }
  /**
   * Thunk that stores the pane's current query response in the result cache, keyed by the
   * current absolute time range. Nothing is cached unless the response is in the Done state.
   *
   * @param exploreId Explore pane whose response is cached
   */
  export function addResultsToCache(exploreId: ExploreId): ThunkResult<void> {
    return (dispatch, getState) => {
      const { queryResponse, absoluteRange } = getState().explore[exploreId]!;
      const cacheKey = createCacheKey(absoluteRange);

      // Results are saved only when all of them were received and loading is done
      if (queryResponse.state !== LoadingState.Done) {
        return;
      }
      dispatch(addResultsToCacheAction({ exploreId, cacheKey, queryResponse }));
    };
  }
  /**
   * Thunk that empties the result cache of the given pane.
   *
   * @param exploreId Explore pane whose cache is cleared
   */
  export function clearCache(exploreId: ExploreId): ThunkResult<void> {
    return (dispatch) => {
      dispatch(clearCacheAction({ exploreId }));
    };
  }
  /**
   * Starts loading logs volume data: subscribes to the stored logs volume provider, stores every
   * emitted value and keeps the subscription in the pane state. Does nothing without a provider.
   *
   * @param exploreId Explore pane to load logs volume data for
   */
  export function loadLogsVolumeData(exploreId: ExploreId): ThunkResult<void> {
    return (dispatch, getState) => {
      const provider = getState().explore[exploreId]!.logsVolumeDataProvider;
      if (!provider) {
        return;
      }

      const logsVolumeDataSubscription = provider.subscribe({
        next: (logsVolumeData: DataQueryResponse) => {
          dispatch(updateLogsVolumeDataAction({ exploreId, logsVolumeData }));
        },
      });
      dispatch(storeLogsVolumeDataSubscriptionAction({ exploreId, logsVolumeDataSubscription }));
    };
  }
  549. //
  550. // Reducer
  551. //
  // Redux Toolkit uses ImmerJs as part of their solution to ensure that state objects are not mutated.
  // ImmerJs has an autoFreeze option that freezes objects from change which means this reducer can't be migrated to createSlice
  // because the state would become frozen and during run time we would get errors because flot (Graph lib) would try to mutate
  // the frozen state.
  // https://github.com/reduxjs/redux-toolkit/issues/242
  /**
   * Reducer for the query-related part of a single Explore pane.
   *
   * Each branch matches one action creator and returns a new state object; any other action
   * returns `state` unchanged. Two branches also act on subscriptions held in the state:
   * `cancelQueriesAction` stops the query subscription and `storeLogsVolumeDataProviderAction`
   * unsubscribes the current logs volume subscription.
   *
   * @param state Current state of the pane
   * @param action Dispatched action
   * @returns The next state of the pane
   */
  export const queryReducer = (state: ExploreItemState, action: AnyAction): ExploreItemState => {
    if (addQueryRowAction.match(action)) {
      const { queries } = state;
      const { index, query } = action.payload;

      // A copy of the query is inserted right after `index`, which causes a new row to be rendered
      const nextQueries = [...queries.slice(0, index + 1), { ...query }, ...queries.slice(index + 1)];

      return {
        ...state,
        queries: nextQueries,
        queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
      };
    }

    if (changeQueriesAction.match(action)) {
      const { queries } = action.payload;

      return {
        ...state,
        queries,
      };
    }

    if (cancelQueriesAction.match(action)) {
      stopQueryState(state.querySubscription);

      return {
        ...state,
        loading: false,
      };
    }

    if (modifyQueriesAction.match(action)) {
      const { queries } = state;
      const { modification, index, modifier } = action.payload;

      // Without an index every query is modified, otherwise only the query at `index`.
      // Untouched queries keep their identity.
      const nextQueries: DataQuery[] = queries.map((query, i) => {
        if (index !== undefined && i !== index) {
          return query;
        }
        const nextQuery = modifier({ ...query }, modification);
        return generateNewKeyAndAddRefIdIfMissing(nextQuery, queries, i);
      });

      return {
        ...state,
        queries: nextQueries,
        queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
      };
    }

    if (setQueriesAction.match(action)) {
      const { queries } = action.payload;

      return {
        ...state,
        queries: queries.slice(),
        queryKeys: getQueryKeys(queries, state.datasourceInstance),
      };
    }

    if (queriesImportedAction.match(action)) {
      const { queries } = action.payload;

      return {
        ...state,
        queries,
        queryKeys: getQueryKeys(queries, state.datasourceInstance),
      };
    }

    if (queryStoreSubscriptionAction.match(action)) {
      const { querySubscription } = action.payload;

      return {
        ...state,
        querySubscription,
      };
    }

    if (storeLogsVolumeDataProviderAction.match(action)) {
      const { logsVolumeDataProvider } = action.payload;

      // A subscription to the previous provider is no longer valid once the provider is replaced
      if (state.logsVolumeDataSubscription) {
        state.logsVolumeDataSubscription.unsubscribe();
      }

      return {
        ...state,
        logsVolumeDataProvider,
        logsVolumeDataSubscription: undefined,
      };
    }

    if (cleanLogsVolumeAction.match(action)) {
      return {
        ...state,
        logsVolumeData: undefined,
      };
    }

    if (storeLogsVolumeDataSubscriptionAction.match(action)) {
      const { logsVolumeDataSubscription } = action.payload;

      return {
        ...state,
        logsVolumeDataSubscription,
      };
    }

    if (updateLogsVolumeDataAction.match(action)) {
      const { logsVolumeData } = action.payload;

      return {
        ...state,
        logsVolumeData,
      };
    }

    if (queryStreamUpdatedAction.match(action)) {
      return processQueryResponse(state, action);
    }

    if (changeLoadingStateAction.match(action)) {
      const { loadingState } = action.payload;

      return {
        ...state,
        queryResponse: {
          ...state.queryResponse,
          state: loadingState,
        },
        loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
      };
    }

    if (setPausedStateAction.match(action)) {
      const { isPaused } = action.payload;

      return {
        ...state,
        isPaused,
      };
    }

    if (scanStartAction.match(action)) {
      return { ...state, scanning: true };
    }

    if (scanStopAction.match(action)) {
      return {
        ...state,
        scanning: false,
        scanRange: undefined,
      };
    }

    if (addResultsToCacheAction.match(action)) {
      const CACHE_LIMIT = 5;
      const { cache } = state;
      const { queryResponse, cacheKey } = action.payload;

      let newCache = [...cache];
      const isDuplicateKey = newCache.some((c) => c.key === cacheKey);

      // New entries go to the front; entries beyond the limit are dropped from the end
      if (!isDuplicateKey) {
        const newCacheItem = { key: cacheKey, value: queryResponse };
        newCache = [newCacheItem, ...newCache].slice(0, CACHE_LIMIT);
      }

      return {
        ...state,
        cache: newCache,
      };
    }

    if (clearCacheAction.match(action)) {
      return {
        ...state,
        cache: [],
      };
    }

    return state;
  };
  /**
   * Computes the next pane state from a response published by `queryStreamUpdatedAction`.
   *
   * - Timeout error: only `queryResponse` and `loading` are updated.
   * - Cancelled error: the state is returned unchanged.
   * - Any other error is emitted to Angular query editors (when enabled), then processing continues.
   * - Response without a request: a shallow copy of the state is returned.
   * - Otherwise the results and the `show*` flags are derived from the response.
   *
   * @param state Current state of the pane
   * @param action Action carrying the decorated response
   * @returns The next state of the pane
   */
  export const processQueryResponse = (
    state: ExploreItemState,
    action: PayloadAction<QueryEndedPayload>
  ): ExploreItemState => {
    const { response } = action.payload;
    const {
      request,
      state: loadingState,
      series,
      error,
      graphResult,
      logsResult,
      tableResult,
      traceFrames,
      nodeGraphFrames,
    } = response;

    const isLoading = loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming;

    // When angularSupportEnabled is removed we can remove this code and all references to eventBridge
    const hasAngularEditor = () =>
      Boolean(config.angularSupportEnabled && state.datasourceInstance?.components?.QueryCtrl);

    if (error) {
      switch (error.type) {
        case DataQueryErrorType.Timeout:
          return {
            ...state,
            queryResponse: response,
            loading: isLoading,
          };
        case DataQueryErrorType.Cancelled:
          return state;
      }

      // Send error to Angular editors
      if (hasAngularEditor()) {
        state.eventBridge.emit(PanelEvents.dataError, error);
      }
    }

    if (!request) {
      return { ...state };
    }

    // Send legacy data to Angular editors
    if (hasAngularEditor()) {
      const legacy = series.map((frame) => toLegacyResponseData(frame));
      state.eventBridge.emit(PanelEvents.dataReceived, legacy);
    }

    return {
      ...state,
      queryResponse: response,
      graphResult,
      tableResult,
      logsResult,
      loading: isLoading,
      showLogs: !!logsResult,
      showMetrics: !!graphResult,
      showTable: !!tableResult,
      showTrace: !!traceFrames.length,
      showNodeGraph: !!nodeGraphFrames.length,
    };
  };