datasource.ts 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021
  1. import { cloneDeep, find, first as _first, isNumber, isObject, isString, map as _map } from 'lodash';
  2. import { generate, lastValueFrom, Observable, of, throwError } from 'rxjs';
  3. import { catchError, first, map, mergeMap, skipWhile, throwIfEmpty } from 'rxjs/operators';
  4. import { gte, lt, satisfies } from 'semver';
  5. import {
  6. DataFrame,
  7. DataLink,
  8. DataQueryRequest,
  9. DataQueryResponse,
  10. DataSourceApi,
  11. DataSourceInstanceSettings,
  12. DataSourceWithLogsContextSupport,
  13. DataSourceWithQueryImportSupport,
  14. DataSourceWithLogsVolumeSupport,
  15. DateTime,
  16. dateTime,
  17. Field,
  18. getDefaultTimeRange,
  19. AbstractQuery,
  20. getLogLevelFromKey,
  21. LogLevel,
  22. LogRowModel,
  23. MetricFindValue,
  24. ScopedVars,
  25. TimeRange,
  26. toUtc,
  27. } from '@grafana/data';
  28. import { BackendSrvRequest, getBackendSrv, getDataSourceSrv } from '@grafana/runtime';
  29. import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
  30. import { queryLogsVolume } from 'app/core/logs_model';
  31. import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
  32. import {
  33. BucketAggregation,
  34. isBucketAggregationWithField,
  35. } from './components/QueryEditor/BucketAggregationsEditor/aggregations';
  36. import { bucketAggregationConfig } from './components/QueryEditor/BucketAggregationsEditor/utils';
  37. import {
  38. isMetricAggregationWithField,
  39. isPipelineAggregationWithMultipleBucketPaths,
  40. Logs,
  41. } from './components/QueryEditor/MetricAggregationsEditor/aggregations';
  42. import { metricAggregationConfig } from './components/QueryEditor/MetricAggregationsEditor/utils';
  43. import { ElasticResponse } from './elastic_response';
  44. import { IndexPattern } from './index_pattern';
  45. import LanguageProvider from './language_provider';
  46. import { ElasticQueryBuilder } from './query_builder';
  47. import { defaultBucketAgg, hasMetricOfType } from './query_def';
  48. import { DataLinkConfig, ElasticsearchOptions, ElasticsearchQuery, TermsQuery } from './types';
  49. import { coerceESVersion, getScriptValue, isSupportedVersion } from './utils';
  50. // Those are metadata fields as defined in https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-fields.html#_identity_metadata_fields.
  51. // custom fields can start with underscores, therefore is not safe to exclude anything that starts with one.
  52. const ELASTIC_META_FIELDS = [
  53. '_index',
  54. '_type',
  55. '_id',
  56. '_source',
  57. '_size',
  58. '_field_names',
  59. '_ignored',
  60. '_routing',
  61. '_meta',
  62. ];
  63. export class ElasticDatasource
  64. extends DataSourceApi<ElasticsearchQuery, ElasticsearchOptions>
  65. implements
  66. DataSourceWithLogsContextSupport,
  67. DataSourceWithQueryImportSupport<ElasticsearchQuery>,
  68. DataSourceWithLogsVolumeSupport<ElasticsearchQuery>
  69. {
  70. basicAuth?: string;
  71. withCredentials?: boolean;
  72. url: string;
  73. name: string;
  74. index: string;
  75. timeField: string;
  76. esVersion: string;
  77. xpack: boolean;
  78. interval: string;
  79. maxConcurrentShardRequests?: number;
  80. queryBuilder: ElasticQueryBuilder;
  81. indexPattern: IndexPattern;
  82. logMessageField?: string;
  83. logLevelField?: string;
  84. dataLinks: DataLinkConfig[];
  85. languageProvider: LanguageProvider;
  86. includeFrozen: boolean;
  87. isProxyAccess: boolean;
  88. constructor(
  89. instanceSettings: DataSourceInstanceSettings<ElasticsearchOptions>,
  90. private readonly templateSrv: TemplateSrv = getTemplateSrv()
  91. ) {
  92. super(instanceSettings);
  93. this.basicAuth = instanceSettings.basicAuth;
  94. this.withCredentials = instanceSettings.withCredentials;
  95. this.url = instanceSettings.url!;
  96. this.name = instanceSettings.name;
  97. this.index = instanceSettings.database ?? '';
  98. this.isProxyAccess = instanceSettings.access === 'proxy';
  99. const settingsData = instanceSettings.jsonData || ({} as ElasticsearchOptions);
  100. this.timeField = settingsData.timeField;
  101. this.esVersion = coerceESVersion(settingsData.esVersion);
  102. this.xpack = Boolean(settingsData.xpack);
  103. this.indexPattern = new IndexPattern(this.index, settingsData.interval);
  104. this.interval = settingsData.timeInterval;
  105. this.maxConcurrentShardRequests = settingsData.maxConcurrentShardRequests;
  106. this.queryBuilder = new ElasticQueryBuilder({
  107. timeField: this.timeField,
  108. esVersion: this.esVersion,
  109. });
  110. this.logMessageField = settingsData.logMessageField || '';
  111. this.logLevelField = settingsData.logLevelField || '';
  112. this.dataLinks = settingsData.dataLinks || [];
  113. this.includeFrozen = settingsData.includeFrozen ?? false;
  114. if (this.logMessageField === '') {
  115. this.logMessageField = undefined;
  116. }
  117. if (this.logLevelField === '') {
  118. this.logLevelField = undefined;
  119. }
  120. this.languageProvider = new LanguageProvider(this);
  121. }
  122. private request(
  123. method: string,
  124. url: string,
  125. data?: undefined,
  126. headers?: BackendSrvRequest['headers']
  127. ): Observable<any> {
  128. if (!this.isProxyAccess) {
  129. const error = new Error(
  130. 'Browser access mode in the Elasticsearch datasource is no longer available. Switch to server access mode.'
  131. );
  132. return throwError(() => error);
  133. }
  134. if (!isSupportedVersion(this.esVersion)) {
  135. const error = new Error(
  136. 'Support for Elasticsearch versions after their end-of-life (currently versions < 7.10) was removed.'
  137. );
  138. return throwError(() => error);
  139. }
  140. const options: BackendSrvRequest = {
  141. url: this.url + '/' + url,
  142. method,
  143. data,
  144. headers,
  145. };
  146. if (this.basicAuth || this.withCredentials) {
  147. options.withCredentials = true;
  148. }
  149. if (this.basicAuth) {
  150. options.headers = {
  151. Authorization: this.basicAuth,
  152. };
  153. }
  154. return getBackendSrv()
  155. .fetch<any>(options)
  156. .pipe(
  157. map((results) => {
  158. results.data.$$config = results.config;
  159. return results.data;
  160. }),
  161. catchError((err) => {
  162. if (err.data) {
  163. const message = err.data.error?.reason ?? err.data.message ?? 'Unknown error';
  164. return throwError({
  165. message: 'Elasticsearch error: ' + message,
  166. error: err.data.error,
  167. });
  168. }
  169. return throwError(err);
  170. })
  171. );
  172. }
  173. async importFromAbstractQueries(abstractQueries: AbstractQuery[]): Promise<ElasticsearchQuery[]> {
  174. return abstractQueries.map((abstractQuery) => this.languageProvider.importFromAbstractQuery(abstractQuery));
  175. }
  176. /**
  177. * Sends a GET request to the specified url on the newest matching and available index.
  178. *
  179. * When multiple indices span the provided time range, the request is sent starting from the newest index,
  180. * and then going backwards until an index is found.
  181. *
  182. * @param url the url to query the index on, for example `/_mapping`.
  183. */
  184. private get(url: string, range = getDefaultTimeRange()): Observable<any> {
  185. let indexList = this.indexPattern.getIndexList(range.from, range.to);
  186. if (!Array.isArray(indexList)) {
  187. indexList = [this.indexPattern.getIndexForToday()];
  188. }
  189. const indexUrlList = indexList.map((index) => index + url);
  190. return this.requestAllIndices(indexUrlList);
  191. }
  192. private requestAllIndices(indexList: string[]): Observable<any> {
  193. const maxTraversals = 7; // do not go beyond one week (for a daily pattern)
  194. const listLen = indexList.length;
  195. return generate({
  196. initialState: 0,
  197. condition: (i) => i < Math.min(listLen, maxTraversals),
  198. iterate: (i) => i + 1,
  199. }).pipe(
  200. mergeMap((index) => {
  201. // catch all errors and emit an object with an err property to simplify checks later in the pipeline
  202. return this.request('GET', indexList[listLen - index - 1]).pipe(catchError((err) => of({ err })));
  203. }),
  204. skipWhile((resp) => resp?.err?.status === 404), // skip all requests that fail because missing Elastic index
  205. throwIfEmpty(() => 'Could not find an available index for this time range.'), // when i === Math.min(listLen, maxTraversals) generate will complete but without emitting any values which means we didn't find a valid index
  206. first(), // take the first value that isn't skipped
  207. map((resp) => {
  208. if (resp.err) {
  209. throw resp.err; // if there is some other error except 404 then we must throw it
  210. }
  211. return resp;
  212. })
  213. );
  214. }
  215. private post(url: string, data: any): Observable<any> {
  216. return this.request('POST', url, data, { 'Content-Type': 'application/x-ndjson' });
  217. }
  218. annotationQuery(options: any): Promise<any> {
  219. const annotation = options.annotation;
  220. const timeField = annotation.timeField || '@timestamp';
  221. const timeEndField = annotation.timeEndField || null;
  222. const queryString = annotation.query;
  223. const tagsField = annotation.tagsField || 'tags';
  224. const textField = annotation.textField || null;
  225. const dateRanges = [];
  226. const rangeStart: any = {};
  227. rangeStart[timeField] = {
  228. from: options.range.from.valueOf(),
  229. to: options.range.to.valueOf(),
  230. format: 'epoch_millis',
  231. };
  232. dateRanges.push({ range: rangeStart });
  233. if (timeEndField) {
  234. const rangeEnd: any = {};
  235. rangeEnd[timeEndField] = {
  236. from: options.range.from.valueOf(),
  237. to: options.range.to.valueOf(),
  238. format: 'epoch_millis',
  239. };
  240. dateRanges.push({ range: rangeEnd });
  241. }
  242. const queryInterpolated = this.interpolateLuceneQuery(queryString);
  243. const query: any = {
  244. bool: {
  245. filter: [
  246. {
  247. bool: {
  248. should: dateRanges,
  249. minimum_should_match: 1,
  250. },
  251. },
  252. ],
  253. },
  254. };
  255. if (queryInterpolated) {
  256. query.bool.filter.push({
  257. query_string: {
  258. query: queryInterpolated,
  259. },
  260. });
  261. }
  262. const data: any = {
  263. query,
  264. size: 10000,
  265. };
  266. // fields field not supported on ES 5.x
  267. if (lt(this.esVersion, '5.0.0')) {
  268. data['fields'] = [timeField, '_source'];
  269. }
  270. const header: any = {
  271. search_type: 'query_then_fetch',
  272. ignore_unavailable: true,
  273. };
  274. // old elastic annotations had index specified on them
  275. if (annotation.index) {
  276. header.index = annotation.index;
  277. } else {
  278. header.index = this.indexPattern.getIndexList(options.range.from, options.range.to);
  279. }
  280. const payload = JSON.stringify(header) + '\n' + JSON.stringify(data) + '\n';
  281. return lastValueFrom(
  282. this.post('_msearch', payload).pipe(
  283. map((res) => {
  284. const list = [];
  285. const hits = res.responses[0].hits.hits;
  286. const getFieldFromSource = (source: any, fieldName: any) => {
  287. if (!fieldName) {
  288. return;
  289. }
  290. const fieldNames = fieldName.split('.');
  291. let fieldValue = source;
  292. for (let i = 0; i < fieldNames.length; i++) {
  293. fieldValue = fieldValue[fieldNames[i]];
  294. if (!fieldValue) {
  295. console.log('could not find field in annotation: ', fieldName);
  296. return '';
  297. }
  298. }
  299. return fieldValue;
  300. };
  301. for (let i = 0; i < hits.length; i++) {
  302. const source = hits[i]._source;
  303. let time = getFieldFromSource(source, timeField);
  304. if (typeof hits[i].fields !== 'undefined') {
  305. const fields = hits[i].fields;
  306. if (isString(fields[timeField]) || isNumber(fields[timeField])) {
  307. time = fields[timeField];
  308. }
  309. }
  310. const event: {
  311. annotation: any;
  312. time: number;
  313. timeEnd?: number;
  314. text: string;
  315. tags: string | string[];
  316. } = {
  317. annotation: annotation,
  318. time: toUtc(time).valueOf(),
  319. text: getFieldFromSource(source, textField),
  320. tags: getFieldFromSource(source, tagsField),
  321. };
  322. if (timeEndField) {
  323. const timeEnd = getFieldFromSource(source, timeEndField);
  324. if (timeEnd) {
  325. event.timeEnd = toUtc(timeEnd).valueOf();
  326. }
  327. }
  328. // legacy support for title tield
  329. if (annotation.titleField) {
  330. const title = getFieldFromSource(source, annotation.titleField);
  331. if (title) {
  332. event.text = title + '\n' + event.text;
  333. }
  334. }
  335. if (typeof event.tags === 'string') {
  336. event.tags = event.tags.split(',');
  337. }
  338. list.push(event);
  339. }
  340. return list;
  341. })
  342. )
  343. );
  344. }
  345. private interpolateLuceneQuery(queryString: string, scopedVars?: ScopedVars) {
  346. return this.templateSrv.replace(queryString, scopedVars, 'lucene');
  347. }
  348. interpolateVariablesInQueries(queries: ElasticsearchQuery[], scopedVars: ScopedVars): ElasticsearchQuery[] {
  349. // We need a separate interpolation format for lucene queries, therefore we first interpolate any
  350. // lucene query string and then everything else
  351. const interpolateBucketAgg = (bucketAgg: BucketAggregation): BucketAggregation => {
  352. if (bucketAgg.type === 'filters') {
  353. return {
  354. ...bucketAgg,
  355. settings: {
  356. ...bucketAgg.settings,
  357. filters: bucketAgg.settings?.filters?.map((filter) => ({
  358. ...filter,
  359. query: this.interpolateLuceneQuery(filter.query, scopedVars) || '*',
  360. })),
  361. },
  362. };
  363. }
  364. return bucketAgg;
  365. };
  366. const expandedQueries = queries.map(
  367. (query): ElasticsearchQuery => ({
  368. ...query,
  369. datasource: this.getRef(),
  370. query: this.interpolateLuceneQuery(query.query || '', scopedVars),
  371. bucketAggs: query.bucketAggs?.map(interpolateBucketAgg),
  372. })
  373. );
  374. const finalQueries: ElasticsearchQuery[] = JSON.parse(
  375. this.templateSrv.replace(JSON.stringify(expandedQueries), scopedVars)
  376. );
  377. return finalQueries;
  378. }
  379. testDatasource() {
  380. // validate that the index exist and has date field
  381. return lastValueFrom(
  382. this.getFields(['date']).pipe(
  383. mergeMap((dateFields) => {
  384. const timeField: any = find(dateFields, { text: this.timeField });
  385. if (!timeField) {
  386. return of({ status: 'error', message: 'No date field named ' + this.timeField + ' found' });
  387. }
  388. return of({ status: 'success', message: 'Index OK. Time field name OK.' });
  389. }),
  390. catchError((err) => {
  391. console.error(err);
  392. if (err.message) {
  393. return of({ status: 'error', message: err.message });
  394. } else {
  395. return of({ status: 'error', message: err.status });
  396. }
  397. })
  398. )
  399. );
  400. }
  401. getQueryHeader(searchType: any, timeFrom?: DateTime, timeTo?: DateTime): string {
  402. const queryHeader: any = {
  403. search_type: searchType,
  404. ignore_unavailable: true,
  405. index: this.indexPattern.getIndexList(timeFrom, timeTo),
  406. };
  407. if (satisfies(this.esVersion, '>=5.6.0 <7.0.0')) {
  408. queryHeader['max_concurrent_shard_requests'] = this.maxConcurrentShardRequests;
  409. }
  410. return JSON.stringify(queryHeader);
  411. }
  412. getQueryDisplayText(query: ElasticsearchQuery) {
  413. // TODO: This might be refactored a bit.
  414. const metricAggs = query.metrics;
  415. const bucketAggs = query.bucketAggs;
  416. let text = '';
  417. if (query.query) {
  418. text += 'Query: ' + query.query + ', ';
  419. }
  420. text += 'Metrics: ';
  421. text += metricAggs?.reduce((acc, metric) => {
  422. const metricConfig = metricAggregationConfig[metric.type];
  423. let text = metricConfig.label + '(';
  424. if (isMetricAggregationWithField(metric)) {
  425. text += metric.field;
  426. }
  427. if (isPipelineAggregationWithMultipleBucketPaths(metric)) {
  428. text += getScriptValue(metric).replace(new RegExp('params.', 'g'), '');
  429. }
  430. text += '), ';
  431. return `${acc} ${text}`;
  432. }, '');
  433. text += bucketAggs?.reduce((acc, bucketAgg, index) => {
  434. const bucketConfig = bucketAggregationConfig[bucketAgg.type];
  435. let text = '';
  436. if (index === 0) {
  437. text += ' Group by: ';
  438. }
  439. text += bucketConfig.label + '(';
  440. if (isBucketAggregationWithField(bucketAgg)) {
  441. text += bucketAgg.field;
  442. }
  443. return `${acc} ${text}), `;
  444. }, '');
  445. if (query.alias) {
  446. text += 'Alias: ' + query.alias;
  447. }
  448. return text;
  449. }
  450. /**
  451. * This method checks to ensure the user is running a 5.0+ cluster. This is
  452. * necessary bacause the query being used for the getLogRowContext relies on the
  453. * search_after feature.
  454. */
  455. showContextToggle(): boolean {
  456. return gte(this.esVersion, '5.0.0');
  457. }
  458. getLogRowContext = async (row: LogRowModel, options?: RowContextOptions): Promise<{ data: DataFrame[] }> => {
  459. const sortField = row.dataFrame.fields.find((f) => f.name === 'sort');
  460. const searchAfter = sortField?.values.get(row.rowIndex) || [row.timeEpochMs];
  461. const sort = options?.direction === 'FORWARD' ? 'asc' : 'desc';
  462. const header =
  463. options?.direction === 'FORWARD'
  464. ? this.getQueryHeader('query_then_fetch', dateTime(row.timeEpochMs))
  465. : this.getQueryHeader('query_then_fetch', undefined, dateTime(row.timeEpochMs));
  466. const limit = options?.limit ?? 10;
  467. const esQuery = JSON.stringify({
  468. size: limit,
  469. query: {
  470. bool: {
  471. filter: [
  472. {
  473. range: {
  474. [this.timeField]: {
  475. [options?.direction === 'FORWARD' ? 'gte' : 'lte']: row.timeEpochMs,
  476. format: 'epoch_millis',
  477. },
  478. },
  479. },
  480. ],
  481. },
  482. },
  483. sort: [{ [this.timeField]: sort }, { _doc: sort }],
  484. search_after: searchAfter,
  485. });
  486. const payload = [header, esQuery].join('\n') + '\n';
  487. const url = this.getMultiSearchUrl();
  488. const response = await lastValueFrom(this.post(url, payload));
  489. const targets: ElasticsearchQuery[] = [{ refId: `${row.dataFrame.refId}`, metrics: [{ type: 'logs', id: '1' }] }];
  490. const elasticResponse = new ElasticResponse(targets, transformHitsBasedOnDirection(response, sort));
  491. const logResponse = elasticResponse.getLogs(this.logMessageField, this.logLevelField);
  492. const dataFrame = _first(logResponse.data);
  493. if (!dataFrame) {
  494. return { data: [] };
  495. }
  496. /**
  497. * The LogRowContextProvider requires there is a field in the dataFrame.fields
  498. * named `ts` for timestamp and `line` for the actual log line to display.
  499. * Unfortunatly these fields are hardcoded and are required for the lines to
  500. * be properly displayed. This code just copies the fields based on this.timeField
  501. * and this.logMessageField and recreates the dataFrame so it works.
  502. */
  503. const timestampField = dataFrame.fields.find((f: Field) => f.name === this.timeField);
  504. const lineField = dataFrame.fields.find((f: Field) => f.name === this.logMessageField);
  505. if (timestampField && lineField) {
  506. return {
  507. data: [
  508. {
  509. ...dataFrame,
  510. fields: [...dataFrame.fields, { ...timestampField, name: 'ts' }, { ...lineField, name: 'line' }],
  511. },
  512. ],
  513. };
  514. }
  515. return logResponse;
  516. };
  517. getLogsVolumeDataProvider(request: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> | undefined {
  518. const isLogsVolumeAvailable = request.targets.some((target) => {
  519. return target.metrics?.length === 1 && target.metrics[0].type === 'logs';
  520. });
  521. if (!isLogsVolumeAvailable) {
  522. return undefined;
  523. }
  524. const logsVolumeRequest = cloneDeep(request);
  525. logsVolumeRequest.targets = logsVolumeRequest.targets.map((target) => {
  526. const bucketAggs: BucketAggregation[] = [];
  527. const timeField = this.timeField ?? '@timestamp';
  528. if (this.logLevelField) {
  529. bucketAggs.push({
  530. id: '2',
  531. type: 'terms',
  532. settings: {
  533. min_doc_count: '0',
  534. size: '0',
  535. order: 'desc',
  536. orderBy: '_count',
  537. missing: LogLevel.unknown,
  538. },
  539. field: this.logLevelField,
  540. });
  541. }
  542. bucketAggs.push({
  543. id: '3',
  544. type: 'date_histogram',
  545. settings: {
  546. interval: 'auto',
  547. min_doc_count: '0',
  548. trimEdges: '0',
  549. },
  550. field: timeField,
  551. });
  552. const logsVolumeQuery: ElasticsearchQuery = {
  553. refId: target.refId,
  554. query: target.query,
  555. metrics: [{ type: 'count', id: '1' }],
  556. timeField,
  557. bucketAggs,
  558. };
  559. return logsVolumeQuery;
  560. });
  561. return queryLogsVolume(this, logsVolumeRequest, {
  562. range: request.range,
  563. targets: request.targets,
  564. extractLevel: (dataFrame) => getLogLevelFromKey(dataFrame.name || ''),
  565. });
  566. }
  567. query(options: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> {
  568. let payload = '';
  569. const targets = this.interpolateVariablesInQueries(cloneDeep(options.targets), options.scopedVars);
  570. const sentTargets: ElasticsearchQuery[] = [];
  571. let targetsContainsLogsQuery = targets.some((target) => hasMetricOfType(target, 'logs'));
  572. // add global adhoc filters to timeFilter
  573. const adhocFilters = this.templateSrv.getAdhocFilters(this.name);
  574. const logLimits: Array<number | undefined> = [];
  575. for (const target of targets) {
  576. if (target.hide) {
  577. continue;
  578. }
  579. let queryObj;
  580. if (hasMetricOfType(target, 'logs')) {
  581. // FIXME: All this logic here should be in the query builder.
  582. // When moving to the BE-only implementation we should remove this and let the BE
  583. // Handle this.
  584. // TODO: defaultBucketAgg creates a dete_histogram aggregation without a field, so it fallbacks to
  585. // the configured timeField. we should allow people to use a different time field here.
  586. target.bucketAggs = [defaultBucketAgg()];
  587. const log = target.metrics?.find((m) => m.type === 'logs') as Logs;
  588. const limit = log.settings?.limit ? parseInt(log.settings?.limit, 10) : 500;
  589. logLimits.push(limit);
  590. target.metrics = [];
  591. // Setting this for metrics queries that are typed as logs
  592. queryObj = this.queryBuilder.getLogsQuery(target, limit, adhocFilters);
  593. } else {
  594. logLimits.push();
  595. if (target.alias) {
  596. target.alias = this.interpolateLuceneQuery(target.alias, options.scopedVars);
  597. }
  598. queryObj = this.queryBuilder.build(target, adhocFilters);
  599. }
  600. const esQuery = JSON.stringify(queryObj);
  601. const searchType = queryObj.size === 0 && lt(this.esVersion, '5.0.0') ? 'count' : 'query_then_fetch';
  602. const header = this.getQueryHeader(searchType, options.range.from, options.range.to);
  603. payload += header + '\n';
  604. payload += esQuery + '\n';
  605. sentTargets.push(target);
  606. }
  607. if (sentTargets.length === 0) {
  608. return of({ data: [] });
  609. }
  610. // We replace the range here for actual values. We need to replace it together with enclosing "" so that we replace
  611. // it as an integer not as string with digits. This is because elastic will convert the string only if the time
  612. // field is specified as type date (which probably should) but can also be specified as integer (millisecond epoch)
  613. // and then sending string will error out.
  614. payload = payload.replace(/"\$timeFrom"/g, options.range.from.valueOf().toString());
  615. payload = payload.replace(/"\$timeTo"/g, options.range.to.valueOf().toString());
  616. payload = this.templateSrv.replace(payload, options.scopedVars);
  617. const url = this.getMultiSearchUrl();
  618. return this.post(url, payload).pipe(
  619. map((res) => {
  620. const er = new ElasticResponse(sentTargets, res);
  621. // TODO: This needs to be revisited, it seems wrong to process ALL the sent queries as logs if only one of them was a log query
  622. if (targetsContainsLogsQuery) {
  623. const response = er.getLogs(this.logMessageField, this.logLevelField);
  624. response.data.forEach((dataFrame, index) => {
  625. enhanceDataFrame(dataFrame, this.dataLinks, logLimits[index]);
  626. });
  627. return response;
  628. }
  629. return er.getTimeSeries();
  630. })
  631. );
  632. }
  633. isMetadataField(fieldName: string) {
  634. return ELASTIC_META_FIELDS.includes(fieldName);
  635. }
  636. // TODO: instead of being a string, this could be a custom type representing all the elastic types
  637. // FIXME: This doesn't seem to return actual MetricFindValues, we should either change the return type
  638. // or fix the implementation.
  639. getFields(type?: string[], range?: TimeRange): Observable<MetricFindValue[]> {
  640. const typeMap: Record<string, string> = {
  641. float: 'number',
  642. double: 'number',
  643. integer: 'number',
  644. long: 'number',
  645. date: 'date',
  646. date_nanos: 'date',
  647. string: 'string',
  648. text: 'string',
  649. scaled_float: 'number',
  650. nested: 'nested',
  651. histogram: 'number',
  652. };
  653. return this.get('/_mapping', range).pipe(
  654. map((result) => {
  655. const shouldAddField = (obj: any, key: string) => {
  656. if (this.isMetadataField(key)) {
  657. return false;
  658. }
  659. if (!type || type.length === 0) {
  660. return true;
  661. }
  662. // equal query type filter, or via typemap translation
  663. return type.includes(obj.type) || type.includes(typeMap[obj.type]);
  664. };
  665. // Store subfield names: [system, process, cpu, total] -> system.process.cpu.total
  666. const fieldNameParts: any = [];
  667. const fields: any = {};
  668. function getFieldsRecursively(obj: any) {
  669. for (const key in obj) {
  670. const subObj = obj[key];
  671. // Check mapping field for nested fields
  672. if (isObject(subObj.properties)) {
  673. fieldNameParts.push(key);
  674. getFieldsRecursively(subObj.properties);
  675. }
  676. if (isObject(subObj.fields)) {
  677. fieldNameParts.push(key);
  678. getFieldsRecursively(subObj.fields);
  679. }
  680. if (isString(subObj.type)) {
  681. const fieldName = fieldNameParts.concat(key).join('.');
  682. // Hide meta-fields and check field type
  683. if (shouldAddField(subObj, key)) {
  684. fields[fieldName] = {
  685. text: fieldName,
  686. type: subObj.type,
  687. };
  688. }
  689. }
  690. }
  691. fieldNameParts.pop();
  692. }
  693. for (const indexName in result) {
  694. const index = result[indexName];
  695. if (index && index.mappings) {
  696. const mappings = index.mappings;
  697. if (lt(this.esVersion, '7.0.0')) {
  698. for (const typeName in mappings) {
  699. const properties = mappings[typeName].properties;
  700. getFieldsRecursively(properties);
  701. }
  702. } else {
  703. const properties = mappings.properties;
  704. getFieldsRecursively(properties);
  705. }
  706. }
  707. }
  708. // transform to array
  709. return _map(fields, (value) => {
  710. return value;
  711. });
  712. })
  713. );
  714. }
  715. getTerms(queryDef: TermsQuery, range = getDefaultTimeRange()): Observable<MetricFindValue[]> {
  716. const searchType = gte(this.esVersion, '5.0.0') ? 'query_then_fetch' : 'count';
  717. const header = this.getQueryHeader(searchType, range.from, range.to);
  718. let esQuery = JSON.stringify(this.queryBuilder.getTermsQuery(queryDef));
  719. esQuery = esQuery.replace(/\$timeFrom/g, range.from.valueOf().toString());
  720. esQuery = esQuery.replace(/\$timeTo/g, range.to.valueOf().toString());
  721. esQuery = header + '\n' + esQuery + '\n';
  722. const url = this.getMultiSearchUrl();
  723. return this.post(url, esQuery).pipe(
  724. map((res) => {
  725. if (!res.responses[0].aggregations) {
  726. return [];
  727. }
  728. const buckets = res.responses[0].aggregations['1'].buckets;
  729. return _map(buckets, (bucket) => {
  730. return {
  731. text: bucket.key_as_string || bucket.key,
  732. value: bucket.key,
  733. };
  734. });
  735. })
  736. );
  737. }
  738. getMultiSearchUrl() {
  739. const searchParams = new URLSearchParams();
  740. if (gte(this.esVersion, '7.0.0') && this.maxConcurrentShardRequests) {
  741. searchParams.append('max_concurrent_shard_requests', `${this.maxConcurrentShardRequests}`);
  742. }
  743. if (gte(this.esVersion, '6.6.0') && this.xpack && this.includeFrozen) {
  744. searchParams.append('ignore_throttled', 'false');
  745. }
  746. return ('_msearch?' + searchParams.toString()).replace(/\?$/, '');
  747. }
  748. metricFindQuery(query: string, options?: any): Promise<MetricFindValue[]> {
  749. const range = options?.range;
  750. const parsedQuery = JSON.parse(query);
  751. if (query) {
  752. if (parsedQuery.find === 'fields') {
  753. parsedQuery.type = this.interpolateLuceneQuery(parsedQuery.type);
  754. return lastValueFrom(this.getFields(parsedQuery.type, range));
  755. }
  756. if (parsedQuery.find === 'terms') {
  757. parsedQuery.field = this.interpolateLuceneQuery(parsedQuery.field);
  758. parsedQuery.query = this.interpolateLuceneQuery(parsedQuery.query);
  759. return lastValueFrom(this.getTerms(parsedQuery, range));
  760. }
  761. }
  762. return Promise.resolve([]);
  763. }
  764. getTagKeys() {
  765. return lastValueFrom(this.getFields());
  766. }
  767. getTagValues(options: any) {
  768. return lastValueFrom(this.getTerms({ field: options.key }));
  769. }
  770. targetContainsTemplate(target: any) {
  771. if (this.templateSrv.containsTemplate(target.query) || this.templateSrv.containsTemplate(target.alias)) {
  772. return true;
  773. }
  774. for (const bucketAgg of target.bucketAggs) {
  775. if (this.templateSrv.containsTemplate(bucketAgg.field) || this.objectContainsTemplate(bucketAgg.settings)) {
  776. return true;
  777. }
  778. }
  779. for (const metric of target.metrics) {
  780. if (
  781. this.templateSrv.containsTemplate(metric.field) ||
  782. this.objectContainsTemplate(metric.settings) ||
  783. this.objectContainsTemplate(metric.meta)
  784. ) {
  785. return true;
  786. }
  787. }
  788. return false;
  789. }
  790. private isPrimitive(obj: any) {
  791. if (obj === null || obj === undefined) {
  792. return true;
  793. }
  794. if (['string', 'number', 'boolean'].some((type) => type === typeof true)) {
  795. return true;
  796. }
  797. return false;
  798. }
  799. private objectContainsTemplate(obj: any) {
  800. if (!obj) {
  801. return false;
  802. }
  803. for (const key of Object.keys(obj)) {
  804. if (this.isPrimitive(obj[key])) {
  805. if (this.templateSrv.containsTemplate(obj[key])) {
  806. return true;
  807. }
  808. } else if (Array.isArray(obj[key])) {
  809. for (const item of obj[key]) {
  810. if (this.objectContainsTemplate(item)) {
  811. return true;
  812. }
  813. }
  814. } else {
  815. if (this.objectContainsTemplate(obj[key])) {
  816. return true;
  817. }
  818. }
  819. }
  820. return false;
  821. }
  822. }
  823. /**
  824. * Modifies dataframe and adds dataLinks from the config.
  825. * Exported for tests.
  826. */
  827. export function enhanceDataFrame(dataFrame: DataFrame, dataLinks: DataLinkConfig[], limit?: number) {
  828. if (limit) {
  829. dataFrame.meta = {
  830. ...dataFrame.meta,
  831. limit,
  832. };
  833. }
  834. if (!dataLinks.length) {
  835. return;
  836. }
  837. for (const field of dataFrame.fields) {
  838. const linksToApply = dataLinks.filter((dataLink) => new RegExp(dataLink.field).test(field.name));
  839. if (linksToApply.length === 0) {
  840. continue;
  841. }
  842. field.config = field.config || {};
  843. field.config.links = [...(field.config.links || [], linksToApply.map(generateDataLink))];
  844. }
  845. }
  846. function generateDataLink(linkConfig: DataLinkConfig): DataLink {
  847. const dataSourceSrv = getDataSourceSrv();
  848. if (linkConfig.datasourceUid) {
  849. const dsSettings = dataSourceSrv.getInstanceSettings(linkConfig.datasourceUid);
  850. return {
  851. title: linkConfig.urlDisplayLabel || '',
  852. url: '',
  853. internal: {
  854. query: { query: linkConfig.url },
  855. datasourceUid: linkConfig.datasourceUid,
  856. datasourceName: dsSettings?.name ?? 'Data source not found',
  857. },
  858. };
  859. } else {
  860. return {
  861. title: linkConfig.urlDisplayLabel || '',
  862. url: linkConfig.url,
  863. };
  864. }
  865. }
  866. function transformHitsBasedOnDirection(response: any, direction: 'asc' | 'desc') {
  867. if (direction === 'desc') {
  868. return response;
  869. }
  870. const actualResponse = response.responses[0];
  871. return {
  872. ...response,
  873. responses: [
  874. {
  875. ...actualResponse,
  876. hits: {
  877. ...actualResponse.hits,
  878. hits: actualResponse.hits.hits.reverse(),
  879. },
  880. },
  881. ],
  882. };
  883. }