live_streams.ts 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import { Observable, throwError, timer } from 'rxjs';
  2. import { finalize, map, retryWhen, mergeMap } from 'rxjs/operators';
  3. import { webSocket } from 'rxjs/webSocket';
  4. import { DataFrame, FieldType, parseLabels, KeyValue, CircularDataFrame } from '@grafana/data';
  5. import { appendResponseToBufferedData } from './result_transformer';
  6. import { LokiTailResponse } from './types';
  /**
   * Describes one live-tail request. Each target corresponds to a single query in the UI
   * and is identified there by its refId.
   */
  export interface LokiLiveTarget {
    /** Query expression; its labels are parsed and attached to the `Line` field of the resulting frame. */
    query: string;
    /** Websocket endpoint to connect to. Also used as the key under which the stream is cached. */
    url: string;
    /** Identifier of the originating query; copied onto the resulting data frame. */
    refId: string;
    /** Capacity of the circular buffer that holds the streamed rows. */
    size: number;
  }
  /**
   * Keeps one websocket-backed observable per target URL. Asking for a target whose URL
   * already has an entry hands back the existing observable instead of building a new one.
   * Every emission of a stream is a one-element array holding that stream's buffered frame.
   */
  export class LiveStreams {
    private streams: KeyValue<Observable<DataFrame[]>> = {};

    /**
     * Returns the observable for `target`, creating and caching it on first request.
     *
     * @param target - Query, websocket URL, refId and buffer size of the live tail.
     * @param retryInterval - Delay in milliseconds before reconnecting after an abnormal close.
     * @returns Observable emitting the buffered frame (wrapped in an array) on every websocket message.
     */
    getStream(target: LokiLiveTarget, retryInterval = 5000): Observable<DataFrame[]> {
      const cached = this.streams[target.url];
      if (cached) {
        return cached;
      }

      // Close code 1006 indicates that the connection was closed abnormally.
      const ABNORMAL_CLOSE_CODE = 1006;
      // Retrying stops once the attempt counter reaches this value.
      const RETRY_ATTEMPT_LIMIT = 30;
      // Beyond this many attempts a warning is logged on each further reconnect.
      const WARN_AFTER_ATTEMPTS = 10;

      // Fixed-capacity buffer that receives every incoming response. Field order matters.
      const frame = new CircularDataFrame({ capacity: target.size });
      frame.addField({ name: 'labels', type: FieldType.other }); // The labels for each line
      frame.addField({ name: 'Time', type: FieldType.time, config: {} });
      const lineField = frame.addField({ name: 'Line', type: FieldType.string });
      lineField.labels = parseLabels(target.query);
      frame.addField({ name: 'id', type: FieldType.string });
      frame.addField({ name: 'tsNs', type: FieldType.time, config: {} });
      frame.meta = { ...frame.meta, preferredVisualisationType: 'logs' };
      frame.refId = target.refId;

      const stream: Observable<DataFrame[]> = webSocket<LokiTailResponse>(target.url).pipe(
        map((response: LokiTailResponse) => {
          appendResponseToBufferedData(response, frame);
          return [frame];
        }),
        retryWhen((attempts: Observable<any>) =>
          attempts.pipe(
            mergeMap((error, index) => {
              const attempt = index + 1;

              // Anything other than an abnormal close, or running out of attempts, is fatal.
              if (error.code !== ABNORMAL_CLOSE_CODE || attempt >= RETRY_ATTEMPT_LIMIT) {
                return throwError(error);
              }

              if (attempt > WARN_AFTER_ATTEMPTS) {
                // Keep reconnecting, but let the user know the connection is unstable.
                console.warn(
                  `Websocket connection is being disrupted. We keep reconnecting but consider starting new live tailing again. Error: ${error.reason}`
                );
              }

              // Wait for the configured interval, then resubscribe.
              return timer(retryInterval);
            })
          )
        ),
        finalize(() => {
          // Drop the cache entry so a later request builds a fresh stream.
          delete this.streams[target.url];
        })
      );

      this.streams[target.url] = stream;
      return stream;
    }
  }