result_transformer.ts 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. import { capitalize, groupBy, isEmpty } from 'lodash';
  2. import { of } from 'rxjs';
  3. import { v5 as uuidv5 } from 'uuid';
  4. import {
  5. FieldType,
  6. TimeSeries,
  7. Labels,
  8. DataFrame,
  9. ArrayVector,
  10. MutableDataFrame,
  11. findUniqueLabels,
  12. DataFrameView,
  13. DataLink,
  14. Field,
  15. QueryResultMetaStat,
  16. QueryResultMeta,
  17. TimeSeriesValue,
  18. ScopedVars,
  19. toDataFrame,
  20. } from '@grafana/data';
  21. import { getTemplateSrv, getDataSourceSrv } from '@grafana/runtime';
  22. import TableModel from 'app/core/table_model';
  23. import { renderLegendFormat } from '../prometheus/legend';
  24. import { formatQuery, getHighlighterExpressionsFromQuery } from './query_utils';
  25. import { dataFrameHasLokiError } from './responseUtils';
  26. import {
  27. LokiRangeQueryRequest,
  28. LokiResponse,
  29. LokiMatrixResult,
  30. LokiVectorResult,
  31. TransformerOptions,
  32. LokiResultType,
  33. LokiStreamResult,
  34. LokiTailResponse,
  35. LokiQuery,
  36. LokiOptions,
  37. DerivedFieldConfig,
  38. LokiStreamResponse,
  39. LokiStats,
  40. } from './types';
  /** Namespace passed to uuidv5 when hashing a log row into its id (see createUid). */
  const UUID_NAMESPACE = '6ec946da-0f49-47a8-983a-1d76d17e7c92';

  /**
   * Transforms LokiStreamResult structure into a dataFrame. Used when doing standard queries.
   *
   * Every `[timestamp, line]` entry of every stream becomes one row. All rows of a stream
   * share that stream's labels object.
   *
   * @param streams Streams to flatten into rows.
   * @param refId Optional query refId; set on the frame and appended to every row id.
   * @returns Frame assembled by constructDataFrame.
   */
  export function lokiStreamsToRawDataFrame(streams: LokiStreamResult[], refId?: string): DataFrame {
    const labels = new ArrayVector<{}>([]);
    const times = new ArrayVector<string>([]);
    const timesNs = new ArrayVector<string>([]);
    const lines = new ArrayVector<string>([]);
    const uids = new ArrayVector<string>([]);

    // We need to store and track all used uids to ensure that uids are unique.
    // Maps a generated id to the number of times it has been repeated so far.
    const usedUids: Record<string, number> = {};

    for (const stream of streams) {
      const streamLabels: Labels = stream.stream;
      // Sorted so the same label set always produces the same string (it only feeds the id hash).
      const labelsString = Object.entries(streamLabels)
        .map(([key, val]) => `${key}="${val}"`)
        .sort()
        .join('');

      for (const [ts, line] of stream.values) {
        labels.add(streamLabels);
        // num ns epoch in string, we convert it to iso string here so it matches old format
        // (dropping the last 6 digits turns nanoseconds into milliseconds).
        times.add(new Date(parseInt(ts.slice(0, -6), 10)).toISOString());
        timesNs.add(ts);
        lines.add(line);
        uids.add(createUid(ts, labelsString, line, usedUids, refId));
      }
    }

    return constructDataFrame(times, timesNs, lines, uids, labels, refId);
  }
  /**
   * Assembles a dataFrame-shaped object from already populated column vectors.
   *
   * The frame length is taken from `times`. Field order is: labels, Time, Line, tsNs, id.
   */
  function constructDataFrame(
    times: ArrayVector<string>,
    timesNs: ArrayVector<string>,
    lines: ArrayVector<string>,
    uids: ArrayVector<string>,
    labels: ArrayVector<{}>,
    refId?: string
  ) {
    const labelsField = { name: 'labels', type: FieldType.other, config: {}, values: labels };
    // ISO formatted time
    const timeField = { name: 'Time', type: FieldType.time, config: {}, values: times };
    // Line - needs to be the first field with string type
    const lineField = { name: 'Line', type: FieldType.string, config: {}, values: lines };
    // Original nanosecond timestamp string
    const tsNsField = { name: 'tsNs', type: FieldType.time, config: {}, values: timesNs };
    const idField = { name: 'id', type: FieldType.string, config: {}, values: uids };

    return {
      refId,
      fields: [labelsField, timeField, lineField, tsNsField, idField],
      length: times.length,
    };
  }
  /**
   * Transform LokiResponse data and appends it to MutableDataFrame. Used for streaming where the dataFrame can be
   * a CircularDataFrame creating a fixed size rolling buffer.
   * TODO: Probably could be unified with lokiStreamsToRawDataFrame.
   *
   * Does nothing when the response carries no streams.
   *
   * @param response Tail response whose streams are appended.
   * @param data Frame to append to. Fields are addressed by index and are expected in this order:
   *   labels (0), time (1), line (2), id (3), nanosecond timestamp (4).
   *   NOTE(review): this differs from the order produced by constructDataFrame (tsNs before id);
   *   confirm against the code that creates the streaming frame.
   */
  export function appendResponseToBufferedData(response: LokiTailResponse, data: MutableDataFrame) {
    // Should we do anything with: response.dropped_entries?
    const streams: LokiStreamResult[] = response.streams;
    if (!streams || !streams.length) {
      return;
    }

    // Base labels come from the first string field, when it has any.
    const firstStringField = data.fields.find((f) => f.type === FieldType.string);
    const baseLabels: Labels = firstStringField?.labels || {};

    const labelsField = data.fields[0];
    const tsField = data.fields[1];
    const lineField = data.fields[2];
    const idField = data.fields[3];
    const tsNsField = data.fields[4];

    // We are comparing used ids only within the received stream. This could be a problem if the same line + labels + nanosecond timestamp came in 2 separate batches.
    // As this is very unlikely, and the result would only affect live-tailing css animation we have decided to not compare all received uids from data param as this would slow down processing.
    const usedUids: Record<string, number> = {};

    for (const stream of streams) {
      // Find unique labels
      const unique = findUniqueLabels(stream.stream, baseLabels);
      // The id hash uses all labels of the stream, not only the unique ones.
      const allLabelsString = Object.entries(stream.stream)
        .map(([key, val]) => `${key}="${val}"`)
        .sort()
        .join('');

      // Add each line
      for (const [ts, line] of stream.values) {
        // ts is a nanosecond epoch string; dropping the last 6 digits gives milliseconds.
        tsField.values.add(new Date(parseInt(ts.slice(0, -6), 10)).toISOString());
        tsNsField.values.add(ts);
        lineField.values.add(line);
        labelsField.values.add(unique);
        idField.values.add(createUid(ts, allLabelsString, line, usedUids, data.refId));
      }
    }
  }
  /**
   * Builds a row id from the nanosecond timestamp, the labels string and the log line.
   *
   * The hash alone does not have to be unique: `usedUids` counts repeats, and a repeated
   * hash gets `_<count>` appended. When `refId` is set, `_<refId>` is appended last.
   *
   * @param usedUids Mutable map of hash -> repeat count, shared by all rows of one batch.
   */
  function createUid(ts: string, labelsString: string, line: string, usedUids: any, refId?: string): string {
    const hash = uuidv5(`${ts}_${labelsString}_${line}`, UUID_NAMESPACE);
    let uid = hash;

    if (hash in usedUids) {
      // Seen before: bump the counter and use it as a disambiguating suffix.
      const occurrence = usedUids[hash] + 1;
      usedUids[hash] = occurrence;
      uid = `${hash}_${occurrence}`;
    } else {
      // First time this hash shows up; counting starts at 0 and the hash is used as is.
      usedUids[hash] = 0;
    }

    return refId ? `${uid}_${refId}` : uid;
  }
  /**
   * Converts one matrix result into a TimeSeries. The label built by createMetricLabel
   * is used for both `target` and `title`; `meta` and `refId` come from the options.
   */
  function lokiMatrixToTimeSeries(matrixResult: LokiMatrixResult, options: TransformerOptions): TimeSeries {
    const { metric, values } = matrixResult;
    const seriesLabel = createMetricLabel(metric, options);

    return {
      target: seriesLabel,
      title: seriesLabel,
      datapoints: lokiPointsToTimeseriesPoints(values),
      tags: metric,
      meta: options.meta,
      refId: options.refId,
    };
  }
  /**
   * Parses a sample value string into a number.
   * The literals '+Inf' and '-Inf' map to the infinities; everything else goes through parseFloat.
   */
  function parsePrometheusFormatSampleValue(value: string): number {
    if (value === '+Inf') {
      return Number.POSITIVE_INFINITY;
    }
    if (value === '-Inf') {
      return Number.NEGATIVE_INFINITY;
    }
    return parseFloat(value);
  }
  /**
   * Converts `[time, value]` pairs into `[value, timestamp]` datapoints.
   * The value string is parsed to a number and the time is multiplied by 1000.
   */
  export function lokiPointsToTimeseriesPoints(data: Array<[number, string]>): TimeSeriesValue[][] {
    return data.map(([time, value]): TimeSeriesValue[] => {
      const parsedValue: TimeSeriesValue = parsePrometheusFormatSampleValue(value);
      return [parsedValue, time * 1000];
    });
  }
  /**
   * Builds a TableModel out of matrix or vector results.
   *
   * Columns are: Time, one column per label name found in any result (sorted), and
   * `Value #<refId>`. Each sample becomes one row; labels a series does not have are
   * rendered as an empty string.
   *
   * @param lokiResults Matrix (`values`) or vector (`value`) results.
   * @param resultCount Not used by this function; kept for signature compatibility.
   * @param refId Query refId, set on the table and used in the value column title.
   * @param meta Meta attached to the table.
   * @returns The populated table, or an empty TableModel when there are no results.
   */
  export function lokiResultsToTableModel(
    lokiResults: Array<LokiMatrixResult | LokiVectorResult>,
    resultCount: number,
    refId: string,
    meta: QueryResultMeta
  ): TableModel {
    if (!lokiResults || lokiResults.length === 0) {
      return new TableModel();
    }

    // Collect all labels across all metrics. A series without `metric` contributes no labels
    // (previously Object.keys(undefined) would throw here).
    const metricLabels = new Set<string>();
    for (const series of lokiResults) {
      for (const label of Object.keys(series.metric ?? {})) {
        metricLabels.add(label);
      }
    }

    // Sort metric labels, create columns for them and record their index
    const sortedLabels = [...metricLabels].sort();

    const table = new TableModel();
    table.refId = refId;
    table.meta = meta;
    table.columns = [
      { text: 'Time', type: FieldType.time },
      ...sortedLabels.map((label) => ({ text: label, filterable: true, type: FieldType.string })),
      { text: `Value #${refId}`, type: FieldType.number },
    ];

    // Populate rows, set value to empty string when label not present.
    for (const series of lokiResults) {
      // Vector results carry a single `value`; normalize it to the matrix shape.
      const vectorValue = (series as LokiVectorResult).value;
      const values = vectorValue ? [vectorValue] : (series as LokiMatrixResult).values;
      if (!values) {
        continue;
      }

      const metric: { [key: string]: string } = series.metric ?? {};

      // Rows are pushed one by one: Array.prototype.concat returns a new array (its result
      // used to be discarded, dropping these rows), and push(...spread) of a very large
      // series can exceed the engine's argument limit.
      for (const [time, value] of values) {
        table.rows.push([
          time * 1000,
          ...sortedLabels.map((label) => metric[label] || ''),
          // Same parser as the time series path, so '+Inf'/'-Inf' do not end up as NaN.
          parsePrometheusFormatSampleValue(value),
        ]);
      }
    }

    return table;
  }
  /**
   * Produces the display label for a series.
   *
   * Without options, or with an empty legend format, the label is the `{key="value",...}`
   * form of the labels. Otherwise the legend format is run through the template service and
   * rendered with the labels. An empty result falls back to `options.query` when options exist.
   */
  export function createMetricLabel(labelData: { [key: string]: string }, options?: TransformerOptions) {
    let label: string;

    if (options === undefined || isEmpty(options.legendFormat)) {
      label = getOriginalMetricName(labelData);
    } else {
      const interpolatedLegend = getTemplateSrv().replace(options.legendFormat ?? '', options.scopedVars);
      label = renderLegendFormat(interpolatedLegend, labelData);
    }

    if (!label && options) {
      label = options.query;
    }

    return label;
  }
  /**
   * Formats labels as `{key="value",key2="value2"}`, in the object's own entry order.
   */
  function getOriginalMetricName(labelData: { [key: string]: string }) {
    const pairs: string[] = [];
    for (const [name, value] of Object.entries(labelData)) {
      pairs.push(`${name}="${value}"`);
    }
    return `{${pairs.join(',')}}`;
  }
  /**
   * Replaces every ASCII uppercase letter with a space followed by its lowercase form,
   * e.g. `totalBytesProcessed` becomes `total bytes processed`.
   */
  export function decamelize(s: string): string {
    let spaced = '';
    for (const ch of s) {
      const isAsciiUpper = ch >= 'A' && ch <= 'Z';
      spaced += isAsciiUpper ? ` ${ch.toLowerCase()}` : ch;
    }
    return spaced;
  }
  /**
   * Picks the unit for one stat, based on its label:
   * 's' for labels containing "time" (only when the value is truthy), 'Bps' for labels
   * matching bytes...persecond, 'decbytes' for other labels containing "bytes", else undefined.
   */
  function unitForLokiStat(label: string, value: number): string | undefined {
    if (/time/i.test(label) && value) {
      return 's';
    }
    if (/bytes.*persecond/i.test(label)) {
      return 'Bps';
    }
    if (/bytes/i.test(label)) {
      return 'decbytes';
    }
    return undefined;
  }

  // Turn loki stats { metric: value } into meta stat { title: metric, value: value }.
  // Titles have the form "<Capitalized section>: <decamelized label>".
  function lokiStatsToMetaStat(stats: LokiStats | undefined): QueryResultMetaStat[] {
    const metaStats: QueryResultMetaStat[] = [];
    if (!stats) {
      return metaStats;
    }

    for (const section in stats) {
      const sectionStats = stats[section];
      for (const label in sectionStats) {
        const value = sectionStats[label];
        metaStats.push({
          displayName: `${capitalize(section)}: ${decamelize(label)}`,
          value,
          unit: unitForLokiStat(label, value),
        });
      }
    }

    return metaStats;
  }
  /**
   * Converts a stream (logs) response into a single-element list of data frames.
   *
   * Streams are ignored when `limit` is not positive. The frame carries meta with the
   * highlight words derived from the query, the limit, the response stats and a logs
   * visualisation hint. When there are stats but no streams, a field-less frame is returned
   * so that the meta is still delivered.
   */
  export function lokiStreamsToDataFrames(
    response: LokiStreamResponse,
    target: LokiQuery,
    limit: number,
    config: LokiOptions
  ): DataFrame[] {
    const streams = limit > 0 ? response.data.result : [];
    const stats: QueryResultMetaStat[] = lokiStatsToMetaStat(response.data.stats);

    const meta: QueryResultMeta = {
      searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.expr)),
      limit,
      stats,
      // Use custom mechanism to identify which stat we want to promote to label
      custom: {
        lokiQueryStatKey: 'Summary: total bytes processed',
        // TODO: when we get a real frame-type in @grafana/data
        // move this to frame.meta.type
        frameType: 'LabeledTimeValues',
      },
      preferredVisualisationType: 'logs',
    };

    const logsFrame = lokiStreamsToRawDataFrame(streams, target.refId);
    enhanceDataFrame(logsFrame, config);

    if (meta.custom && dataFrameHasLokiError(logsFrame)) {
      meta.custom.error = 'Error when parsing some of the logs';
    }

    const hasOnlyStats = stats.length > 0 && streams.length === 0;
    const frame: DataFrame = hasOnlyStats
      ? { fields: [], length: 0, refId: target.refId, meta }
      : { ...logsFrame, refId: target.refId, meta };

    return [frame];
  }
  /**
   * Appends derived fields (with their data links) to the frame, as configured on the
   * data source instance. The frame is modified in place.
   *
   * Derived field configs are grouped by name and each group yields one field. For every
   * row, the value is the first capture group of the group's first `matcherRegex` applied
   * to the row's `Line`, or null when the line does not match. Nothing happens without a
   * config or without derived fields.
   */
  export const enhanceDataFrame = (dataFrame: DataFrame, config: LokiOptions | null): void => {
    const derivedFields = config?.derivedFields ?? [];
    if (derivedFields.length === 0) {
      return;
    }

    const configsByName = groupBy(derivedFields, 'name');
    const newFields = Object.values(configsByName).map(fieldFromDerivedFieldConfig);

    new DataFrameView(dataFrame).forEach((row: { Line: string }) => {
      newFields.forEach((field) => {
        // Only the first config of a group provides the regex; the others only add links.
        const [firstConfig] = configsByName[field.name];
        const logMatch = row.Line.match(firstConfig.matcherRegex);
        field.values.add(logMatch && logMatch[1]);
      });
    });

    dataFrame.fields = [...dataFrame.fields, ...newFields];
  };
  /**
   * Builds one empty string field out of a group of derived field configs that share a name.
   *
   * Each config contributes at most one data link: an internal link when it has a
   * `datasourceUid`, otherwise an external link when it has a `url`. The field is named
   * after the first config and its values are filled in by the caller.
   */
  function fieldFromDerivedFieldConfig(derivedFieldConfigs: DerivedFieldConfig[]): Field<any, ArrayVector> {
    const dataSourceSrv = getDataSourceSrv();
    const links: DataLink[] = [];

    for (const derivedFieldConfig of derivedFieldConfigs) {
      const { datasourceUid, url, urlDisplayLabel } = derivedFieldConfig;

      if (datasourceUid) {
        // Having field.datasourceUid means it is an internal link.
        const dsSettings = dataSourceSrv.getInstanceSettings(datasourceUid);
        links.push({
          // Will be filled out later
          title: urlDisplayLabel || '',
          url: '',
          // This is hardcoded for Jaeger or Zipkin; there is no way right now to specify
          // a datasource specific query object
          internal: {
            query: { query: url },
            datasourceUid: datasourceUid,
            datasourceName: dsSettings?.name ?? 'Data source not found',
          },
        });
      } else if (url) {
        links.push({
          // We do not know what title to give here so we count on presentation layer to create a title from metadata.
          title: urlDisplayLabel || '',
          url: url,
        });
      }
    }

    return {
      name: derivedFieldConfigs[0].name,
      type: FieldType.string,
      config: {
        links,
      },
      // We are adding values later on
      values: new ArrayVector<string>([]),
    };
  }
  /**
   * Converts a vector or matrix response into TimeSeries, one per result.
   * Vector results are wrapped into a single-sample matrix result first.
   * Any other result type yields an empty list.
   */
  function rangeQueryResponseToTimeSeries(
    response: LokiResponse,
    query: LokiRangeQueryRequest,
    target: LokiQuery,
    scopedVars: ScopedVars
  ): TimeSeries[] {
    const transformerOptions: TransformerOptions = {
      legendFormat: target.legendFormat ?? '',
      query: query.query,
      refId: target.refId,
      /** Show results of Loki metric queries only in graph */
      meta: { preferredVisualisationType: 'graph' },
      scopedVars,
    };
    const toSeries = (result: LokiMatrixResult) => lokiMatrixToTimeSeries(result, transformerOptions);

    switch (response.data.resultType) {
      case LokiResultType.Vector:
        return response.data.result.map(({ metric, value }) => toSeries({ metric, values: [value] }));
      case LokiResultType.Matrix:
        return response.data.result.map((matrixResult) => toSeries(matrixResult));
      default:
        return [];
    }
  }
  /**
   * Converts a metric response into data frames.
   * When the query has a step, every time field gets `config.interval` set to the step
   * multiplied by 1000.
   */
  export function rangeQueryResponseToDataFrames(
    response: LokiResponse,
    query: LokiRangeQueryRequest,
    target: LokiQuery,
    scopedVars: ScopedVars
  ): DataFrame[] {
    const frames = rangeQueryResponseToTimeSeries(response, query, target, scopedVars).map((series) =>
      toDataFrame(series)
    );

    const { step } = query;
    if (step == null) {
      return frames;
    }

    const intervalMs = step * 1000;
    for (const frame of frames) {
      for (const field of frame.fields) {
        if (field.type === FieldType.time) {
          field.config.interval = intervalMs;
        }
      }
    }

    return frames;
  }
  /**
   * Dispatches a range query response on its result type and wraps the frames in an observable.
   *
   * Stream results become log frames under the key `<refId>_log`; vector and matrix results
   * become metric frames under the key `<refId>`.
   *
   * @throws Error when the result type is none of stream, vector or matrix.
   */
  export function processRangeQueryResponse(
    response: LokiResponse,
    target: LokiQuery,
    query: LokiRangeQueryRequest,
    limit: number,
    config: LokiOptions,
    scopedVars: ScopedVars
  ) {
    const resultType = response.data.resultType;

    if (resultType === LokiResultType.Stream) {
      const logFrames = lokiStreamsToDataFrames(response as LokiStreamResponse, target, limit, config);
      return of({ data: logFrames, key: `${target.refId}_log` });
    }

    if (resultType === LokiResultType.Vector || resultType === LokiResultType.Matrix) {
      const metricFrames = rangeQueryResponseToDataFrames(response, query, target, scopedVars);
      return of({ data: metricFrames, key: target.refId });
    }

    throw new Error(`Unknown result type "${(response.data as any).resultType}".`);
  }