// logs_model.ts
  1. import { size } from 'lodash';
  2. import { Observable } from 'rxjs';
  3. import {
  4. AbsoluteTimeRange,
  5. DataFrame,
  6. DataQuery,
  7. DataQueryRequest,
  8. DataQueryResponse,
  9. DataSourceApi,
  10. dateTimeFormat,
  11. dateTimeFormatTimeAgo,
  12. FieldCache,
  13. FieldColorModeId,
  14. FieldConfig,
  15. FieldType,
  16. FieldWithIndex,
  17. findCommonLabels,
  18. findUniqueLabels,
  19. getLogLevel,
  20. getLogLevelFromKey,
  21. Labels,
  22. LoadingState,
  23. LogLevel,
  24. LogRowModel,
  25. LogsDedupStrategy,
  26. LogsMetaItem,
  27. LogsMetaKind,
  28. LogsModel,
  29. MutableDataFrame,
  30. rangeUtil,
  31. ScopedVars,
  32. sortInAscendingOrder,
  33. textUtil,
  34. TimeRange,
  35. toDataFrame,
  36. toUtc,
  37. } from '@grafana/data';
  38. import { SIPrefix } from '@grafana/data/src/valueFormats/symbolFormatters';
  39. import { BarAlignment, GraphDrawStyle, StackingMode } from '@grafana/schema';
  40. import { ansicolor, colors } from '@grafana/ui';
  41. import { getThemeColor } from 'app/core/utils/colors';
// Meta-info labels; also matched against by name elsewhere (e.g. adjustMetaInfo looks up LIMIT_LABEL).
export const LIMIT_LABEL = 'Line limit';
export const COMMON_LABELS = 'Common labels';

// Fixed palette entry per log level; unknown falls back to a theme-dependent gray.
export const LogLevelColor = {
  [LogLevel.critical]: colors[7],
  [LogLevel.warning]: colors[1],
  [LogLevel.error]: colors[4],
  [LogLevel.info]: colors[0],
  [LogLevel.debug]: colors[5],
  [LogLevel.trace]: colors[2],
  [LogLevel.unknown]: getThemeColor('#8e8e8e', '#dde4ed'),
};

// Duration helpers expressed in milliseconds.
const MILLISECOND = 1;
const SECOND = 1000 * MILLISECOND;
const MINUTE = 60 * SECOND;
const HOUR = 60 * MINUTE;
const DAY = 24 * HOUR;

// Matches ISO-8601 timestamps ('.' or ',' fraction separator, numeric offset or 'Z').
const isoDateRegexp = /\d{4}-[01]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-6]\d[,\.]\d+([+-][0-2]\d:[0-5]\d|Z)/g;
  59. function isDuplicateRow(row: LogRowModel, other: LogRowModel, strategy?: LogsDedupStrategy): boolean {
  60. switch (strategy) {
  61. case LogsDedupStrategy.exact:
  62. // Exact still strips dates
  63. return row.entry.replace(isoDateRegexp, '') === other.entry.replace(isoDateRegexp, '');
  64. case LogsDedupStrategy.numbers:
  65. return row.entry.replace(/\d/g, '') === other.entry.replace(/\d/g, '');
  66. case LogsDedupStrategy.signature:
  67. return row.entry.replace(/\w/g, '') === other.entry.replace(/\w/g, '');
  68. default:
  69. return false;
  70. }
  71. }
  72. export function dedupLogRows(rows: LogRowModel[], strategy?: LogsDedupStrategy): LogRowModel[] {
  73. if (strategy === LogsDedupStrategy.none) {
  74. return rows;
  75. }
  76. return rows.reduce((result: LogRowModel[], row: LogRowModel, index) => {
  77. const rowCopy = { ...row };
  78. const previous = result[result.length - 1];
  79. if (index > 0 && isDuplicateRow(row, previous, strategy)) {
  80. previous.duplicates!++;
  81. } else {
  82. rowCopy.duplicates = 0;
  83. result.push(rowCopy);
  84. }
  85. return result;
  86. }, []);
  87. }
  88. export function filterLogLevels(logRows: LogRowModel[], hiddenLogLevels: Set<LogLevel>): LogRowModel[] {
  89. if (hiddenLogLevels.size === 0) {
  90. return logRows;
  91. }
  92. return logRows.filter((row: LogRowModel) => {
  93. return !hiddenLogLevels.has(row.logLevel);
  94. });
  95. }
  96. export function makeDataFramesForLogs(sortedRows: LogRowModel[], bucketSize: number): DataFrame[] {
  97. // currently interval is rangeMs / resolution, which is too low for showing series as bars.
  98. // Should be solved higher up the chain when executing queries & interval calculated and not here but this is a temporary fix.
  99. // Graph time series by log level
  100. const seriesByLevel: any = {};
  101. const seriesList: any[] = [];
  102. for (const row of sortedRows) {
  103. let series = seriesByLevel[row.logLevel];
  104. if (!series) {
  105. seriesByLevel[row.logLevel] = series = {
  106. lastTs: null,
  107. datapoints: [],
  108. target: row.logLevel,
  109. color: LogLevelColor[row.logLevel],
  110. };
  111. seriesList.push(series);
  112. }
  113. // align time to bucket size - used Math.floor for calculation as time of the bucket
  114. // must be in the past (before Date.now()) to be displayed on the graph
  115. const time = Math.floor(row.timeEpochMs / bucketSize) * bucketSize;
  116. // Entry for time
  117. if (time === series.lastTs) {
  118. series.datapoints[series.datapoints.length - 1][0]++;
  119. } else {
  120. series.datapoints.push([1, time]);
  121. series.lastTs = time;
  122. }
  123. // add zero to other levels to aid stacking so each level series has same number of points
  124. for (const other of seriesList) {
  125. if (other !== series && other.lastTs !== time) {
  126. other.datapoints.push([0, time]);
  127. other.lastTs = time;
  128. }
  129. }
  130. }
  131. return seriesList.map((series, i) => {
  132. series.datapoints.sort((a: number[], b: number[]) => a[1] - b[1]);
  133. const data = toDataFrame(series);
  134. const fieldCache = new FieldCache(data);
  135. const valueField = fieldCache.getFirstFieldOfType(FieldType.number)!;
  136. data.fields[valueField.index].config.min = 0;
  137. data.fields[valueField.index].config.decimals = 0;
  138. data.fields[valueField.index].config.color = {
  139. mode: FieldColorModeId.Fixed,
  140. fixedColor: series.color,
  141. };
  142. data.fields[valueField.index].config.custom = {
  143. drawStyle: GraphDrawStyle.Bars,
  144. barAlignment: BarAlignment.Center,
  145. barWidthFactor: 0.9,
  146. barMaxWidth: 5,
  147. lineColor: series.color,
  148. pointColor: series.color,
  149. fillColor: series.color,
  150. lineWidth: 0,
  151. fillOpacity: 100,
  152. stacking: {
  153. mode: StackingMode.Normal,
  154. group: 'A',
  155. },
  156. };
  157. return data;
  158. });
  159. }
  160. function isLogsData(series: DataFrame) {
  161. return series.fields.some((f) => f.type === FieldType.time) && series.fields.some((f) => f.type === FieldType.string);
  162. }
/**
 * Convert dataFrame into LogsModel which consists of creating separate array of log rows and metrics series. Metrics
 * series can be either already included in the dataFrame or will be computed from the log rows.
 * @param dataFrame
 * @param intervalMs In case there are no metrics series, we use this for computing it from log rows.
 */
export function dataFrameToLogsModel(
  dataFrame: DataFrame[],
  intervalMs: number | undefined,
  absoluteRange?: AbsoluteTimeRange,
  queries?: DataQuery[]
): LogsModel {
  const { logSeries } = separateLogsAndMetrics(dataFrame);
  const logsModel = logSeriesToLogsModel(logSeries);
  if (logsModel) {
    // Create histogram metrics from logs using the interval as bucket size for the line count
    if (intervalMs && logsModel.rows.length > 0) {
      // NOTE(review): sort() mutates logsModel.rows in place — callers observe rows
      // in ascending time order after this call.
      const sortedRows = logsModel.rows.sort(sortInAscendingOrder);
      const { visibleRange, bucketSize, visibleRangeMs, requestedRangeMs } = getSeriesProperties(
        sortedRows,
        intervalMs,
        absoluteRange
      );
      logsModel.visibleRange = visibleRange;
      logsModel.series = makeDataFramesForLogs(sortedRows, bucketSize);
      // Enrich the Line-limit meta entry with range-coverage info when present.
      if (logsModel.meta) {
        logsModel.meta = adjustMetaInfo(logsModel, visibleRangeMs, requestedRangeMs);
      }
    } else {
      // No interval or no rows: nothing to compute a histogram from.
      logsModel.series = [];
    }
    logsModel.queries = queries;
    return logsModel;
  }
  // No log series at all: still return a well-formed (empty) model.
  return {
    hasUniqueLabels: false,
    rows: [],
    meta: [],
    series: [],
    queries,
  };
}
  205. /**
  206. * Returns a clamped time range and interval based on the visible logs and the given range.
  207. *
  208. * @param sortedRows Log rows from the query response
  209. * @param intervalMs Dynamic data interval based on available pixel width
  210. * @param absoluteRange Requested time range
  211. * @param pxPerBar Default: 20, buckets will be rendered as bars, assuming 10px per histogram bar plus some free space around it
  212. */
  213. export function getSeriesProperties(
  214. sortedRows: LogRowModel[],
  215. intervalMs: number,
  216. absoluteRange?: AbsoluteTimeRange,
  217. pxPerBar = 20,
  218. minimumBucketSize = 1000
  219. ) {
  220. let visibleRange = absoluteRange;
  221. let resolutionIntervalMs = intervalMs;
  222. let bucketSize = Math.max(resolutionIntervalMs * pxPerBar, minimumBucketSize);
  223. let visibleRangeMs;
  224. let requestedRangeMs;
  225. // Clamp time range to visible logs otherwise big parts of the graph might look empty
  226. if (absoluteRange) {
  227. const earliestTsLogs = sortedRows[0].timeEpochMs;
  228. requestedRangeMs = absoluteRange.to - absoluteRange.from;
  229. visibleRangeMs = absoluteRange.to - earliestTsLogs;
  230. if (visibleRangeMs > 0) {
  231. // Adjust interval bucket size for potentially shorter visible range
  232. const clampingFactor = visibleRangeMs / requestedRangeMs;
  233. resolutionIntervalMs *= clampingFactor;
  234. // Minimum bucketsize of 1s for nicer graphing
  235. bucketSize = Math.max(Math.ceil(resolutionIntervalMs * pxPerBar), minimumBucketSize);
  236. // makeSeriesForLogs() aligns dataspoints with time buckets, so we do the same here to not cut off data
  237. const adjustedEarliest = Math.floor(earliestTsLogs / bucketSize) * bucketSize;
  238. visibleRange = { from: adjustedEarliest, to: absoluteRange.to };
  239. } else {
  240. // We use visibleRangeMs to calculate range coverage of received logs. However, some data sources are rounding up range in requests. This means that received logs
  241. // can (in edge cases) be outside of the requested range and visibleRangeMs < 0. In that case, we want to change visibleRangeMs to be 1 so we can calculate coverage.
  242. visibleRangeMs = 1;
  243. }
  244. }
  245. return { bucketSize, visibleRange, visibleRangeMs, requestedRangeMs };
  246. }
  247. function separateLogsAndMetrics(dataFrames: DataFrame[]) {
  248. const metricSeries: DataFrame[] = [];
  249. const logSeries: DataFrame[] = [];
  250. for (const dataFrame of dataFrames) {
  251. // We want to show meta stats even if no result was returned. That's why we are pushing also data frames with no fields.
  252. if (isLogsData(dataFrame) || !dataFrame.fields.length) {
  253. logSeries.push(dataFrame);
  254. continue;
  255. }
  256. if (dataFrame.length > 0) {
  257. metricSeries.push(dataFrame);
  258. }
  259. }
  260. return { logSeries, metricSeries };
  261. }
// Fields extracted once per frame by logSeriesToLogsModel() so the per-row loop
// does not have to re-scan the frame's field list.
interface LogFields {
  series: DataFrame;
  timeField: FieldWithIndex;
  stringField: FieldWithIndex;
  // Only set for the experimental 'LabeledTimeValues' frame type (labels per row).
  labelsField?: FieldWithIndex;
  // Optional nanosecond timestamps (field named 'tsNs').
  timeNanosecondField?: FieldWithIndex;
  // Optional explicit level field (field named 'level').
  logLevelField?: FieldWithIndex;
  // Optional row-id field, resolved by getIdField().
  idField?: FieldWithIndex;
}
  271. function getAllLabels(fields: LogFields): Labels[] {
  272. // there are two types of dataframes we handle:
  273. // 1. labels are in a separate field (more efficient when labels change by every log-row)
  274. // 2. labels are in in the string-field's `.labels` attribute
  275. const { stringField, labelsField } = fields;
  276. if (labelsField !== undefined) {
  277. return labelsField.values.toArray();
  278. } else {
  279. return [stringField.labels ?? {}];
  280. }
  281. }
  282. function getLabelsForFrameRow(fields: LogFields, index: number): Labels {
  283. // there are two types of dataframes we handle.
  284. // either labels-on-the-string-field, or labels-in-the-labels-field
  285. const { stringField, labelsField } = fields;
  286. if (labelsField !== undefined) {
  287. return labelsField.values.get(index);
  288. } else {
  289. return stringField.labels ?? {};
  290. }
  291. }
/**
 * Converts dataFrames into LogsModel. This involves merging them into one list, sorting them and computing metadata
 * like common labels.
 */
export function logSeriesToLogsModel(logSeries: DataFrame[]): LogsModel | undefined {
  if (logSeries.length === 0) {
    return undefined;
  }
  const allLabels: Labels[][] = [];

  // Find the fields we care about and collect all labels
  let allSeries: LogFields[] = [];

  // We are sometimes passing data frames with no fields because we want to calculate correct meta stats.
  // Therefore we need to filter out series with no fields. These series are used only for meta stats calculation.
  const seriesWithFields = logSeries.filter((series) => series.fields.length);

  if (seriesWithFields.length) {
    seriesWithFields.forEach((series) => {
      const fieldCache = new FieldCache(series);
      const stringField = fieldCache.getFirstFieldOfType(FieldType.string);
      const timeField = fieldCache.getFirstFieldOfType(FieldType.time);
      // NOTE: this is experimental, please do not use in your code.
      // we will get this custom-frame-type into the "real" frame-type list soon,
      // but the name might change, so please do not use it until then.
      const labelsField =
        series.meta?.custom?.frameType === 'LabeledTimeValues' ? fieldCache.getFieldByName('labels') : undefined;
      // Frames without both a time and a string field cannot produce rows and are skipped.
      if (stringField !== undefined && timeField !== undefined) {
        const info = {
          series,
          timeField,
          labelsField,
          timeNanosecondField: fieldCache.getFieldByName('tsNs'),
          stringField,
          logLevelField: fieldCache.getFieldByName('level'),
          idField: getIdField(fieldCache),
        };
        allSeries.push(info);
        const labels = getAllLabels(info);
        if (labels.length > 0) {
          allLabels.push(labels);
        }
      }
    });
  }

  // Labels present on every row, used to compute per-row unique labels below.
  const flatAllLabels = allLabels.flat();
  const commonLabels = flatAllLabels.length > 0 ? findCommonLabels(flatAllLabels) : {};

  const rows: LogRowModel[] = [];
  let hasUniqueLabels = false;

  for (const info of allSeries) {
    const { timeField, timeNanosecondField, stringField, logLevelField, idField, series } = info;

    for (let j = 0; j < series.length; j++) {
      const ts = timeField.values.get(j);
      const time = toUtc(ts);
      const tsNs = timeNanosecondField ? timeNanosecondField.values.get(j) : undefined;
      // Fall back to ms epoch padded to ns precision when no ns field exists.
      const timeEpochNs = tsNs ? tsNs : time.valueOf() + '000000';

      // In edge cases, this can be undefined. If undefined, we want to replace it with empty string.
      const messageValue: unknown = stringField.values.get(j) ?? '';
      // This should be string but sometimes isn't (eg elastic) because the dataFrame is not strongly typed.
      const message: string = typeof messageValue === 'string' ? messageValue : JSON.stringify(messageValue);

      const hasAnsi = textUtil.hasAnsiCodes(message);
      // Literal backslash-escapes (\n, \t, \r) in the text, not real control chars.
      const hasUnescapedContent = !!message.match(/\\n|\\t|\\r/);
      const searchWords = series.meta && series.meta.searchWords ? series.meta.searchWords : [];
      // `entry` is the display text: ANSI codes stripped; `raw` keeps the original.
      const entry = hasAnsi ? ansicolor.strip(message) : message;

      const labels = getLabelsForFrameRow(info, j);
      const uniqueLabels = findUniqueLabels(labels, commonLabels);
      if (Object.keys(uniqueLabels).length > 0) {
        hasUniqueLabels = true;
      }

      // Level resolution order: explicit level field, then a 'level' label,
      // then heuristic detection from the entry text.
      let logLevel = LogLevel.unknown;
      const logLevelKey = (logLevelField && logLevelField.values.get(j)) || (labels && labels['level']);
      if (logLevelKey) {
        logLevel = getLogLevelFromKey(logLevelKey);
      } else {
        logLevel = getLogLevel(entry);
      }

      rows.push({
        entryFieldIndex: stringField.index,
        rowIndex: j,
        dataFrame: series,
        logLevel,
        timeFromNow: dateTimeFormatTimeAgo(ts),
        timeEpochMs: time.valueOf(),
        timeEpochNs,
        timeLocal: dateTimeFormat(ts, { timeZone: 'browser' }),
        timeUtc: dateTimeFormat(ts, { timeZone: 'utc' }),
        uniqueLabels,
        hasAnsi,
        hasUnescapedContent,
        searchWords,
        entry,
        raw: message,
        labels: labels || {},
        // Row index serves as a fallback uid when the frame has no id field.
        uid: idField ? idField.values.get(j) : j.toString(),
      });
    }
  }

  // Meta data to display in status
  const meta: LogsMetaItem[] = [];
  if (size(commonLabels) > 0) {
    meta.push({
      label: COMMON_LABELS,
      value: commonLabels,
      kind: LogsMetaKind.LabelsMap,
    });
  }

  // Sum line limits across queries, deduplicated by refId (last frame per refId wins).
  const limits = logSeries.filter((series) => series.meta && series.meta.limit);
  const limitValue = Object.values(
    limits.reduce((acc: any, elem: any) => {
      acc[elem.refId] = elem.meta.limit;
      return acc;
    }, {})
  ).reduce((acc: number, elem: any) => (acc += elem), 0) as number;

  if (limitValue > 0) {
    meta.push({
      label: LIMIT_LABEL,
      value: limitValue,
      kind: LogsMetaKind.Number,
    });
  }

  let totalBytes = 0;
  const queriesVisited: { [refId: string]: boolean } = {};
  // To add just 1 error message
  let errorMetaAdded = false;

  for (const series of logSeries) {
    // NOTE(review): lokiQueryStatKey names the per-query byte stat — presumably
    // Loki-specific; other datasources simply won't set it.
    const totalBytesKey = series.meta?.custom?.lokiQueryStatKey;
    const { refId } = series; // Stats are per query, keeping track by refId

    if (!errorMetaAdded && series.meta?.custom?.error) {
      meta.push({
        label: '',
        value: series.meta?.custom.error,
        kind: LogsMetaKind.Error,
      });
      errorMetaAdded = true;
    }

    // Count byte stats only once per refId, even when a query returns many frames.
    if (refId && !queriesVisited[refId]) {
      if (totalBytesKey && series.meta?.stats) {
        const byteStat = series.meta.stats.find((stat) => stat.displayName === totalBytesKey);
        if (byteStat) {
          totalBytes += byteStat.value;
        }
      }
      queriesVisited[refId] = true;
    }
  }

  if (totalBytes > 0) {
    const { text, suffix } = SIPrefix('B')(totalBytes);
    meta.push({
      label: 'Total bytes processed',
      value: `${text} ${suffix}`,
      kind: LogsMetaKind.String,
    });
  }

  return {
    hasUniqueLabels,
    meta,
    rows,
  };
}
  448. function getIdField(fieldCache: FieldCache): FieldWithIndex | undefined {
  449. const idFieldNames = ['id'];
  450. for (const fieldName of idFieldNames) {
  451. const idField = fieldCache.getFieldByName(fieldName);
  452. if (idField) {
  453. return idField;
  454. }
  455. }
  456. return undefined;
  457. }
  458. // Used to add additional information to Line limit meta info
  459. function adjustMetaInfo(logsModel: LogsModel, visibleRangeMs?: number, requestedRangeMs?: number): LogsMetaItem[] {
  460. let logsModelMeta = [...logsModel.meta!];
  461. const limitIndex = logsModelMeta.findIndex((meta) => meta.label === LIMIT_LABEL);
  462. const limit = limitIndex >= 0 && logsModelMeta[limitIndex]?.value;
  463. if (limit && limit > 0) {
  464. let metaLimitValue;
  465. if (limit === logsModel.rows.length && visibleRangeMs && requestedRangeMs) {
  466. const coverage = ((visibleRangeMs / requestedRangeMs) * 100).toFixed(2);
  467. metaLimitValue = `${limit} reached, received logs cover ${coverage}% (${rangeUtil.msRangeToTimeString(
  468. visibleRangeMs
  469. )}) of your selected time range (${rangeUtil.msRangeToTimeString(requestedRangeMs)})`;
  470. } else {
  471. metaLimitValue = `${limit} (${logsModel.rows.length} returned)`;
  472. }
  473. logsModelMeta[limitIndex] = {
  474. label: LIMIT_LABEL,
  475. value: metaLimitValue,
  476. kind: LogsMetaKind.String,
  477. };
  478. }
  479. return logsModelMeta;
  480. }
  481. /**
  482. * Returns field configuration used to render logs volume bars
  483. */
  484. function getLogVolumeFieldConfig(level: LogLevel, oneLevelDetected: boolean) {
  485. const name = oneLevelDetected && level === LogLevel.unknown ? 'logs' : level;
  486. const color = LogLevelColor[level];
  487. return {
  488. displayNameFromDS: name,
  489. color: {
  490. mode: FieldColorModeId.Fixed,
  491. fixedColor: color,
  492. },
  493. custom: {
  494. drawStyle: GraphDrawStyle.Bars,
  495. barAlignment: BarAlignment.Center,
  496. lineColor: color,
  497. pointColor: color,
  498. fillColor: color,
  499. lineWidth: 1,
  500. fillOpacity: 100,
  501. stacking: {
  502. mode: StackingMode.Normal,
  503. group: 'A',
  504. },
  505. },
  506. };
  507. }
  508. /**
  509. * Take multiple data frames, sum up values and group by level.
  510. * Return a list of data frames, each representing single level.
  511. */
  512. export function aggregateRawLogsVolume(
  513. rawLogsVolume: DataFrame[],
  514. extractLevel: (dataFrame: DataFrame) => LogLevel
  515. ): DataFrame[] {
  516. const logsVolumeByLevelMap: Partial<Record<LogLevel, DataFrame[]>> = {};
  517. rawLogsVolume.forEach((dataFrame) => {
  518. const level = extractLevel(dataFrame);
  519. if (!logsVolumeByLevelMap[level]) {
  520. logsVolumeByLevelMap[level] = [];
  521. }
  522. logsVolumeByLevelMap[level]!.push(dataFrame);
  523. });
  524. return Object.keys(logsVolumeByLevelMap).map((level: string) => {
  525. return aggregateFields(
  526. logsVolumeByLevelMap[level as LogLevel]!,
  527. getLogVolumeFieldConfig(level as LogLevel, Object.keys(logsVolumeByLevelMap).length === 1)
  528. );
  529. });
  530. }
/**
 * Aggregate multiple data frames into a single data frame by adding values.
 * Multiple data frames for the same level are passed here to get a single
 * data frame for a given level. Aggregation by level happens in aggregateRawLogsVolume()
 */
function aggregateFields(dataFrames: DataFrame[], config: FieldConfig): DataFrame {
  const aggregatedDataFrame = new MutableDataFrame();
  if (!dataFrames.length) {
    return aggregatedDataFrame;
  }

  // NOTE(review): frames are assumed to share the first frame's length and time
  // axis — values are summed positionally, not matched by timestamp; confirm upstream.
  const totalLength = dataFrames[0].length;
  const timeField = new FieldCache(dataFrames[0]).getFirstFieldOfType(FieldType.time);

  // Without a time field on the first frame there is nothing to align on.
  if (!timeField) {
    return aggregatedDataFrame;
  }

  aggregatedDataFrame.addField({ name: 'Time', type: FieldType.time }, totalLength);
  aggregatedDataFrame.addField({ name: 'Value', type: FieldType.number, config }, totalLength);

  dataFrames.forEach((dataFrame) => {
    dataFrame.fields.forEach((field) => {
      if (field.type === FieldType.number) {
        for (let pointIndex = 0; pointIndex < totalLength; pointIndex++) {
          const currentValue = aggregatedDataFrame.get(pointIndex).Value;
          const valueToAdd = field.values.get(pointIndex);
          // Keep null only when both sides are null; otherwise treat null as 0.
          const totalValue =
            currentValue === null && valueToAdd === null ? null : (currentValue || 0) + (valueToAdd || 0);
          aggregatedDataFrame.set(pointIndex, { Value: totalValue, Time: timeField.values.get(pointIndex) });
        }
      }
    });
  });

  return aggregatedDataFrame;
}
// Options for queryLogsVolume(): how to map a result frame to a log level,
// which targets the request covers, and the range used for interval calculation.
type LogsVolumeQueryOptions<T extends DataQuery> = {
  extractLevel: (dataFrame: DataFrame) => LogLevel;
  targets: T[];
  range: TimeRange;
};
/**
 * Creates an observable, which makes requests to get logs volume and aggregates results.
 */
export function queryLogsVolume<T extends DataQuery>(
  datasource: DataSourceApi<T, any, any>,
  logsVolumeRequest: DataQueryRequest<T>,
  options: LogsVolumeQueryOptions<T>
): Observable<DataQueryResponse> {
  // Derive a coarser interval for the volume query and mutate the request
  // (interval, intervalMs and the matching scoped variables) accordingly.
  const timespan = options.range.to.valueOf() - options.range.from.valueOf();
  const intervalInfo = getIntervalInfo(logsVolumeRequest.scopedVars, timespan);
  logsVolumeRequest.interval = intervalInfo.interval;
  logsVolumeRequest.scopedVars.__interval = { value: intervalInfo.interval, text: intervalInfo.interval };
  if (intervalInfo.intervalMs !== undefined) {
    logsVolumeRequest.intervalMs = intervalInfo.intervalMs;
    logsVolumeRequest.scopedVars.__interval_ms = { value: intervalInfo.intervalMs, text: intervalInfo.intervalMs };
  }

  return new Observable((observer) => {
    // Frames accumulated across all partial responses until the query completes.
    let rawLogsVolume: DataFrame[] = [];
    // Emit an initial Loading state so the UI can show a spinner immediately.
    observer.next({
      state: LoadingState.Loading,
      error: undefined,
      data: [],
    });

    const subscription = (datasource.query(logsVolumeRequest) as Observable<DataQueryResponse>).subscribe({
      complete: () => {
        // All responses received: aggregate by level and emit the final result.
        const aggregatedLogsVolume = aggregateRawLogsVolume(rawLogsVolume, options.extractLevel);
        if (aggregatedLogsVolume[0]) {
          // Attach the originating targets/range so consumers can correlate the volume data.
          aggregatedLogsVolume[0].meta = {
            custom: {
              targets: options.targets,
              absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
            },
          };
        }
        observer.next({
          state: LoadingState.Done,
          error: undefined,
          data: aggregatedLogsVolume,
        });
        observer.complete();
      },
      next: (dataQueryResponse: DataQueryResponse) => {
        const { error } = dataQueryResponse;
        if (error !== undefined) {
          // Surface the error state before terminating the stream.
          observer.next({
            state: LoadingState.Error,
            error,
            data: [],
          });
          observer.error(error);
        } else {
          rawLogsVolume = rawLogsVolume.concat(dataQueryResponse.data.map(toDataFrame));
        }
      },
      error: (error) => {
        observer.next({
          state: LoadingState.Error,
          error: error,
          data: [],
        });
        observer.error(error);
      },
    });

    // Teardown: cancel the underlying datasource query on unsubscribe.
    return () => {
      subscription?.unsubscribe();
    };
  });
}
  636. function getIntervalInfo(scopedVars: ScopedVars, timespanMs: number): { interval: string; intervalMs?: number } {
  637. if (scopedVars.__interval) {
  638. let intervalMs: number = scopedVars.__interval_ms.value;
  639. let interval = '';
  640. // below 5 seconds we force the resolution to be per 1ms as interval in scopedVars is not less than 10ms
  641. if (timespanMs < SECOND * 5) {
  642. intervalMs = MILLISECOND;
  643. interval = '1ms';
  644. } else if (intervalMs > HOUR) {
  645. intervalMs = DAY;
  646. interval = '1d';
  647. } else if (intervalMs > MINUTE) {
  648. intervalMs = HOUR;
  649. interval = '1h';
  650. } else if (intervalMs > SECOND) {
  651. intervalMs = MINUTE;
  652. interval = '1m';
  653. } else {
  654. intervalMs = SECOND;
  655. interval = '1s';
  656. }
  657. return { interval, intervalMs };
  658. } else {
  659. return { interval: '$__interval' };
  660. }
  661. }