datasource.ts 33 KB

  1. import { cloneDeep, find, findLast, isEmpty, isString, set } from 'lodash';
  2. import React from 'react';
  3. import { from, lastValueFrom, merge, Observable, of, throwError, zip } from 'rxjs';
  4. import { catchError, concatMap, finalize, map, mergeMap, repeat, scan, share, takeWhile, tap } from 'rxjs/operators';
  5. import {
  6. DataFrame,
  7. DataQueryError,
  8. DataQueryErrorType,
  9. DataQueryRequest,
  10. DataQueryResponse,
  11. DataSourceInstanceSettings,
  12. DataSourceWithLogsContextSupport,
  13. dateMath,
  14. dateTimeFormat,
  15. FieldType,
  16. LoadingState,
  17. LogRowModel,
  18. rangeUtil,
  19. ScopedVars,
  20. TimeRange,
  21. } from '@grafana/data';
  22. import { DataSourceWithBackend, FetchError, getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
  23. import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
  24. import { notifyApp } from 'app/core/actions';
  25. import { config } from 'app/core/config';
  26. import { createErrorNotification } from 'app/core/copy/appNotification';
  27. import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
  28. import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
  29. import { VariableWithMultiSupport } from 'app/features/variables/types';
  30. import { store } from 'app/store/store';
  31. import { AppNotificationTimeout } from 'app/types';
  32. import { CloudWatchAnnotationSupport } from './annotationSupport';
  33. import { SQLCompletionItemProvider } from './cloudwatch-sql/completion/CompletionItemProvider';
  34. import { ThrottlingErrorMessage } from './components/ThrottlingErrorMessage';
  35. import { isCloudWatchAnnotationQuery, isCloudWatchLogsQuery, isCloudWatchMetricsQuery } from './guards';
  36. import { CloudWatchLanguageProvider } from './language_provider';
  37. import memoizedDebounce from './memoizedDebounce';
  38. import { MetricMathCompletionItemProvider } from './metric-math/completion/CompletionItemProvider';
  39. import { migrateMetricQuery } from './migrations/metricQueryMigrations';
  40. import {
  41. CloudWatchAnnotationQuery,
  42. CloudWatchJsonData,
  43. CloudWatchLogsQuery,
  44. CloudWatchLogsQueryStatus,
  45. CloudWatchLogsRequest,
  46. CloudWatchMetricsQuery,
  47. CloudWatchQuery,
  48. DescribeLogGroupsRequest,
  49. Dimensions,
  50. GetLogEventsRequest,
  51. GetLogGroupFieldsRequest,
  52. GetLogGroupFieldsResponse,
  53. LogAction,
  54. MetricEditorMode,
  55. MetricQuery,
  56. MetricQueryType,
  57. MetricRequest,
  58. MultiFilters,
  59. StartQueryRequest,
  60. TSDBResponse,
  61. } from './types';
  62. import { addDataLinksToLogsResponse } from './utils/datalinks';
  63. import { runWithRetry } from './utils/logsRetry';
  64. import { increasingInterval } from './utils/rxjs/increasingInterval';
  65. import { CloudWatchVariableSupport } from './variables';
  66. const DS_QUERY_ENDPOINT = '/api/ds/query';
  67. // Constants also defined in tsdb/cloudwatch/cloudwatch.go
  68. export const LOG_IDENTIFIER_INTERNAL = '__log__grafana_internal__';
  69. export const LOGSTREAM_IDENTIFIER_INTERNAL = '__logstream__grafana_internal__';
  70. const displayAlert = (datasourceName: string, region: string) =>
  71. store.dispatch(
  72. notifyApp(
  73. createErrorNotification(
  74. `CloudWatch request limit reached in ${region} for data source ${datasourceName}`,
  75. '',
  76. undefined,
  77. React.createElement(ThrottlingErrorMessage, { region }, null)
  78. )
  79. )
  80. );
  81. const displayCustomError = (title: string, message: string) =>
  82. store.dispatch(notifyApp(createErrorNotification(title, message)));
  83. export class CloudWatchDatasource
  84. extends DataSourceWithBackend<CloudWatchQuery, CloudWatchJsonData>
  85. implements DataSourceWithLogsContextSupport<CloudWatchLogsQuery>
  86. {
  87. proxyUrl: any;
  88. defaultRegion: any;
  89. datasourceName: string;
  90. languageProvider: CloudWatchLanguageProvider;
  91. sqlCompletionItemProvider: SQLCompletionItemProvider;
  92. metricMathCompletionItemProvider: MetricMathCompletionItemProvider;
  93. tracingDataSourceUid?: string;
  94. logsTimeout: string;
  95. type = 'cloudwatch';
  96. standardStatistics = ['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount'];
  97. debouncedAlert: (datasourceName: string, region: string) => void = memoizedDebounce(
  98. displayAlert,
  99. AppNotificationTimeout.Error
  100. );
  101. debouncedCustomAlert: (title: string, message: string) => void = memoizedDebounce(
  102. displayCustomError,
  103. AppNotificationTimeout.Error
  104. );
  105. logQueries: Record<string, { id: string; region: string; statsQuery: boolean }> = {};
  106. constructor(
  107. instanceSettings: DataSourceInstanceSettings<CloudWatchJsonData>,
  108. private readonly templateSrv: TemplateSrv = getTemplateSrv(),
  109. private readonly timeSrv: TimeSrv = getTimeSrv()
  110. ) {
  111. super(instanceSettings);
  112. this.proxyUrl = instanceSettings.url;
  113. this.defaultRegion = instanceSettings.jsonData.defaultRegion;
  114. this.datasourceName =;
  115. this.languageProvider = new CloudWatchLanguageProvider(this);
  116. this.tracingDataSourceUid = instanceSettings.jsonData.tracingDatasourceUid;
  117. this.logsTimeout = instanceSettings.jsonData.logsTimeout || '15m';
  118. this.sqlCompletionItemProvider = new SQLCompletionItemProvider(this, this.templateSrv);
  119. this.metricMathCompletionItemProvider = new MetricMathCompletionItemProvider(this, this.templateSrv);
  120. this.variables = new CloudWatchVariableSupport(this);
  121. this.annotations = CloudWatchAnnotationSupport;
  122. }
  123. filterQuery(query: CloudWatchQuery) {
  124. return query.hide !== true || (isCloudWatchMetricsQuery(query) && !== '');
  125. }
  126. query(options: DataQueryRequest<CloudWatchQuery>): Observable<DataQueryResponse> {
  127. options = cloneDeep(options);
  128. let queries = options.targets.filter(this.filterQuery);
  129. const { logQueries, metricsQueries, annotationQueries } = this.getTargetsByQueryMode(queries);
  130. const dataQueryResponses: Array<Observable<DataQueryResponse>> = [];
  131. if (logQueries.length > 0) {
  132. dataQueryResponses.push(this.handleLogQueries(logQueries, options));
  133. }
  134. if (metricsQueries.length > 0) {
  135. dataQueryResponses.push(this.handleMetricQueries(metricsQueries, options));
  136. }
  137. if (annotationQueries.length > 0) {
  138. dataQueryResponses.push(this.handleAnnotationQuery(annotationQueries, options));
  139. }
  140. // No valid targets, return the empty result to save a round trip.
  141. if (isEmpty(dataQueryResponses)) {
  142. return of({
  143. data: [],
  144. state: LoadingState.Done,
  145. });
  146. }
  147. return merge(...dataQueryResponses);
  148. }
  149. /**
  150. * Handle log query. The log query works by starting the query on the CloudWatch and then periodically polling for
  151. * results.
  152. * @param logQueries
  153. * @param options
  154. */
  155. handleLogQueries = (
  156. logQueries: CloudWatchLogsQuery[],
  157. options: DataQueryRequest<CloudWatchQuery>
  158. ): Observable<DataQueryResponse> => {
  159. const validLogQueries = logQueries.filter((item) => item.logGroupNames?.length);
  160. if (logQueries.length > validLogQueries.length) {
  161. return of({ data: [], error: { message: 'Log group is required' } });
  162. }
  163. // No valid targets, return the empty result to save a round trip.
  164. if (isEmpty(validLogQueries)) {
  165. return of({ data: [], state: LoadingState.Done });
  166. }
  167. const queryParams = CloudWatchLogsQuery) => ({
  168. queryString: target.expression || '',
  169. refId: target.refId,
  170. logGroupNames: target.logGroupNames,
  171. region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'),
  172. }));
  173. const startTime = new Date();
  174. const timeoutFunc = () => {
  175. return >= startTime.valueOf() + rangeUtil.intervalToMs(this.logsTimeout);
  176. };
  177. return runWithRetry(
  178. (targets: StartQueryRequest[]) => {
  179. return this.makeLogActionRequest('StartQuery', targets, {
  180. makeReplacements: true,
  181. scopedVars: options.scopedVars,
  182. skipCache: true,
  183. });
  184. },
  185. queryParams,
  186. timeoutFunc
  187. ).pipe(
  188. mergeMap(({ frames, error }: { frames: DataFrame[]; error?: DataQueryError }) =>
  189. // This queries for the results
  190. this.logsQuery(
  191. => ({
  192. queryId: dataFrame.fields[0].values.get(0),
  193. region: dataFrame.meta?.custom?.['Region'] ?? 'default',
  194. refId: dataFrame.refId!,
  195. statsGroups: (logQueries.find((target) => target.refId === dataFrame.refId)! as CloudWatchLogsQuery)
  196. .statsGroups,
  197. })),
  198. timeoutFunc
  199. ).pipe(
  200. map((response: DataQueryResponse) => {
  201. if (!response.error && error) {
  202. response.error = error;
  203. }
  204. return response;
  205. })
  206. )
  207. ),
  208. mergeMap((dataQueryResponse) => {
  209. return from(
  210. (async () => {
  211. await addDataLinksToLogsResponse(
  212. dataQueryResponse,
  213. options,
  214. this.timeSrv.timeRange(),
  215. this.replace.bind(this),
  216. this.expandVariableToArray.bind(this),
  217. this.getActualRegion.bind(this),
  218. this.tracingDataSourceUid
  219. );
  220. return dataQueryResponse;
  221. })()
  222. );
  223. })
  224. );
  225. };
  226. filterMetricQuery(query: CloudWatchMetricsQuery): boolean {
  227. const { region, metricQueryType, metricEditorMode, expression, metricName, namespace, sqlExpression, statistic } =
  228. query;
  229. if (!region) {
  230. return false;
  231. }
  232. if (metricQueryType === MetricQueryType.Search && metricEditorMode === MetricEditorMode.Builder) {
  233. return !!namespace && !!metricName && !!statistic;
  234. } else if (metricQueryType === MetricQueryType.Search && metricEditorMode === MetricEditorMode.Code) {
  235. return !!expression;
  236. } else if (metricQueryType === MetricQueryType.Query) {
  237. // still TBD how to validate the visual query builder for SQL
  238. return !!sqlExpression;
  239. }
  240. throw new Error('invalid metric editor mode');
  241. }
  242. replaceMetricQueryVars(
  243. query: CloudWatchMetricsQuery,
  244. options: DataQueryRequest<CloudWatchQuery>
  245. ): CloudWatchMetricsQuery {
  246. query.region = this.templateSrv.replace(this.getActualRegion(query.region), options.scopedVars);
  247. query.namespace = this.replace(query.namespace, options.scopedVars, true, 'namespace');
  248. query.metricName = this.replace(query.metricName, options.scopedVars, true, 'metric name');
  249. query.dimensions = this.convertDimensionFormat(query.dimensions ?? {}, options.scopedVars);
  250. query.statistic = this.templateSrv.replace(query.statistic, options.scopedVars);
  251. query.period = String(this.getPeriod(query, options)); // use string format for period in graph query, and alerting
  252. = this.templateSrv.replace(, options.scopedVars);
  253. query.expression = this.templateSrv.replace(query.expression, options.scopedVars);
  254. query.sqlExpression = this.templateSrv.replace(query.sqlExpression, options.scopedVars, 'raw');
  255. return query;
  256. }
  257. handleMetricQueries = (
  258. metricQueries: CloudWatchMetricsQuery[],
  259. options: DataQueryRequest<CloudWatchQuery>
  260. ): Observable<DataQueryResponse> => {
  261. const timezoneUTCOffset = dateTimeFormat(, {
  262. timeZone: options.timezone,
  263. format: 'Z',
  264. }).replace(':', '');
  265. const validMetricsQueries = metricQueries
  266. .filter(this.filterMetricQuery)
  267. .map((q: CloudWatchMetricsQuery): MetricQuery => {
  268. const migratedQuery = migrateMetricQuery(q);
  269. const migratedAndIterpolatedQuery = this.replaceMetricQueryVars(migratedQuery, options);
  270. return {
  271. timezoneUTCOffset,
  272. intervalMs: options.intervalMs,
  273. maxDataPoints: options.maxDataPoints,
  274. ...migratedAndIterpolatedQuery,
  275. type: 'timeSeriesQuery',
  276. datasource: this.getRef(),
  277. };
  278. });
  279. // No valid targets, return the empty result to save a round trip.
  280. if (isEmpty(validMetricsQueries)) {
  281. return of({ data: [] });
  282. }
  283. const request = {
  284. from: options?.range?.from.valueOf().toString(),
  285. to: options?.range?.to.valueOf().toString(),
  286. queries: validMetricsQueries,
  287. };
  288. return this.performTimeSeriesQuery(request, options.range);
  289. };
  290. handleAnnotationQuery(
  291. queries: CloudWatchAnnotationQuery[],
  292. options: DataQueryRequest<CloudWatchQuery>
  293. ): Observable<DataQueryResponse> {
  294. return this.awsRequest(DS_QUERY_ENDPOINT, {
  295. from: options.range.from.valueOf().toString(),
  296. to:,
  297. queries: => ({
  298. ...query,
  299. statistic: this.templateSrv.replace(query.statistic),
  300. region: this.templateSrv.replace(this.getActualRegion(query.region)),
  301. namespace: this.templateSrv.replace(query.namespace),
  302. metricName: this.templateSrv.replace(query.metricName),
  303. dimensions: this.convertDimensionFormat(query.dimensions ?? {}, {}),
  304. period: query.period ? parseInt(query.period, 10) : 300,
  305. actionPrefix: query.actionPrefix ?? '',
  306. alarmNamePrefix: query.alarmNamePrefix ?? '',
  307. type: 'annotationQuery',
  308. datasource: this.getRef(),
  309. })),
  310. }).pipe(
  311. map((r) => {
  312. const frames = toDataQueryResponse({ data: r }).data as DataFrame[];
  313. return { data: frames };
  314. })
  315. );
  316. }
  317. /**
  318. * Checks progress and polls data of a started logs query with some retry logic.
  319. * @param queryParams
  320. */
  321. logsQuery(
  322. queryParams: Array<{
  323. queryId: string;
  324. refId: string;
  325. limit?: number;
  326. region: string;
  327. statsGroups?: string[];
  328. }>,
  329. timeoutFunc: () => boolean
  330. ): Observable<DataQueryResponse> {
  331. this.logQueries = {};
  332. queryParams.forEach((param) => {
  333. this.logQueries[param.refId] = {
  334. id: param.queryId,
  335. region: param.region,
  336. statsQuery: (param.statsGroups?.length ?? 0) > 0 ?? false,
  337. };
  338. });
  339. const dataFrames = increasingInterval({ startPeriod: 100, endPeriod: 1000, step: 300 }).pipe(
  340. concatMap((_) => this.makeLogActionRequest('GetQueryResults', queryParams, { skipCache: true })),
  341. repeat(),
  342. share()
  343. );
  344. const consecutiveFailedAttempts = dataFrames.pipe(
  345. scan(
  346. ({ failures, prevRecordsMatched }, frames) => {
  347. failures++;
  348. for (const frame of frames) {
  349. const recordsMatched = frame.meta?.stats?.find((stat) => stat.displayName === 'Records scanned')?.value!;
  350. if (recordsMatched > (prevRecordsMatched[frame.refId!] ?? 0)) {
  351. failures = 0;
  352. }
  353. prevRecordsMatched[frame.refId!] = recordsMatched;
  354. }
  355. return { failures, prevRecordsMatched };
  356. },
  357. { failures: 0, prevRecordsMatched: {} as Record<string, number> }
  358. ),
  359. map(({ failures }) => failures),
  360. share()
  361. );
  362. const queryResponse: Observable<DataQueryResponse> = zip(dataFrames, consecutiveFailedAttempts).pipe(
  363. tap(([dataFrames]) => {
  364. for (const frame of dataFrames) {
  365. if (
  366. [
  367. CloudWatchLogsQueryStatus.Complete,
  368. CloudWatchLogsQueryStatus.Cancelled,
  369. CloudWatchLogsQueryStatus.Failed,
  370. ].includes(frame.meta?.custom?.['Status']) &&
  371. this.logQueries.hasOwnProperty(frame.refId!)
  372. ) {
  373. delete this.logQueries[frame.refId!];
  374. }
  375. }
  376. }),
  377. map(([dataFrames, failedAttempts]) => {
  378. if (timeoutFunc()) {
  379. for (const frame of dataFrames) {
  380. set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Cancelled);
  381. }
  382. }
  383. return {
  384. data: dataFrames,
  385. key: 'test-key',
  386. state: dataFrames.every((dataFrame) =>
  387. [
  388. CloudWatchLogsQueryStatus.Complete,
  389. CloudWatchLogsQueryStatus.Cancelled,
  390. CloudWatchLogsQueryStatus.Failed,
  391. ].includes(dataFrame.meta?.custom?.['Status'])
  392. )
  393. ? LoadingState.Done
  394. : LoadingState.Loading,
  395. error: timeoutFunc()
  396. ? {
  397. message: `error: query timed out after ${failedAttempts} attempts`,
  398. type: DataQueryErrorType.Timeout,
  399. }
  400. : undefined,
  401. };
  402. }),
  403. takeWhile(({ state }) => state !== LoadingState.Error && state !== LoadingState.Done, true)
  404. );
  405. return withTeardown(queryResponse, () => this.stopQueries());
  406. }
  407. stopQueries() {
  408. if (Object.keys(this.logQueries).length > 0) {
  409. this.makeLogActionRequest(
  410. 'StopQuery',
  411. Object.values(this.logQueries).map((logQuery) => ({ queryId:, region: logQuery.region })),
  412. {
  413. makeReplacements: false,
  414. skipCache: true,
  415. }
  416. ).pipe(
  417. finalize(() => {
  418. this.logQueries = {};
  419. })
  420. );
  421. }
  422. }
  423. async describeLogGroups(params: DescribeLogGroupsRequest): Promise<string[]> {
  424. const dataFrames = await lastValueFrom(this.makeLogActionRequest('DescribeLogGroups', [params]));
  425. const logGroupNames = dataFrames[0]?.fields[0]?.values.toArray() ?? [];
  426. return logGroupNames;
  427. }
  428. async getLogGroupFields(params: GetLogGroupFieldsRequest): Promise<GetLogGroupFieldsResponse> {
  429. const dataFrames = await lastValueFrom(this.makeLogActionRequest('GetLogGroupFields', [params]));
  430. const fieldNames = dataFrames[0].fields[0].values.toArray();
  431. const fieldPercentages = dataFrames[0].fields[1].values.toArray();
  432. const getLogGroupFieldsResponse = {
  433. logGroupFields:, i) => ({ name: val, percent: fieldPercentages[i] })) ?? [],
  434. };
  435. return getLogGroupFieldsResponse;
  436. }
  437. getLogRowContext = async (
  438. row: LogRowModel,
  439. { limit = 10, direction = 'BACKWARD' }: RowContextOptions = {},
  440. query?: CloudWatchLogsQuery
  441. ): Promise<{ data: DataFrame[] }> => {
  442. let logStreamField = null;
  443. let logField = null;
  444. for (const field of row.dataFrame.fields) {
  446. logStreamField = field;
  447. if (logField !== null) {
  448. break;
  449. }
  450. } else if ( === LOG_IDENTIFIER_INTERNAL) {
  451. logField = field;
  452. if (logStreamField !== null) {
  453. break;
  454. }
  455. }
  456. }
  457. const requestParams: GetLogEventsRequest = {
  458. limit,
  459. startFromHead: direction !== 'BACKWARD',
  460. region: query?.region,
  461. logGroupName: parseLogGroupName(logField!.values.get(row.rowIndex)),
  462. logStreamName: logStreamField!.values.get(row.rowIndex),
  463. };
  464. if (direction === 'BACKWARD') {
  465. requestParams.endTime = row.timeEpochMs;
  466. } else {
  467. requestParams.startTime = row.timeEpochMs;
  468. }
  469. const dataFrames = await lastValueFrom(this.makeLogActionRequest('GetLogEvents', [requestParams]));
  470. return {
  471. data: dataFrames,
  472. };
  473. };
  474. getVariables() {
  475. return this.templateSrv.getVariables().map((v) => `$${}`);
  476. }
  477. getPeriod(target: CloudWatchMetricsQuery, options: any) {
  478. let period = this.templateSrv.replace(target.period, options.scopedVars) as any;
  479. if (period && period.toLowerCase() !== 'auto') {
  480. if (/^\d+$/.test(period)) {
  481. period = parseInt(period, 10);
  482. } else {
  483. period = rangeUtil.intervalToSeconds(period);
  484. }
  485. if (period < 1) {
  486. period = 1;
  487. }
  488. }
  489. return period || '';
  490. }
  491. performTimeSeriesQuery(request: MetricRequest, { from, to }: TimeRange): Observable<any> {
  492. return this.awsRequest(DS_QUERY_ENDPOINT, request).pipe(
  493. map((res) => {
  494. const dataframes: DataFrame[] = toDataQueryResponse({ data: res }).data;
  495. if (!dataframes || dataframes.length <= 0) {
  496. return { data: [] };
  497. }
  498. const lastError = findLast(res.results, (v) => !!v.error);
  499. dataframes.forEach((frame) => {
  500. frame.fields.forEach((field) => {
  501. if (field.type === FieldType.time) {
  502. // field.config.interval is populated in order for Grafana to fill in null values at frame intervals
  503. field.config.interval = frame.meta?.custom?.period * 1000;
  504. }
  505. });
  506. });
  507. return {
  508. data: dataframes,
  509. error: lastError ? { message: lastError.error } : null,
  510. };
  511. }),
  512. catchError((err) => {
  513. const isFrameError =;
  514. // Error is not frame specific
  515. if (!isFrameError && && === 'Metric request error' && {
  516. err.message =;
  517. return throwError(() => err);
  518. }
  519. // The error is either for a specific frame or for all the frames
  520. const results: Array<{ error?: string }> = Object.values(;
  521. const firstErrorResult = results.find((r) => r.error);
  522. if (firstErrorResult) {
  523. err.message = firstErrorResult.error;
  524. }
  525. if (results.some((r) => r.error && /^Throttling:.*/.test(r.error))) {
  526. const failedRedIds = Object.keys(;
  527. const regionsAffected = Object.values(request.queries).reduce(
  528. (res: string[], { refId, region }) =>
  529. (refId && !failedRedIds.includes(refId)) || res.includes(region) ? res : [...res, region],
  530. []
  531. ) as string[];
  532. regionsAffected.forEach((region) => {
  533. const actualRegion = this.getActualRegion(region);
  534. if (actualRegion) {
  535. this.debouncedAlert(this.datasourceName, actualRegion);
  536. }
  537. });
  538. }
  539. return throwError(() => err);
  540. })
  541. );
  542. }
  543. doMetricResourceRequest(subtype: string, parameters?: any): Promise<Array<{ text: any; label: any; value: any }>> {
  544. return this.getResource(subtype, parameters);
  545. }
  546. makeLogActionRequest(
  547. subtype: LogAction,
  548. queryParams: CloudWatchLogsRequest[],
  549. options: {
  550. scopedVars?: ScopedVars;
  551. makeReplacements?: boolean;
  552. skipCache?: boolean;
  553. } = {
  554. makeReplacements: true,
  555. skipCache: false,
  556. }
  557. ): Observable<DataFrame[]> {
  558. const range = this.timeSrv.timeRange();
  559. const requestParams = {
  560. from: range.from.valueOf().toString(),
  561. to:,
  562. queries: CloudWatchLogsRequest) => ({
  563. refId: (param as StartQueryRequest).refId || 'A',
  564. intervalMs: 1, // dummy
  565. maxDataPoints: 1, // dummy
  566. datasource: this.getRef(),
  567. type: 'logAction',
  568. subtype: subtype,
  569. ...param,
  570. })),
  571. };
  572. if (options.makeReplacements) {
  573. requestParams.queries.forEach((query: CloudWatchLogsRequest) => {
  574. const fieldsToReplace: Array<
  575. keyof (GetLogEventsRequest & StartQueryRequest & DescribeLogGroupsRequest & GetLogGroupFieldsRequest)
  576. > = ['queryString', 'logGroupNames', 'logGroupName', 'logGroupNamePrefix'];
  577. const anyQuery: any = query;
  578. for (const fieldName of fieldsToReplace) {
  579. if (query.hasOwnProperty(fieldName)) {
  580. if (Array.isArray(anyQuery[fieldName])) {
  581. anyQuery[fieldName] = anyQuery[fieldName].flatMap((val: string) => {
  582. if (fieldName === 'logGroupNames') {
  583. return this.expandVariableToArray(val, options.scopedVars || {});
  584. }
  585. return this.replace(val, options.scopedVars, true, fieldName);
  586. });
  587. } else {
  588. anyQuery[fieldName] = this.replace(anyQuery[fieldName], options.scopedVars, true, fieldName);
  589. }
  590. }
  591. }
  592. // TODO: seems to be some sort of bug that we don't really send region with all queries. This means
  593. // if you select different than default region in editor you will get results for autocomplete from wrong
  594. // region.
  595. if (anyQuery.region) {
  596. anyQuery.region = this.replace(anyQuery.region, options.scopedVars, true, 'region');
  597. anyQuery.region = this.getActualRegion(anyQuery.region);
  598. }
  599. });
  600. }
  601. const resultsToDataFrames = (val: any): DataFrame[] => toDataQueryResponse(val).data || [];
  602. let headers = {};
  603. if (options.skipCache) {
  604. headers = {
  605. 'X-Cache-Skip': true,
  606. };
  607. }
  608. return this.awsRequest(DS_QUERY_ENDPOINT, requestParams, headers).pipe(
  609. map((response) => resultsToDataFrames({ data: response })),
  610. catchError((err: FetchError) => {
  611. if (config.featureToggles.datasourceQueryMultiStatus && err.status === 207) {
  612. throw err;
  613. }
  614. if (err.status === 400) {
  615. throw err;
  616. }
  617. if ( {
  618. throw;
  619. } else if ( {
  620. // In PROD we do not supply .error
  621. throw;
  622. }
  623. throw err;
  624. })
  625. );
  626. }
  627. getRegions(): Promise<Array<{ label: string; value: string; text: string }>> {
  628. return this.doMetricResourceRequest('regions').then((regions: any) => [
  629. { label: 'default', value: 'default', text: 'default' },
  630. ...regions,
  631. ]);
  632. }
  633. getNamespaces() {
  634. return this.doMetricResourceRequest('namespaces');
  635. }
  636. async getMetrics(namespace: string | undefined, region?: string) {
  637. if (!namespace) {
  638. return [];
  639. }
  640. return this.doMetricResourceRequest('metrics', {
  641. region: this.templateSrv.replace(this.getActualRegion(region)),
  642. namespace: this.templateSrv.replace(namespace),
  643. });
  644. }
  645. async getAllMetrics(region: string): Promise<Array<{ metricName: string; namespace: string }>> {
  646. const values = await this.doMetricResourceRequest('all-metrics', {
  647. region: this.templateSrv.replace(this.getActualRegion(region)),
  648. });
  649. return => ({ metricName: v.value, namespace: v.text }));
  650. }
  651. async getDimensionKeys(
  652. namespace: string | undefined,
  653. region: string,
  654. dimensionFilters: Dimensions = {},
  655. metricName = ''
  656. ) {
  657. if (!namespace) {
  658. return [];
  659. }
  660. return this.doMetricResourceRequest('dimension-keys', {
  661. region: this.templateSrv.replace(this.getActualRegion(region)),
  662. namespace: this.templateSrv.replace(namespace),
  663. dimensionFilters: JSON.stringify(this.convertDimensionFormat(dimensionFilters, {})),
  664. metricName,
  665. });
  666. }
  667. async getDimensionValues(
  668. region: string,
  669. namespace: string | undefined,
  670. metricName: string | undefined,
  671. dimensionKey: string,
  672. filterDimensions: {}
  673. ) {
  674. if (!namespace || !metricName) {
  675. return [];
  676. }
  677. const values = await this.doMetricResourceRequest('dimension-values', {
  678. region: this.templateSrv.replace(this.getActualRegion(region)),
  679. namespace: this.templateSrv.replace(namespace),
  680. metricName: this.templateSrv.replace(metricName.trim()),
  681. dimensionKey: this.templateSrv.replace(dimensionKey),
  682. dimensions: JSON.stringify(this.convertDimensionFormat(filterDimensions, {})),
  683. });
  684. return values;
  685. }
  686. getEbsVolumeIds(region: string, instanceId: string) {
  687. return this.doMetricResourceRequest('ebs-volume-ids', {
  688. region: this.templateSrv.replace(this.getActualRegion(region)),
  689. instanceId: this.templateSrv.replace(instanceId),
  690. });
  691. }
  692. getEc2InstanceAttribute(region: string, attributeName: string, filters: any) {
  693. return this.doMetricResourceRequest('ec2-instance-attribute', {
  694. region: this.templateSrv.replace(this.getActualRegion(region)),
  695. attributeName: this.templateSrv.replace(attributeName),
  696. filters: JSON.stringify(this.convertMultiFilterFormat(filters, 'filter key')),
  697. });
  698. }
  699. getResourceARNs(region: string, resourceType: string, tags: any) {
  700. return this.doMetricResourceRequest('resource-arns', {
  701. region: this.templateSrv.replace(this.getActualRegion(region)),
  702. resourceType: this.templateSrv.replace(resourceType),
  703. tags: JSON.stringify(this.convertMultiFilterFormat(tags, 'tag name')),
  704. });
  705. }
  706. targetContainsTemplate(target: any) {
  707. return (
  708. this.templateSrv.containsTemplate(target.region) ||
  709. this.templateSrv.containsTemplate(target.namespace) ||
  710. this.templateSrv.containsTemplate(target.metricName) ||
  711. this.templateSrv.containsTemplate(target.expression!) ||
  712. target.logGroupNames?.some((logGroup: string) => this.templateSrv.containsTemplate(logGroup)) ||
  713. find(target.dimensions, (v, k) => this.templateSrv.containsTemplate(k) || this.templateSrv.containsTemplate(v))
  714. );
  715. }
  716. awsRequest(url: string, data: MetricRequest, headers: Record<string, any> = {}): Observable<TSDBResponse> {
  717. const options = {
  718. method: 'POST',
  719. url,
  720. data,
  721. headers,
  722. };
  723. return getBackendSrv()
  724. .fetch<TSDBResponse>(options)
  725. .pipe(map((result) =>;
  726. }
  727. getDefaultRegion() {
  728. return this.defaultRegion;
  729. }
  730. getActualRegion(region?: string) {
  731. if (region === 'default' || region === undefined || region === '') {
  732. return this.getDefaultRegion();
  733. }
  734. return region;
  735. }
  736. showContextToggle() {
  737. return true;
  738. }
  739. convertToCloudWatchTime(date: any, roundUp: any) {
  740. if (isString(date)) {
  741. date = dateMath.parse(date, roundUp);
  742. }
  743. return Math.round(date.valueOf() / 1000);
  744. }
  745. convertDimensionFormat(dimensions: Dimensions, scopedVars: ScopedVars) {
  746. return Object.entries(dimensions).reduce((result, [key, value]) => {
  747. key = this.replace(key, scopedVars, true, 'dimension keys');
  748. if (Array.isArray(value)) {
  749. return { ...result, [key]: value };
  750. }
  751. if (!value) {
  752. return { ...result, [key]: null };
  753. }
  754. const newValues = this.expandVariableToArray(value, scopedVars);
  755. return { ...result, [key]: newValues };
  756. }, {});
  757. }
  758. // get the value for a given template variable
  759. expandVariableToArray(value: string, scopedVars: ScopedVars): string[] {
  760. const variableName = this.templateSrv.getVariableName(value);
  761. const valueVar = this.templateSrv.getVariables().find(({ name }) => {
  762. return name === variableName;
  763. });
  764. if (variableName && valueVar) {
  765. if ((valueVar as unknown as VariableWithMultiSupport).multi) {
  766. return this.templateSrv.replace(value, scopedVars, 'pipe').split('|');
  767. }
  768. return [this.templateSrv.replace(value, scopedVars)];
  769. }
  770. return [value];
  771. }
  772. convertMultiFilterFormat(multiFilters: MultiFilters, fieldName?: string) {
  773. return Object.entries(multiFilters).reduce((result, [key, values]) => {
  774. key = this.replace(key, {}, true, fieldName);
  775. if (!values) {
  776. return { ...result, [key]: null };
  777. }
  778. const initialVal: string[] = [];
  779. const newValues = values.reduce((result, value) => {
  780. const vals = this.expandVariableToArray(value, {});
  781. return [...result, ...vals];
  782. }, initialVal);
  783. return { ...result, [key]: newValues };
  784. }, {});
  785. }
  786. replace(
  787. target?: string,
  788. scopedVars?: ScopedVars,
  789. displayErrorIfIsMultiTemplateVariable?: boolean,
  790. fieldName?: string
  791. ) {
  792. if (displayErrorIfIsMultiTemplateVariable && !!target) {
  793. const variable = this.templateSrv
  794. .getVariables()
  795. .find(({ name }) => name === this.templateSrv.getVariableName(target));
  796. if (variable && (variable as unknown as VariableWithMultiSupport).multi) {
  797. this.debouncedCustomAlert(
  798. 'CloudWatch templating error',
  799. `Multi template variables are not supported for ${fieldName || target}`
  800. );
  801. }
  802. }
  803. return this.templateSrv.replace(target, scopedVars);
  804. }
  805. getQueryDisplayText(query: CloudWatchQuery) {
  806. if (query.queryMode === 'Logs') {
  807. return query.expression ?? '';
  808. } else {
  809. return JSON.stringify(query);
  810. }
  811. }
  812. getTargetsByQueryMode = (targets: CloudWatchQuery[]) => {
  813. const logQueries: CloudWatchLogsQuery[] = [];
  814. const metricsQueries: CloudWatchMetricsQuery[] = [];
  815. const annotationQueries: CloudWatchAnnotationQuery[] = [];
  816. targets.forEach((query) => {
  817. if (isCloudWatchAnnotationQuery(query)) {
  818. annotationQueries.push(query);
  819. } else if (isCloudWatchLogsQuery(query)) {
  820. logQueries.push(query);
  821. } else {
  822. metricsQueries.push(query);
  823. }
  824. });
  825. return {
  826. logQueries,
  827. metricsQueries,
  828. annotationQueries,
  829. };
  830. };
  831. interpolateVariablesInQueries(queries: CloudWatchQuery[], scopedVars: ScopedVars): CloudWatchQuery[] {
  832. if (!queries.length) {
  833. return queries;
  834. }
  835. return => ({
  836. ...query,
  837. region: this.getActualRegion(this.replace(query.region, scopedVars)),
  838. ...(isCloudWatchMetricsQuery(query) && this.interpolateMetricsQueryVariables(query, scopedVars)),
  839. }));
  840. }
  841. interpolateMetricsQueryVariables(
  842. query: CloudWatchMetricsQuery,
  843. scopedVars: ScopedVars
  844. ): Pick<CloudWatchMetricsQuery, 'alias' | 'metricName' | 'namespace' | 'period' | 'dimensions' | 'sqlExpression'> {
  845. return {
  846. alias: this.replace(query.alias, scopedVars),
  847. metricName: this.replace(query.metricName, scopedVars),
  848. namespace: this.replace(query.namespace, scopedVars),
  849. period: this.replace(query.period, scopedVars),
  850. sqlExpression: this.replace(query.sqlExpression, scopedVars),
  851. dimensions: this.convertDimensionFormat(query.dimensions ?? {}, scopedVars),
  852. };
  853. }
  854. }
  855. function withTeardown<T = any>(observable: Observable<T>, onUnsubscribe: () => void): Observable<T> {
  856. return new Observable<T>((subscriber) => {
  857. const innerSub = observable.subscribe({
  858. next: (val) =>,
  859. error: (err) =>,
  860. complete: () => subscriber.complete(),
  861. });
  862. return () => {
  863. innerSub.unsubscribe();
  864. onUnsubscribe();
  865. };
  866. });
  867. }
  868. function parseLogGroupName(logIdentifier: string): string {
  869. const colonIndex = logIdentifier.lastIndexOf(':');
  870. return logIdentifier.slice(colonIndex + 1);
  871. }