LiveDataStream.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. import { map, Observable, ReplaySubject, Subject, Subscriber, Subscription } from 'rxjs';
  2. import {
  3. DataFrameJSON,
  4. DataQueryError,
  5. Field,
  6. isLiveChannelMessageEvent,
  7. isLiveChannelStatusEvent,
  8. LiveChannelConnectionState,
  9. LiveChannelEvent,
  10. LiveChannelId,
  11. LoadingState,
  12. } from '@grafana/data';
  13. import { LiveDataStreamOptions, StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime/src/services/live';
  14. import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
  15. import { getStreamingFrameOptions, StreamingDataFrame } from '../data/StreamingDataFrame';
  16. import { StreamingResponseDataType } from '../data/utils';
  17. import { DataStreamSubscriptionKey, StreamingDataQueryResponse } from './service';
  /**
   * Creates an operator that gates a source observable behind a readiness signal.
   *
   * Emission is allowed until `canEmitObservable` says otherwise (the gate starts open). While the
   * gate is open every source value is forwarded immediately as a one-element array. While it is
   * closed, source values are collected and then delivered as a single array as soon as the gate
   * re-opens. Source errors and completion are passed straight through; values still held back when
   * the source completes are not flushed.
   *
   * @param canEmitObservable - emits `true` to open the gate and `false` to close it
   * @returns an operator function turning `Observable<T>` into `Observable<T[]>`
   */
  const bufferIfNot =
    (canEmitObservable: Observable<boolean>) =>
    <T>(source: Observable<T>): Observable<T[]> => {
      return new Observable((subscriber: Subscriber<T[]>) => {
        let buffer: T[] = [];
        let canEmit = true;

        const emitBuffer = () => {
          // Detach the pending batch *before* notifying: if the consumer synchronously triggers
          // another source value, it must land in a fresh buffer instead of causing the same batch
          // to be delivered a second time.
          const pending = buffer;
          buffer = [];
          subscriber.next(pending);
        };

        const canEmitSub = canEmitObservable.subscribe({
          next: (val) => {
            canEmit = val;
            if (canEmit && buffer.length) {
              emitBuffer();
            }
          },
        });

        const sourceSub = source.subscribe({
          next(value) {
            // Always record the value first so it can never be lost, then flush if allowed.
            buffer.push(value);
            if (canEmit) {
              emitBuffer();
            }
          },
          error(error) {
            subscriber.error(error);
          },
          complete() {
            subscriber.complete();
          },
        });

        // Teardown: release both inner subscriptions.
        return () => {
          sourceSub.unsubscribe();
          canEmitSub.unsubscribe();
        };
      });
    };
  /**
   * Collaborators and settings handed to a {@link LiveDataStream} on construction.
   */
  export type DataStreamHandlerDeps<T> = {
    /** Channel identifier; within this file it is only used to label log output. */
    channelId: LiveChannelId;
    /** Source of live channel events; subscribed to once, in the `LiveDataStream` constructor. */
    liveEventsObservable: Observable<LiveChannelEvent<T>>;
    /** Callback invoked when the stream shuts down. */
    onShutdown: () => void;
    /** Readiness signal: while its latest value is `false`, outgoing messages are buffered instead of delivered. */
    subscriberReadiness: Observable<boolean>;
    /** Options used to create the initial, empty frame buffer. */
    defaultStreamingFrameOptions: Readonly<StreamingFrameOptions>;
    /** Delay, in milliseconds, between the last consumer unsubscribing and the shutdown check. */
    shutdownDelayInMs: number;
  };
  /**
   * Tags for the messages pushed through the internal stream of `LiveDataStream`.
   * The numeric values are spelled out because they end up in warning log output.
   */
  enum InternalStreamMessageType {
    /** Carries a `DataQueryError`. */
    Error = 0,
    /** Carries newly appended values; the frame schema did not change. */
    NewValuesSameSchema = 1,
    /** Signals that the frame schema changed; carries no payload. */
    ChangedSchema = 2,
  }
  /**
   * Maps every {@link InternalStreamMessageType} to the payload that accompanies it.
   */
  type InternalStreamMessageTypeToData = {
    /** An error message carries the error to report. */
    [InternalStreamMessageType.Error]: {
      error: DataQueryError;
    };
    /** New values are given per field: one inner array per field. */
    [InternalStreamMessageType.NewValuesSameSchema]: {
      values: unknown[][];
    };
    /** A schema change has no payload of its own. */
    [InternalStreamMessageType.ChangedSchema]: {};
  };
  /**
   * A tagged message travelling through the internal stream.
   *
   * The conditional type distributes over `T`, so the default `InternalStreamMessage` is the union of
   * all message shapes while `InternalStreamMessage<SomeType>` is the single shape for that tag.
   */
  type InternalStreamMessage<T = InternalStreamMessageType> = T extends InternalStreamMessageType
    ? InternalStreamMessageTypeToData[T] & { type: T }
    : never;
  /**
   * Collapses several "new values" messages into a single one.
   *
   * For each field index the values of all packets are concatenated in packet order. The input
   * packets are left untouched.
   *
   * @param packets - messages to merge, oldest first
   * @returns one `NewValuesSameSchema` message holding the concatenated per-field values
   */
  const reduceNewValuesSameSchemaMessages = (
    packets: Array<InternalStreamMessage<InternalStreamMessageType.NewValuesSameSchema>>
  ) => {
    const merged: unknown[][] = [];

    for (const packet of packets) {
      for (const [fieldIndex, fieldValues] of packet.values.entries()) {
        let target = merged[fieldIndex];
        if (!target) {
          // First time this field index is seen: start its column.
          target = [];
          merged[fieldIndex] = target;
        }
        for (const value of fieldValues) {
          target.push(value);
        }
      }
    }

    return {
      values: merged,
      type: InternalStreamMessageType.NewValuesSameSchema,
    };
  };
  /**
   * Selects the messages carrying the given type tag, preserving their order.
   *
   * @param packets - messages of any type
   * @param type - the tag to keep
   * @returns a new array with only the messages whose `type` equals `type`
   */
  const filterMessages = <T extends InternalStreamMessageType>(
    packets: InternalStreamMessage[],
    type: T
  ): Array<InternalStreamMessage<T>> => {
    const matching = packets.filter((packet) => packet.type === type);
    // The runtime check above guarantees the tag; the cast only informs the compiler.
    return matching as Array<InternalStreamMessage<T>>;
  };
  /**
   * Shares one live-channel event subscription between any number of consumers.
   *
   * Incoming channel messages are pushed into a `StreamingDataFrame` buffer and translated into
   * internal messages (error / schema changed / new values). Each call to {@link LiveDataStream.get}
   * returns an observable that turns those internal messages into `StreamingDataQueryResponse`s
   * tailored to the caller's options (buffer settings, field filter).
   *
   * The stream shuts down when the source errors or completes, or `shutdownDelayInMs` after the last
   * consumer unsubscribed without a new one arriving. Shutdown happens at most once.
   */
  export class LiveDataStream<T = unknown> {
    private frameBuffer: StreamingDataFrame;
    // Stays undefined until `subscribe` in the constructor has returned.
    private liveEventsSubscription: Subscription | undefined;
    // ReplaySubject(1): a late subscriber immediately receives the most recent internal message.
    private stream: Subject<InternalStreamMessage> = new ReplaySubject(1);
    private shutdownTimeoutId: ReturnType<typeof setTimeout> | undefined;
    private isShutdown = false;

    /**
     * @param deps - collaborators and settings; the live events observable is subscribed to immediately
     */
    constructor(private deps: DataStreamHandlerDeps<T>) {
      this.frameBuffer = StreamingDataFrame.empty(deps.defaultStreamingFrameOptions);
      const subscription = deps.liveEventsObservable.subscribe({
        error: this.onError,
        complete: this.onComplete,
        next: this.onNext,
      });
      this.liveEventsSubscription = subscription;
      if (this.isShutdown) {
        // The source terminated synchronously while subscribing, i.e. `shutdown` ran before the
        // subscription handle existed. Release it now.
        subscription.unsubscribe();
      }
    }

    /**
     * Completes the internal stream, releases the source subscription and notifies the owner.
     * Idempotent: only the first call has any effect, so `deps.onShutdown` fires exactly once even
     * when both a terminal source event and the delayed shutdown check request a shutdown.
     */
    private shutdown = () => {
      if (this.isShutdown) {
        return;
      }
      this.isShutdown = true;
      this.clearShutdownTimeout();
      this.stream.complete();
      this.liveEventsSubscription?.unsubscribe();
      this.deps.onShutdown();
    };

    /** Timer callback: shuts down unless a consumer (re)subscribed in the meantime. */
    private shutdownIfNoSubscribers = () => {
      if (!this.stream.observed) {
        this.shutdown();
      }
    };

    /** Source error: publish it to consumers, then shut down. */
    private onError = (err: any) => {
      console.log('LiveQuery [error]', { err }, this.deps.channelId);
      this.stream.next({
        type: InternalStreamMessageType.Error,
        error: toDataQueryError(err),
      });
      this.shutdown();
    };

    /** Source completion: shut down. */
    private onComplete = () => {
      console.log('LiveQuery [complete]', this.deps.channelId);
      this.shutdown();
    };

    /**
     * Handles one live channel event.
     * Message events are processed directly. Status events carrying an error are published as error
     * messages. Status events in `Connected` or `Pending` state that carry a message are processed
     * like message events. Everything else is ignored.
     */
    private onNext = (evt: LiveChannelEvent) => {
      if (isLiveChannelMessageEvent(evt)) {
        this.process(evt.message);
        return;
      }

      const liveChannelStatusEvent = isLiveChannelStatusEvent(evt);
      if (liveChannelStatusEvent && evt.error) {
        this.stream.next({
          type: InternalStreamMessageType.Error,
          error: {
            ...toDataQueryError(evt.error),
            message: `Streaming channel error: ${evt.error.message}`,
          },
        });
        return;
      }

      if (
        liveChannelStatusEvent &&
        (evt.state === LiveChannelConnectionState.Connected || evt.state === LiveChannelConnectionState.Pending) &&
        evt.message
      ) {
        this.process(evt.message);
      }
    };

    /** Appends a frame to the buffer and publishes the matching internal message. */
    private process = (msg: DataFrameJSON) => {
      const packetInfo = this.frameBuffer.push(msg);

      if (packetInfo.schemaChanged) {
        this.stream.next({
          type: InternalStreamMessageType.ChangedSchema,
        });
      } else {
        this.stream.next({
          type: InternalStreamMessageType.NewValuesSameSchema,
          values: this.frameBuffer.getValuesFromLastPacket(),
        });
      }
    };

    /** Resizes the shared frame buffer when the buffer reports that the given options require it. */
    private resizeBuffer = (bufferOptions: StreamingFrameOptions) => {
      if (bufferOptions && this.frameBuffer.needsResizing(bufferOptions)) {
        this.frameBuffer.resize(bufferOptions);
      }
    };

    /** Seeds the buffer with the caller-provided initial frame, if the buffer is still empty. */
    private prepareInternalStreamForNewSubscription = (options: LiveDataStreamOptions): void => {
      if (!this.frameBuffer.hasAtLeastOnePacket() && options.frame) {
        // will skip initial frames from subsequent subscribers
        this.process(options.frame);
      }
    };

    /** Cancels a pending delayed shutdown check, if any. */
    private clearShutdownTimeout = () => {
      if (this.shutdownTimeoutId) {
        clearTimeout(this.shutdownTimeoutId);
        this.shutdownTimeoutId = undefined;
      }
    };

    /**
     * Creates a consumer-facing observable of streaming responses.
     *
     * Calling `get` cancels any pending delayed shutdown, resizes the shared buffer if the requested
     * options require it, and seeds the buffer with `options.frame` when it is still empty.
     *
     * The first batch delivered, and every batch containing an error or a schema change, is sent as
     * a full frame. All other batches are sent as "new values" deltas. With
     * `StreamingFrameAction.Replace` only the last packet is sent.
     *
     * @param options - buffer settings, optional field filter and optional initial frame
     * @param subKey - key copied into every response so the receiver can match it to its subscription
     * @returns an observable of responses; unsubscribing the last consumer schedules a delayed shutdown check
     */
    get = (options: LiveDataStreamOptions, subKey: DataStreamSubscriptionKey): Observable<StreamingDataQueryResponse> => {
      this.clearShutdownTimeout();
      const buffer = getStreamingFrameOptions(options.buffer);

      this.resizeBuffer(buffer);
      this.prepareInternalStreamForNewSubscription(options);

      const shouldSendLastPacketOnly = options.buffer?.action === StreamingFrameAction.Replace;
      const fieldsNamesFilter = options.filter?.fields;
      const dataNeedsFiltering = fieldsNamesFilter?.length;
      const fieldFilterPredicate = dataNeedsFiltering ? ({ name }: Field) => fieldsNamesFilter.includes(name) : undefined;
      // Refreshed whenever a full frame is produced; later delta responses reuse it.
      let matchingFieldIndexes: number[] | undefined = undefined;

      /** Wraps a serialized frame into a full-frame response. */
      const toFullFrameResponse = (
        frame: ReturnType<StreamingDataFrame['serialize']>,
        state: LoadingState,
        error: DataQueryError | undefined
      ): StreamingDataQueryResponse => ({
        key: subKey,
        state,
        data: [
          {
            type: StreamingResponseDataType.FullFrame,
            frame,
          },
        ],
        error,
      });

      /** Builds a full-frame response from the current buffer contents. */
      const getFullFrameResponseData = (
        messages: InternalStreamMessage[],
        error?: DataQueryError
      ): StreamingDataQueryResponse => {
        matchingFieldIndexes = fieldFilterPredicate
          ? this.frameBuffer.getMatchingFieldIndexes(fieldFilterPredicate)
          : undefined;

        if (!shouldSendLastPacketOnly) {
          return toFullFrameResponse(
            this.frameBuffer.serialize(fieldFilterPredicate, buffer),
            error ? LoadingState.Error : LoadingState.Streaming,
            error
          );
        }

        if (error) {
          // send empty frame with error
          return toFullFrameResponse(
            this.frameBuffer.serialize(fieldFilterPredicate, buffer, { maxLength: 0 }),
            LoadingState.Error,
            error
          );
        }

        if (!messages.length) {
          console.warn(`expected to find at least one non error message ${messages.map(({ type }) => type)}`);
          // send empty frame
          return toFullFrameResponse(
            this.frameBuffer.serialize(fieldFilterPredicate, buffer, { maxLength: 0 }),
            LoadingState.Streaming,
            error
          );
        }

        // Replace mode: trim the frame to the length of the last packet.
        return toFullFrameResponse(
          this.frameBuffer.serialize(fieldFilterPredicate, buffer, {
            maxLength: this.frameBuffer.packetInfo.length,
          }),
          LoadingState.Streaming,
          error
        );
      };

      /** Builds a delta response from a batch of "new values" messages. */
      const getNewValuesSameSchemaResponseData = (
        messages: Array<InternalStreamMessage<InternalStreamMessageType.NewValuesSameSchema>>
      ): StreamingDataQueryResponse => {
        const lastMessage = messages.length ? messages[messages.length - 1] : undefined;
        const values =
          shouldSendLastPacketOnly && lastMessage
            ? lastMessage.values
            : reduceNewValuesSameSchemaMessages(messages).values;

        const fieldIndexes = matchingFieldIndexes;
        const filteredValues = fieldIndexes ? values.filter((_, index) => fieldIndexes.includes(index)) : values;

        return {
          key: subKey,
          state: LoadingState.Streaming,
          data: [
            {
              type: StreamingResponseDataType.NewValuesSameSchema,
              values: filteredValues,
            },
          ],
        };
      };

      let shouldSendFullFrame = true;
      const transformedInternalStream = this.stream.pipe(
        bufferIfNot(this.deps.subscriberReadiness),
        map((messages) => {
          const errors = filterMessages(messages, InternalStreamMessageType.Error);
          const lastError = errors.length ? errors[errors.length - 1].error : undefined;

          if (shouldSendFullFrame) {
            shouldSendFullFrame = false;
            return getFullFrameResponseData(messages, lastError);
          }

          if (errors.length) {
            // send the latest frame with the last error, discard everything else
            return getFullFrameResponseData(messages, lastError);
          }

          const schemaChanged = messages.some((n) => n.type === InternalStreamMessageType.ChangedSchema);
          if (schemaChanged) {
            // send the latest frame, discard intermediate appends
            return getFullFrameResponseData(messages, undefined);
          }

          const newValueSameSchemaMessages = filterMessages(messages, InternalStreamMessageType.NewValuesSameSchema);
          if (newValueSameSchemaMessages.length !== messages.length) {
            console.warn(`unsupported message type ${messages.map(({ type }) => type)}`);
          }

          return getNewValuesSameSchemaResponseData(newValueSameSchemaMessages);
        })
      );

      return new Observable<StreamingDataQueryResponse>((subscriber) => {
        const sub = transformedInternalStream.subscribe({
          next: (n) => {
            subscriber.next(n);
          },
          error: (err) => {
            subscriber.error(err);
          },
          complete: () => {
            subscriber.complete();
          },
        });

        return () => {
          // TODO: potentially resize (downsize) the buffer on unsubscribe
          sub.unsubscribe();

          // Nothing to schedule once the stream has already shut down.
          if (!this.isShutdown && !this.stream.observed) {
            this.clearShutdownTimeout();
            this.shutdownTimeoutId = setTimeout(this.shutdownIfNoSubscribers, this.deps.shutdownDelayInMs);
          }
        };
      });
    };
  }