// live_streams.test.ts
  1. import { noop } from 'lodash';
  2. import { Observable, Subject, of, throwError, concat } from 'rxjs';
  3. import { mergeMap } from 'rxjs/operators';
  4. import * as rxJsWebSocket from 'rxjs/webSocket';
  5. import { DataFrame, DataFrameView, formatLabels, Labels } from '@grafana/data';
  6. import { LiveStreams } from './live_streams';
  7. import { LokiTailResponse } from './types';
  /**
   * Stand-in for the socket handed out by the mocked `webSocket()` factory.
   * Every test assigns a fresh value to it before requesting a stream.
   */
  let fakeSocket: Subject<any>;

  // Swap the 'rxjs/webSocket' module for a stub whose `webSocket()` returns
  // whatever `fakeSocket` holds at the time it is called.
  jest.mock('rxjs/webSocket', () => ({
    __esModule: true,
    webSocket: () => fakeSocket,
  }));
  15. describe('Live Stream Tests', () => {
  // Restore spies after every test (not only once, when the suite ends) so a
  // `jest.spyOn(...)` installed by one test cannot stay active in the next one.
  afterEach(() => {
    jest.restoreAllMocks();
  });

  // Tail update carrying a single log line; the first test pushes it after the
  // initial batch.
  const msg0: LokiTailResponse = {
    streams: [
      {
        stream: { filename: '/var/log/sntpc.log', job: 'varlogs' },
        values: [['1567025440118944705', 'Kittens']],
      },
    ],
    dropped_entries: null,
  };
  it('reads the values into the buffer', (done) => {
    fakeSocket = new Subject<any>();
    const labels: Labels = { job: 'varlogs' };
    const target = makeTarget('fake', labels);
    const stream = new LiveStreams().getStream(target);
    expect.assertions(4);

    // One check per expected emission, in order. Each check runs inside `next`,
    // i.e. against the value as it looks at the moment it is emitted.
    const emissionChecks: Array<(val: DataFrame[]) => void> = [
      (val) => {
        expect(val[0].length).toEqual(7);
        expect(val[0].fields[2].labels).toEqual(labels);
      },
      (val) => {
        expect(val[0].length).toEqual(8);
        const view = new DataFrameView(val[0]);
        const last = { ...view.get(view.length - 1) };
        expect(last).toEqual({
          Time: '2019-08-28T20:50:40.118Z',
          tsNs: '1567025440118944705',
          id: '25d81461-a66f-53ff-98d5-e39515af4735_A',
          Line: 'Kittens',
          labels: { filename: '/var/log/sntpc.log' },
        });
      },
    ];

    stream.subscribe({
      next: (val) => {
        const check = emissionChecks.shift();
        if (!check) {
          // Fail with a readable message rather than "check is not a function".
          throw new Error('Stream emitted more values than this test expects');
        }
        check(val);
      },
      // Report a stream error as an explicit failure of this test.
      error: (err) => done(err),
      complete: () => done(),
    });

    // Send it the initial list of things
    fakeSocket.next(initialRawResponse);
    // Send it a single update
    fakeSocket.next(msg0);
    fakeSocket.complete();
  });
  it('returns the same subscription if the url matches existing one', () => {
    fakeSocket = new Subject<any>();
    const registry = new LiveStreams();
    const sharedUrl = 'url_to_match';

    // Two distinct target objects that only have their url in common.
    const [first, second] = [makeTarget(sharedUrl), makeTarget(sharedUrl)].map((target) =>
      registry.getStream(target)
    );

    expect(first).toBe(second);
  });
  it('returns new subscription when the previous unsubscribed', () => {
    fakeSocket = new Subject<any>();
    const registry = new LiveStreams();
    const url = 'url_to_match';

    // Subscribe to the first stream and drop that subscription straight away.
    const before = registry.getStream(makeTarget(url));
    before.subscribe({ next: noop }).unsubscribe();

    // Requesting the same url again is expected to yield a different observable.
    const after = registry.getStream(makeTarget(url));
    expect(before).not.toBe(after);
  });
  it('returns new subscription when the previous is unsubscribed and correctly unsubscribes from source', () => {
    let unsubscribed = false;

    // Source that never emits; its teardown only records that it has run.
    const source = new Observable(() => () => {
      unsubscribed = true;
    });
    fakeSocket = source as any;
    jest.spyOn(rxJsWebSocket, 'webSocket').mockReturnValue(fakeSocket as rxJsWebSocket.WebSocketSubject<unknown>);

    const stream = new LiveStreams().getStream(makeTarget('url_to_match'));
    stream.subscribe({ next: noop }).unsubscribe();

    expect(unsubscribed).toBe(true);
  });
  it('should reconnect when abnormal error', async () => {
    // Error tagged with code 1006, the WebSocket "abnormal closure" close code.
    const abnormalError = Object.assign(new Error('weird error'), { code: 1006 });

    // Builds a tail response that holds exactly one log line.
    const tailResponse = (line: string) => ({
      streams: [
        {
          stream: { filename: '/var/log/sntpc.log', job: 'varlogs' },
          values: [['1567025440118944705', line]],
        },
      ],
      dropped_entries: null,
    });

    const logStreamBeforeError = of(tailResponse('Kittens'));
    const logStreamAfterError = of(tailResponse('Doggos'));
    const errorStream = throwError(abnormalError);

    let retries = 0;
    fakeSocket = of({}).pipe(
      mergeMap(() => {
        retries += 1;
        // First subscription: a log line followed by the abnormal error.
        // When re-subscribed after the abnormal error: just a log line.
        return retries === 1 ? concat(logStreamBeforeError, errorStream) : logStreamAfterError;
      })
    ) as any;
    jest.spyOn(rxJsWebSocket, 'webSocket').mockReturnValue(fakeSocket as rxJsWebSocket.WebSocketSubject<unknown>);

    const liveStreams = new LiveStreams();
    // NOTE(review): 100 is presumably the retry interval in ms; confirm against LiveStreams.getStream.
    const stream = liveStreams.getStream(makeTarget('url_to_match'), 100);

    await expect(stream).toEmitValuesWith((received) => {
      const frame = received[0][0];
      const view = new DataFrameView(frame);
      // Copy each row right after reading it, as the original assertions did.
      const [firstLog, secondLog] = [0, 1].map((row) => ({ ...view.get(row) }));

      expect(firstLog.Line).toBe('Kittens');
      expect(secondLog.Line).toBe('Doggos');
      expect(retries).toBe(2);
    });
  });
  142. });
  /**
   * Create target (query to run). Url is what is used as cache key.
   *
   * @param url - url placed on the target
   * @param labels - labels formatted into the query; `{ job: 'varlogs' }` when omitted
   * @returns a target with a fixed size of 10, refId 'A' and an empty regexp
   */
  function makeTarget(url: string, labels?: Labels) {
    const effectiveLabels = labels || { job: 'varlogs' };
    const query = formatLabels(effectiveLabels);
    return { url, size: 10, query, refId: 'A', regexp: '' };
  }
  //----------------------------------------------------------------
  // Fixture: the initial batch of log lines (one stream, seven entries).
  // Kept at the end of the file so the top is more readable.
  //----------------------------------------------------------------
  const initialRawResponse: LokiTailResponse = {
    streams: [
      {
        stream: { filename: '/var/log/docker.log', job: 'varlogs' },
        values: [
          [
            '1567025018215000000',
            'level=debug msg="[resolver] received AAAA record \\"::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
          ],
          [
            '1567025018215000000',
            [
              '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147224630Z"',
              'level=debug msg="[resolver] received AAAA record \\"fe80::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
            ].join(' '),
          ],
          ['1567025020452000000', '2019-08-28T20:43:40Z sntpc sntpc[1]: offset=-0.022171, delay=0.000463'],
          ['1567025050297000000', '2019-08-28T20:44:10Z sntpc sntpc[1]: offset=-0.022327, delay=0.000527'],
          [
            '1567025078152000000',
            [
              '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095444834Z"',
              'level=debug msg="Name To resolve: localhost."',
            ].join(' '),
          ],
          [
            '1567025078152000000',
            [
              '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095896074Z"',
              'level=debug msg="[resolver] query localhost. (A) from 172.22.0.4:53748, forwarding to udp:192.168.65.1"',
            ].join(' '),
          ],
          [
            '1567025078152000000',
            '2019-08-28T20:44:38Z docker time="2019-08-28T20:44:38.095444834Z" level=debug msg="Name To resolve: localhost."',
          ],
        ],
      },
    ],
    dropped_entries: null,
  };