streaming.ts 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import { map, Observable, defer, mergeMap } from 'rxjs';
  2. import { DataFrameJSON, DataQueryRequest, DataQueryResponse, LiveChannelScope, LoadingState } from '@grafana/data';
  3. import { getGrafanaLiveSrv } from '@grafana/runtime';
  4. import { StreamingDataFrame } from 'app/features/live/data/StreamingDataFrame';
  5. import { LokiDatasource } from './datasource';
  6. import { LokiQuery } from './types';
  7. /**
  8. * Calculate a unique key for the query. The key is used to pick a channel and should
  9. * be unique for each distinct query execution plan. This key is not secure and is only picked to avoid
  10. * possible collisions
  11. */
  12. export async function getLiveStreamKey(query: LokiQuery): Promise<string> {
  13. const str = JSON.stringify({ expr: query.expr });
  14. const msgUint8 = new TextEncoder().encode(str); // encode as (utf-8) Uint8Array
  15. const hashBuffer = await crypto.subtle.digest('SHA-1', msgUint8); // hash the message
  16. const hashArray = Array.from(new Uint8Array(hashBuffer.slice(0, 8))); // first 8 bytes
  17. return hashArray.map((b) => b.toString(16).padStart(2, '0')).join('');
  18. }
  19. // This will get both v1 and v2 result formats
  20. export function doLokiChannelStream(
  21. query: LokiQuery,
  22. ds: LokiDatasource,
  23. options: DataQueryRequest<LokiQuery>
  24. ): Observable<DataQueryResponse> {
  25. // maximum time to keep values
  26. const range = options.range;
  27. const maxDelta = range.to.valueOf() - range.from.valueOf() + 1000;
  28. let maxLength = options.maxDataPoints ?? 1000;
  29. if (maxLength > 100) {
  30. // for small buffers, keep them small
  31. maxLength *= 2;
  32. }
  33. let frame: StreamingDataFrame | undefined = undefined;
  34. const updateFrame = (msg: any) => {
  35. if (msg?.message) {
  36. const p = msg.message as DataFrameJSON;
  37. if (!frame) {
  38. frame = StreamingDataFrame.fromDataFrameJSON(p, {
  39. maxLength,
  40. maxDelta,
  41. displayNameFormat: query.legendFormat,
  42. });
  43. } else {
  44. frame.push(p);
  45. }
  46. }
  47. return frame;
  48. };
  49. return defer(() => getLiveStreamKey(query)).pipe(
  50. mergeMap((key) => {
  51. return getGrafanaLiveSrv()
  52. .getStream<any>({
  53. scope: LiveChannelScope.DataSource,
  54. namespace: ds.uid,
  55. path: `tail/${key}`,
  56. data: {
  57. ...query,
  58. timeRange: {
  59. from: range.from.valueOf().toString(),
  60. to: range.to.valueOf().toString(),
  61. },
  62. },
  63. })
  64. .pipe(
  65. map((evt) => {
  66. const frame = updateFrame(evt);
  67. return {
  68. data: frame ? [frame] : [],
  69. state: LoadingState.Streaming,
  70. };
  71. })
  72. );
  73. })
  74. );
  75. }