LiveDataStream.test.ts 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049
  1. import mockConsole, { RestoreConsole } from 'jest-mock-console';
  2. import { mapValues } from 'lodash';
  3. import { Observable, Subject, Subscription, Unsubscribable } from 'rxjs';
  4. import {
  5. DataFrameJSON,
  6. dataFrameToJSON,
  7. DataQueryResponse,
  8. FieldType,
  9. LiveChannelAddress,
  10. LiveChannelConnectionState,
  11. LiveChannelEvent,
  12. LiveChannelEventType,
  13. LiveChannelLeaveEvent,
  14. LiveChannelScope,
  15. LoadingState,
  16. } from '@grafana/data';
  17. import { StreamingFrameAction } from '@grafana/runtime';
  18. import { StreamingDataFrame } from '../data/StreamingDataFrame';
  19. import { isStreamingResponseData, StreamingResponseData, StreamingResponseDataType } from '../data/utils';
  20. import { DataStreamHandlerDeps, LiveDataStream } from './LiveDataStream';
  21. type SubjectsInsteadOfObservables<T> = {
  22. [key in keyof T]: T[key] extends Observable<infer U> ? Subject<U> : T[key];
  23. };
  24. type DepsWithSubjectsInsteadOfObservables<T = any> = SubjectsInsteadOfObservables<DataStreamHandlerDeps<T>>;
  25. const createDeps = <T = any>(
  26. overrides?: Partial<DepsWithSubjectsInsteadOfObservables<T>>
  27. ): DepsWithSubjectsInsteadOfObservables<T> => {
  28. return {
  29. channelId: 'channel-1',
  30. liveEventsObservable: new Subject(),
  31. onShutdown: jest.fn(),
  32. subscriberReadiness: new Subject(),
  33. defaultStreamingFrameOptions: { maxLength: 100, maxDelta: Infinity, action: StreamingFrameAction.Append },
  34. shutdownDelayInMs: 1000,
  35. ...(overrides ?? {}),
  36. };
  37. };
  38. class ValuesCollection<T> implements Unsubscribable {
  39. values: T[] = [];
  40. errors: any[] = [];
  41. receivedComplete = false;
  42. subscription: Subscription | undefined;
  43. valuesCount = () => this.values.length;
  44. subscribeTo = (obs: Observable<T>) => {
  45. if (this.subscription) {
  46. throw new Error(`can't subscribe twice!`);
  47. }
  48. this.subscription = obs.subscribe({
  49. next: (n) => {
  50. this.values.push(n);
  51. },
  52. error: (err) => {
  53. this.errors.push(err);
  54. },
  55. complete: () => {
  56. this.receivedComplete = true;
  57. },
  58. });
  59. };
  60. get complete() {
  61. return this.receivedComplete || this.subscription?.closed;
  62. }
  63. unsubscribe = () => {
  64. this.subscription?.unsubscribe();
  65. };
  66. lastValue = () => {
  67. if (!this.values.length) {
  68. throw new Error(`no values available in ${JSON.stringify(this)}`);
  69. }
  70. return this.values[this.values.length - 1];
  71. };
  72. lastError = () => {
  73. if (!this.errors.length) {
  74. throw new Error(`no errors available in ${JSON.stringify(this)}`);
  75. }
  76. return this.errors[this.errors.length - 1];
  77. };
  78. }
  79. const liveChannelMessageEvent = <T extends DataFrameJSON>(message: T): LiveChannelEvent<T> => ({
  80. type: LiveChannelEventType.Message,
  81. message,
  82. });
  83. const liveChannelLeaveEvent = (): LiveChannelLeaveEvent => ({
  84. type: LiveChannelEventType.Leave,
  85. user: '',
  86. });
  87. const liveChannelStatusEvent = (state: LiveChannelConnectionState, error?: Error): LiveChannelEvent => ({
  88. type: LiveChannelEventType.Status,
  89. state,
  90. error,
  91. id: '',
  92. timestamp: 1,
  93. });
  94. const fieldsOf = (data: StreamingResponseData<StreamingResponseDataType.FullFrame>) => {
  95. return data.frame.fields.map((f) => ({
  96. name: f.name,
  97. values: f.values,
  98. }));
  99. };
// Error text injected by tests that later check it shows up in the emitted error response.
const dummyErrorMessage = 'dummy-error';
  101. describe('LiveDataStream', () => {
  // Called directly in the describe body, so fake timers are switched on while the suite is being defined.
  jest.useFakeTimers();

  // Function returned by `mockConsole()`; presumably restores the real console when called.
  let restoreConsole: RestoreConsole | undefined;

  // Mock the console before every test...
  beforeEach(() => {
    restoreConsole = mockConsole();
  });

  // ...and invoke the restore handle afterwards (no-op if it was never set).
  afterEach(() => {
    restoreConsole?.();
  });
  /**
   * Asserts how many values and errors a collection has recorded and whether
   * it reports itself as complete.
   */
  const expectValueCollectionState = <T>(
    valuesCollection: ValuesCollection<T>,
    state: { errors: number; values: number; complete: boolean }
  ) => {
    const { values: expectedValueCount, errors: expectedErrorCount, complete: expectedComplete } = state;

    expect(valuesCollection.values).toHaveLength(expectedValueCount);
    expect(valuesCollection.errors).toHaveLength(expectedErrorCount);
    expect(valuesCollection.complete).toEqual(expectedComplete);
  };
  /**
   * Creates an assertion for a response in the given loading `state`.
   * The returned function checks the state, that exactly one data item is
   * present, and that the item is streaming response data of the requested type.
   */
  const expectResponse = <T extends StreamingResponseDataType>(state: LoadingState) => {
    return (res: DataQueryResponse, streamingDataType: T) => {
      expect(res.state).toEqual(state);
      expect(res.data).toHaveLength(1);

      const firstData = res.data[0];
      const hasExpectedType = isStreamingResponseData(firstData, streamingDataType);
      expect(hasExpectedType).toEqual(true);
    };
  };

  // Assertions specialized for the two loading states used in this suite.
  const expectStreamingResponse = expectResponse(LoadingState.Streaming);
  const expectErrorResponse = expectResponse(LoadingState.Error);
  // Channel address shared by every option set below.
  const dummyLiveChannelAddress: LiveChannelAddress = {
    scope: LiveChannelScope.Grafana,
    namespace: 'stream',
    path: 'abc',
  };

  // Second argument passed to `liveDataStream.get(...)` in every test.
  const subscriptionKey = 'subKey';
  // Option sets passed to `liveDataStream.get(...)`. Each uses a different `maxLength`.
  const liveDataStreamOptions = {
    // Append mode, keeps fields `time` and `b`.
    withTimeBFilter: {
      addr: dummyLiveChannelAddress,
      buffer: {
        maxLength: 2,
        maxDelta: 10,
        action: StreamingFrameAction.Append,
      },
      filter: {
        fields: ['time', 'b'],
      },
    },
    // Append mode, keeps fields `time` and `a`.
    withTimeAFilter: {
      addr: dummyLiveChannelAddress,
      buffer: {
        maxLength: 3,
        maxDelta: 10,
        action: StreamingFrameAction.Append,
      },
      filter: {
        fields: ['time', 'a'],
      },
    },
    // Append mode, no field filter.
    withoutFilter: {
      addr: dummyLiveChannelAddress,
      buffer: {
        maxLength: 4,
        maxDelta: 10,
        action: StreamingFrameAction.Append,
      },
    },
    // Replace mode, keeps fields `time` and `b`.
    withReplaceMode: {
      addr: dummyLiveChannelAddress,
      buffer: {
        maxLength: 5,
        maxDelta: 10,
        action: StreamingFrameAction.Replace,
      },
      filter: {
        fields: ['time', 'b'],
      },
    },
  };
  // Factories for the data-frame JSON payloads used as live messages.
  // Each call returns a fresh object literal.
  const dataFrameJsons = {
    // Schema with fields time / a (string) / b (number) plus two rows of data.
    schema1: () => ({
      schema: {
        fields: [
          { name: 'time', type: FieldType.time },
          { name: 'a', type: FieldType.string },
          { name: 'b', type: FieldType.number },
        ],
      },
      data: {
        values: [
          [100, 101],
          ['a', 'b'],
          [1, 2],
        ],
      },
    }),
    // Data only (no schema): one more row in the shape of schema1.
    schema1newValues: () => ({
      data: {
        values: [[102], ['c'], [3]],
      },
    }),
    // Data only (no schema): another row in the shape of schema1.
    schema1newValues2: () => ({
      data: {
        values: [[103], ['d'], [4]],
      },
    }),
    // Same field names as schema1, but `b` is a string field; one row of data.
    schema2: () => ({
      schema: {
        fields: [
          { name: 'time', type: FieldType.time },
          { name: 'a', type: FieldType.string },
          { name: 'b', type: FieldType.string },
        ],
      },
      data: {
        values: [[103], ['x'], ['y']],
      },
    }),
    // Data only (no schema): one more row in the shape of schema2.
    schema2newValues: () => ({
      data: {
        values: [[104], ['w'], ['o']],
      },
    }),
  };
  // Walks a single subscriber (append buffer, fields `time` + `b`) through the stream lifecycle.
  // The `it` blocks share `deps`, `liveDataStream` and `valuesCollection`, so each one builds on the
  // state left by the previous one and they must run in declaration order.
  describe('happy path with a single subscriber in `append` mode', () => {
    let deps: ReturnType<typeof createDeps>;
    let liveDataStream: LiveDataStream<any>;
    const valuesCollection = new ValuesCollection<DataQueryResponse>();

    beforeAll(() => {
      deps = createDeps();

      // Nothing observes the subjects before the stream is constructed.
      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.subscriberReadiness.observed).toBeFalsy();
      liveDataStream = new LiveDataStream(deps);
    });

    it('should subscribe to live events observable immediately after creation', async () => {
      expect(deps.liveEventsObservable.observed).toBeTruthy();
    });

    it('should not subscribe to subscriberReadiness observable until first subscription', async () => {
      expect(deps.subscriberReadiness.observed).toBeFalsy();
    });

    it('should subscribe to subscriberReadiness observable on first subscription and return observable without any values', async () => {
      const observable = liveDataStream.get(liveDataStreamOptions.withTimeBFilter, subscriptionKey);
      valuesCollection.subscribeTo(observable);

      // then
      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expectValueCollectionState(valuesCollection, { errors: 0, values: 0, complete: false });
    });

    it('should emit the first live channel message event as a serialized streamingDataFrame', async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;

      // The serialized frame carries the subscriber's buffer options.
      expect(data.frame.options).toEqual(liveDataStreamOptions.withTimeBFilter.buffer);

      // Only the filtered fields (`time`, `b`) are expected; `a` is absent.
      const deserializedFrame = StreamingDataFrame.deserialize(data.frame);
      expect(deserializedFrame.fields).toEqual([
        {
          config: {},
          name: 'time',
          type: 'time',
          values: {
            buffer: [100, 101],
          },
        },
        {
          config: {},
          name: 'b',
          type: 'number',
          values: {
            buffer: [1, 2],
          },
        },
      ]);
      expect(deserializedFrame.length).toEqual(dataFrameJsons.schema1().data.values[0].length);
    });

    it('should emit subsequent messages as deltas if the schema stays the same', async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;

      // Delta holds the new row for the filtered fields only (`time`, `b`).
      expect(data.values).toEqual([[102], [3]]);
    });

    it('should emit a full frame if schema changes', async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2()));

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;

      expect(fieldsOf(data)).toEqual([
        {
          name: 'time',
          values: [102, 103],
        },
        {
          name: 'b',
          // Flagged by the original author as a bug in StreamingDataFrame (presumably the leading `undefined`) - fix!
          values: [undefined, 'y'],
        },
      ]);
    });

    it('should emit a full frame if received a status live channel event with error', async () => {
      const valuesCount = valuesCollection.valuesCount();

      const error = new Error(`oh no!`);
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, error));

      // The error arrives as a regular value (an error response), not through the observable's error channel.
      expectValueCollectionState(valuesCollection, {
        errors: 0,
        values: valuesCount + 1,
        complete: false,
      });
      const response = valuesCollection.lastValue();

      expectErrorResponse(response, StreamingResponseDataType.FullFrame);
    });

    it('should buffer new values until subscriber is ready', async () => {
      const valuesCount = valuesCollection.valuesCount();

      // While readiness is `false`, no message reaches the subscriber.
      deps.subscriberReadiness.next(false);
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      // Once ready again, the three buffered messages arrive as one delta.
      deps.subscriberReadiness.next(true);
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });

      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;

      expect(data.values).toEqual([
        [104, 104, 104],
        ['o', 'o', 'o'],
      ]);
    });

    it(`should reduce buffer to a full frame if schema changed at any point during subscriber's unavailability`, async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.subscriberReadiness.next(false);
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      // This message carries a schema, i.e. the schema changes while the subscriber is not ready.
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.subscriberReadiness.next(true);
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });

      const response = valuesCollection.lastValue();

      // A single full frame is expected instead of a delta; it holds two rows (buffer `maxLength` is 2).
      expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
      expect(fieldsOf(response.data[0])).toEqual([
        {
          name: 'time',
          values: [101, 102],
        },
        {
          name: 'b',
          values: [2, 3],
        },
      ]);
    });

    it(`should reduce buffer to a full frame with last error if one or more errors occur during subscriber's unavailability`, async () => {
      const firstError = new Error('first error');
      const secondError = new Error(dummyErrorMessage);
      const valuesCount = valuesCollection.valuesCount();

      // Interleave data messages and two error statuses while the subscriber is not ready.
      deps.subscriberReadiness.next(false);

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, firstError));
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, secondError));

      deps.subscriberReadiness.next(true);
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });

      const response = valuesCollection.lastValue();
      expectErrorResponse(response, StreamingResponseDataType.FullFrame);

      // Only the second (last) error's message is checked.
      const errorMessage = response?.error?.message;
      expect(errorMessage?.includes(dummyErrorMessage)).toBeTruthy();

      expect(fieldsOf(response.data[0])).toEqual([
        {
          name: 'time',
          values: [102, 102],
        },
        {
          name: 'b',
          values: [3, 3],
        },
      ]);
    });

    it('should ignore messages without payload', async () => {
      const valuesCount = valuesCollection.valuesCount();

      // Status events without an error and a leave event must not produce any emission.
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
      deps.liveEventsObservable.next(liveChannelLeaveEvent());

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
    });

    it(`should shutdown when source observable completes`, async () => {
      expect(deps.onShutdown).not.toHaveBeenCalled();
      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();

      deps.liveEventsObservable.complete();

      // No new value is emitted, the subscriber is completed, and both subjects are released.
      expectValueCollectionState(valuesCollection, {
        errors: 0,
        values: valuesCollection.valuesCount(),
        complete: true,
      });

      expect(deps.subscriberReadiness.observed).toBeFalsy();
      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.onShutdown).toHaveBeenCalled();
    });
  });
  // Same lifecycle walk as the append-mode suite, but the subscriber uses the `Replace` buffer action.
  // The `it` blocks share `deps`, `liveDataStream` and `valuesCollection` and must run in declaration order.
  describe('happy path with a single subscriber in `replace` mode', () => {
    let deps: ReturnType<typeof createDeps>;
    let liveDataStream: LiveDataStream<any>;
    const valuesCollection = new ValuesCollection<DataQueryResponse>();

    beforeAll(() => {
      deps = createDeps();

      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.subscriberReadiness.observed).toBeFalsy();
      liveDataStream = new LiveDataStream(deps);
      // Unlike the append suite, the subscription is created here rather than in a dedicated test.
      valuesCollection.subscribeTo(liveDataStream.get(liveDataStreamOptions.withReplaceMode, subscriptionKey));
    });

    it('should emit the first live channel message event as a serialized streamingDataFrame', async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;

      // The serialized frame carries the subscriber's buffer options.
      expect(data.frame.options).toEqual(liveDataStreamOptions.withReplaceMode.buffer);

      const deserializedFrame = StreamingDataFrame.deserialize(data.frame);
      expect(deserializedFrame.fields).toEqual([
        {
          config: {},
          name: 'time',
          type: 'time',
          values: {
            buffer: [100, 101],
          },
        },
        {
          config: {},
          name: 'b',
          type: 'number',
          values: {
            buffer: [1, 2],
          },
        },
      ]);
      expect(deserializedFrame.length).toEqual(dataFrameJsons.schema1().data.values[0].length);
    });

    it('should emit subsequent messages as deltas if the schema stays the same', async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;

      expect(data.values).toEqual([[102], [3]]);
    });

    it('should emit a full frame if schema changes', async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2()));

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });
      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;

      // Only the row from the latest message is expected; earlier rows are not retained.
      expect(fieldsOf(data)).toEqual([
        {
          name: 'time',
          values: [103],
        },
        {
          name: 'b',
          values: ['y'],
        },
      ]);
    });

    it('should emit a full frame if received a status live channel event with error', async () => {
      const valuesCount = valuesCollection.valuesCount();

      const error = new Error(`oh no!`);
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, error));

      // The error arrives as a regular value (an error response), not through the observable's error channel.
      expectValueCollectionState(valuesCollection, {
        errors: 0,
        values: valuesCount + 1,
        complete: false,
      });
      const response = valuesCollection.lastValue();

      expectErrorResponse(response, StreamingResponseDataType.FullFrame);
    });

    it('should buffer new values until subscriber is ready', async () => {
      const valuesCount = valuesCollection.valuesCount();

      // While readiness is `false`, no message reaches the subscriber.
      deps.subscriberReadiness.next(false);
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.subscriberReadiness.next(true);
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });

      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.NewValuesSameSchema);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.NewValuesSameSchema>;

      // Three messages were buffered, but the delta holds a single row.
      expect(data.values).toEqual([[104], ['o']]);
    });

    it(`should reduce buffer to a full frame if schema changed at any point during subscriber's unavailability`, async () => {
      const valuesCount = valuesCollection.valuesCount();

      deps.subscriberReadiness.next(false);
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema2newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      // This message carries a schema, i.e. the schema changes while the subscriber is not ready.
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });

      deps.subscriberReadiness.next(true);
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });

      const response = valuesCollection.lastValue();

      // A single full frame holding only the row from the last message is expected.
      expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
      expect(fieldsOf(response.data[0])).toEqual([
        {
          name: 'time',
          values: [102],
        },
        {
          name: 'b',
          values: [3],
        },
      ]);
    });

    it(`should reduce buffer to an empty full frame with last error if one or more errors occur during subscriber's unavailability`, async () => {
      const firstError = new Error('first error');
      const secondError = new Error(dummyErrorMessage);
      const valuesCount = valuesCollection.valuesCount();

      // Interleave data messages and two error statuses while the subscriber is not ready.
      deps.subscriberReadiness.next(false);

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, firstError));
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected, secondError));

      deps.subscriberReadiness.next(true);
      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount + 1, complete: false });

      const response = valuesCollection.lastValue();
      expectErrorResponse(response, StreamingResponseDataType.FullFrame);

      // Only the second (last) error's message is checked.
      const errorMessage = response?.error?.message;
      expect(errorMessage?.includes(dummyErrorMessage)).toBeTruthy();

      // In contrast to append mode, the frame is expected to contain no rows.
      expect(fieldsOf(response.data[0])).toEqual([
        {
          name: 'time',
          values: [],
        },
        {
          name: 'b',
          values: [],
        },
      ]);
    });

    it('should ignore messages without payload', async () => {
      const valuesCount = valuesCollection.valuesCount();

      // Status events without an error and a leave event must not produce any emission.
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Connected));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
      deps.liveEventsObservable.next(liveChannelStatusEvent(LiveChannelConnectionState.Pending));
      deps.liveEventsObservable.next(liveChannelLeaveEvent());

      expectValueCollectionState(valuesCollection, { errors: 0, values: valuesCount, complete: false });
    });

    it(`should shutdown when source observable completes`, async () => {
      expect(deps.onShutdown).not.toHaveBeenCalled();
      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();

      deps.liveEventsObservable.complete();

      // No new value is emitted, the subscriber is completed, and both subjects are released.
      expectValueCollectionState(valuesCollection, {
        errors: 0,
        values: valuesCollection.valuesCount(),
        complete: true,
      });

      expect(deps.subscriberReadiness.observed).toBeFalsy();
      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.onShutdown).toHaveBeenCalled();
    });
  });
  // A subscriber that passes a `frame` in its options should receive that frame without any live message.
  describe('single subscriber with initial frame', () => {
    it('should emit the initial frame right after subscribe', async () => {
      const deps = createDeps();
      const liveDataStream = new LiveDataStream(deps);
      const valuesCollection = new ValuesCollection<DataQueryResponse>();

      const initialFrame = dataFrameJsons.schema2();
      const observable = liveDataStream.get(
        { ...liveDataStreamOptions.withTimeBFilter, frame: initialFrame },
        subscriptionKey
      );
      valuesCollection.subscribeTo(observable);

      // then
      expect(deps.subscriberReadiness.observed).toBeTruthy();
      // Exactly one value is already present although nothing was pushed into `liveEventsObservable`.
      expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false });

      const response = valuesCollection.lastValue();

      expectStreamingResponse(response, StreamingResponseDataType.FullFrame);
      const data = response.data[0] as StreamingResponseData<StreamingResponseDataType.FullFrame>;

      expect(fieldsOf(data)).toEqual([
        {
          name: 'time',
          values: [103],
        },
        {
          name: 'b',
          // Original marker: "bug in streamingDataFrame - fix!".
          // NOTE(review): it is not evident from this test what is wrong with this expectation - confirm or drop the marker.
          values: ['y'],
        },
      ]);
    });
  });
  // Two subscribers on the same stream, each passing its own initial frame.
  describe('two subscribers with initial frames', () => {
    it('should ignore initial frame from second subscriber', async () => {
      const deps = createDeps();
      const liveDataStream = new LiveDataStream(deps);
      const valuesCollection = new ValuesCollection<DataQueryResponse>();
      const valuesCollection2 = new ValuesCollection<DataQueryResponse>();

      // First subscriber: initial frame built from schema1 (round-tripped through StreamingDataFrame).
      valuesCollection.subscribeTo(
        liveDataStream.get(
          {
            ...liveDataStreamOptions.withTimeBFilter,
            frame: dataFrameToJSON(StreamingDataFrame.fromDataFrameJSON(dataFrameJsons.schema1())),
          },
          subscriptionKey
        )
      );

      expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false });

      // Second subscriber: passes a different initial frame (schema2).
      valuesCollection2.subscribeTo(
        liveDataStream.get(
          {
            ...liveDataStreamOptions.withTimeBFilter,
            frame: dataFrameJsons.schema2(),
          },
          subscriptionKey
        )
      );

      // no extra emits for initial subscriber
      expectValueCollectionState(valuesCollection, { errors: 0, values: 1, complete: false });
      expectValueCollectionState(valuesCollection2, { errors: 0, values: 1, complete: false });

      // Both subscribers must see the same fields, i.e. the second initial frame had no effect.
      const frame1 = fieldsOf(valuesCollection.lastValue().data[0]);
      const frame2 = fieldsOf(valuesCollection2.lastValue().data[0]);
      expect(frame1).toEqual(frame2);
    });
  });
  describe('source observable emits completed event', () => {
    /**
     * Completing the live events source is expected to complete the subscriber
     * without emitting values or errors, release both observables and invoke
     * the onShutdown callback.
     */
    it('should shutdown', async () => {
      const deps = createDeps();
      const stream = new LiveDataStream(deps);
      const collected = new ValuesCollection<DataQueryResponse>();

      collected.subscribeTo(stream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey));

      // while subscribed: both observables are in use and nothing has shut down yet
      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();
      expect(deps.onShutdown).not.toHaveBeenCalled();

      deps.liveEventsObservable.complete();

      const expectedState = { errors: 0, values: 0, complete: true };
      expectValueCollectionState(collected, expectedState);

      // after completion: everything is released and shutdown has been signalled
      expect(deps.subscriberReadiness.observed).toBeFalsy();
      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.onShutdown).toHaveBeenCalled();
    });
  });
  describe('source observable emits error event', () => {
    /**
     * An error on the live events source is expected to reach the subscriber as a
     * single regular value carrying an error response (errors: 0, values: 1),
     * followed by completion and shutdown of the stream.
     */
    it('should shutdown', async () => {
      const deps = createDeps();
      const liveDataStream = new LiveDataStream(deps);
      const valuesCollection = new ValuesCollection<DataQueryResponse>();
      const observable = liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey);
      valuesCollection.subscribeTo(observable);

      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();
      expect(deps.onShutdown).not.toHaveBeenCalled();

      deps.liveEventsObservable.error(new Error(dummyErrorMessage));

      expectValueCollectionState(valuesCollection, {
        errors: 0,
        values: 1,
        complete: true,
      });

      const response = valuesCollection.lastValue();
      expectErrorResponse(response, StreamingResponseDataType.FullFrame);
      // toContain reports the actual message on failure, unlike a boolean `includes(...)` check
      expect(response?.error?.message).toContain(dummyErrorMessage);

      expect(deps.subscriberReadiness.observed).toBeFalsy();
      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.onShutdown).toHaveBeenCalled();
    });
  });
  describe('happy path with multiple subscribers', () => {
    // NOTE: the tests in this block share one stream and one set of collections,
    // so each test builds on the state left behind by the previous one.
    let deps: ReturnType<typeof createDeps>;
    let liveDataStream: LiveDataStream<any>;

    const valuesCollections = {
      withTimeBFilter: new ValuesCollection<DataQueryResponse>(),
      withTimeAFilter: new ValuesCollection<DataQueryResponse>(),
      withoutFilter: new ValuesCollection<DataQueryResponse>(),
      withReplaceMode: new ValuesCollection<DataQueryResponse>(),
    };

    /** Subscribes the named collection to the stream using the options of the same name. */
    const subscribe = (name: keyof typeof valuesCollections) => {
      valuesCollections[name].subscribeTo(liveDataStream.get(liveDataStreamOptions[name], subscriptionKey));
    };

    /** Maps every collection to its current `complete` flag. */
    const completionFlags = () => mapValues(valuesCollections, (collection) => collection.complete);

    /**
     * Reduces a response to something easy to compare: the fields for a full frame,
     * the raw values for a same-schema update, or the response itself otherwise.
     */
    const summarize = (response: DataQueryResponse) => {
      const first = response.data[0];
      if (isStreamingResponseData(first, StreamingResponseDataType.FullFrame)) {
        return fieldsOf(first);
      }
      if (isStreamingResponseData(first, StreamingResponseDataType.NewValuesSameSchema)) {
        return first.values;
      }
      return response;
    };

    beforeAll(() => {
      deps = createDeps();
      liveDataStream = new LiveDataStream(deps);
    });

    it('should emit the last value as full frame to new subscribers', async () => {
      subscribe('withTimeAFilter');

      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1()));
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues()));

      expectValueCollectionState(valuesCollections.withTimeAFilter, { errors: 0, values: 2, complete: false });

      // late subscribers join after two packets have already been pushed
      subscribe('withTimeBFilter');
      subscribe('withoutFilter');
      subscribe('withReplaceMode');

      const expectedValueCounts: Array<[ValuesCollection<DataQueryResponse>, number]> = [
        [valuesCollections.withTimeAFilter, 2],
        [valuesCollections.withTimeBFilter, 1],
        [valuesCollections.withoutFilter, 1],
        [valuesCollections.withReplaceMode, 1],
      ];
      for (const [collection, values] of expectedValueCounts) {
        expectValueCollectionState(collection, { errors: 0, values, complete: false });
      }
    });

    it('should emit filtered data to each subscriber', async () => {
      deps.liveEventsObservable.next(liveChannelMessageEvent(dataFrameJsons.schema1newValues2()));

      const received = mapValues(valuesCollections, (collection) => collection.values.map(summarize));

      expect(received).toEqual({
        withTimeAFilter: [
          [
            {
              name: 'time',
              values: [100, 101],
            },
            {
              name: 'a',
              values: ['a', 'b'],
            },
          ],
          [[102], ['c']],
          [[103], ['d']],
        ],
        withTimeBFilter: [
          [
            {
              name: 'time',
              values: [101, 102],
            },
            {
              name: 'b',
              values: [2, 3],
            },
          ],
          [[103], [4]],
        ],
        withoutFilter: [
          [
            {
              name: 'time',
              values: [100, 101, 102],
            },
            {
              name: 'a',
              values: ['a', 'b', 'c'],
            },
            {
              name: 'b',
              values: [1, 2, 3],
            },
          ],
          [[103], ['d'], [4]],
        ],
        withReplaceMode: [
          // only last packet
          [
            {
              name: 'time',
              values: [102],
            },
            {
              name: 'b',
              values: [3],
            },
          ],
          [[103], [4]],
        ],
      });
    });

    it('should not unsubscribe the source observable unless all subscribers unsubscribe', async () => {
      valuesCollections.withTimeAFilter.unsubscribe();
      jest.advanceTimersByTime(deps.shutdownDelayInMs + 1);

      // only the collection that unsubscribed is marked complete
      expect(completionFlags()).toEqual({
        withTimeAFilter: true,
        withTimeBFilter: false,
        withoutFilter: false,
        withReplaceMode: false,
      });

      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();
      expect(deps.onShutdown).not.toHaveBeenCalled();
    });

    it('should emit complete event to all subscribers during shutdown', async () => {
      deps.liveEventsObservable.complete();

      expect(completionFlags()).toEqual({
        withTimeAFilter: true,
        withTimeBFilter: true,
        withoutFilter: true,
        withReplaceMode: true,
      });

      expect(deps.subscriberReadiness.observed).toBeFalsy();
      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.onShutdown).toHaveBeenCalled();
    });
  });
  describe('shutdown after unsubscribe', () => {
    /**
     * After the only subscriber unsubscribes, the stream is expected to keep its
     * source subscription until `shutdownDelayInMs` has elapsed and only then shut down.
     */
    it('should shutdown if no other subscriber subscribed during shutdown delay', async () => {
      const deps = createDeps();
      const liveDataStream = new LiveDataStream(deps);
      const valuesCollection = new ValuesCollection<DataQueryResponse>();

      valuesCollection.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey));

      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();
      expect(deps.onShutdown).not.toHaveBeenCalled();

      valuesCollection.unsubscribe();
      jest.advanceTimersByTime(deps.shutdownDelayInMs - 1);

      // delay not finished - the source should still be subscribed and shutdown not yet signalled
      expect(deps.subscriberReadiness.observed).toBeFalsy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();
      expect(deps.onShutdown).not.toHaveBeenCalled();

      jest.advanceTimersByTime(2);

      // delay finished - the source should be unsubscribed and shutdown signalled
      expect(deps.subscriberReadiness.observed).toBeFalsy();
      expect(deps.liveEventsObservable.observed).toBeFalsy();
      expect(deps.onShutdown).toHaveBeenCalled();
    });

    /**
     * A new subscriber arriving 1 ms before the shutdown delay elapses is expected
     * to keep the stream alive, even after a further full delay has passed.
     */
    it('should not shutdown after unsubscribe if another subscriber subscribes during shutdown delay', async () => {
      const deps = createDeps();
      const liveDataStream = new LiveDataStream(deps);
      const valuesCollection1 = new ValuesCollection<DataQueryResponse>();
      const valuesCollection2 = new ValuesCollection<DataQueryResponse>();

      valuesCollection1.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey));

      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();
      expect(deps.onShutdown).not.toHaveBeenCalled();

      valuesCollection1.unsubscribe();
      jest.advanceTimersByTime(deps.shutdownDelayInMs - 1);

      // second subscriber joins while the shutdown delay is still pending
      valuesCollection2.subscribeTo(liveDataStream.get(liveDataStreamOptions.withTimeAFilter, subscriptionKey));
      jest.advanceTimersByTime(deps.shutdownDelayInMs);

      expect(deps.subscriberReadiness.observed).toBeTruthy();
      expect(deps.liveEventsObservable.observed).toBeTruthy();
      expect(deps.onShutdown).not.toHaveBeenCalled();
    });
  });
  859. });