123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- import { map, Observable, ReplaySubject, Subject, Subscriber, Subscription } from 'rxjs';
- import {
- DataFrameJSON,
- DataQueryError,
- Field,
- isLiveChannelMessageEvent,
- isLiveChannelStatusEvent,
- LiveChannelConnectionState,
- LiveChannelEvent,
- LiveChannelId,
- LoadingState,
- } from '@grafana/data';
- import { LiveDataStreamOptions, StreamingFrameAction, StreamingFrameOptions } from '@grafana/runtime/src/services/live';
- import { toDataQueryError } from '@grafana/runtime/src/utils/toDataQueryError';
- import { getStreamingFrameOptions, StreamingDataFrame } from '../data/StreamingDataFrame';
- import { StreamingResponseDataType } from '../data/utils';
- import { DataStreamSubscriptionKey, StreamingDataQueryResponse } from './service';
/**
 * Operator factory that gates a source observable on a boolean readiness signal.
 *
 * Readiness is assumed to be `true` until `canEmitObservable` says otherwise. While ready, each
 * source value is forwarded immediately as a one-element array. While not ready, source values
 * are collected and delivered as a single array as soon as the signal turns `true` again.
 *
 * Source errors and completion are forwarded unchanged. Values that are still buffered when the
 * source completes are not flushed.
 *
 * @param canEmitObservable - emits `true` when values may be delivered, `false` when they must be buffered.
 * @returns an operator turning `Observable<T>` into `Observable<T[]>`.
 */
const bufferIfNot =
  (canEmitObservable: Observable<boolean>) =>
  <T>(source: Observable<T>): Observable<T[]> => {
    return new Observable((subscriber: Subscriber<T[]>) => {
      let buffer: T[] = [];
      let canEmit = true;

      // Detach the pending batch BEFORE handing it downstream. A consumer that synchronously
      // triggers another source emission must neither see the batch re-emitted nor have its
      // new value swallowed by the flush that is still in progress.
      const emitBuffer = () => {
        const pending = buffer;
        buffer = [];
        subscriber.next(pending);
      };

      const canEmitSub = canEmitObservable.subscribe({
        next: (val) => {
          canEmit = val;
          if (canEmit && buffer.length) {
            emitBuffer();
          }
        },
      });

      const sourceSub = source.subscribe({
        next(value) {
          if (!canEmit) {
            buffer.push(value);
            return;
          }

          if (buffer.length) {
            // Leftover buffered values: deliver them together with the new value, in arrival order,
            // instead of dropping the new value.
            buffer.push(value);
            emitBuffer();
            return;
          }

          subscriber.next([value]);
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });

      // Teardown: stop listening to both the source and the readiness signal.
      return () => {
        sourceSub.unsubscribe();
        canEmitSub.unsubscribe();
      };
    });
  };
/**
 * Everything a `LiveDataStream` needs from its owner.
 */
export type DataStreamHandlerDeps<T> = {
  /** Identifier of the live channel; included in the stream's log output. */
  channelId: LiveChannelId;

  /** Source of channel events; subscribed once when the stream is constructed. */
  liveEventsObservable: Observable<LiveChannelEvent<T>>;

  /** Readiness signal: while it reports `false`, messages are buffered instead of delivered. */
  subscriberReadiness: Observable<boolean>;

  /** Options used to create the initial, empty frame buffer. */
  defaultStreamingFrameOptions: Readonly<StreamingFrameOptions>;

  /** Delay, in milliseconds, between the last unsubscribe and the shutdown check. */
  shutdownDelayInMs: number;

  /** Callback invoked when the stream shuts down. */
  onShutdown: () => void;
};
/**
 * Discriminator for the messages travelling through the internal stream.
 * Numeric values are spelled out; they match the implicit auto-numbering.
 */
enum InternalStreamMessageType {
  /** The message carries a query error. */
  Error = 0,
  /** The message carries values appended under an unchanged schema. */
  NewValuesSameSchema = 1,
  /** The frame schema changed; the message carries no payload. */
  ChangedSchema = 2,
}
/**
 * Payload carried by each kind of internal stream message, keyed by message type.
 */
type InternalStreamMessageTypeToData = {
  /** An error message carries the error to report. */
  [InternalStreamMessageType.Error]: {
    error: DataQueryError;
  };
  /** Appended values, indexed as `values[i][j]`. */
  [InternalStreamMessageType.NewValuesSameSchema]: {
    values: unknown[][];
  };
  /** A schema change carries no extra data. */
  [InternalStreamMessageType.ChangedSchema]: {};
};
/**
 * A message on the internal stream: a `type` tag intersected with the payload for that type.
 *
 * The conditional distributes over `T`, so the default `InternalStreamMessage` is the union of
 * all message shapes, and `InternalStreamMessage<SomeType>` is the single matching shape.
 */
type InternalStreamMessage<T = InternalStreamMessageType> = T extends InternalStreamMessageType
  ? { type: T } & InternalStreamMessageTypeToData[T]
  : never;
/**
 * Merges several "new values, same schema" messages into one.
 *
 * For every index `i`, the `values[i]` arrays of all packets are concatenated in packet order.
 *
 * @param packets - messages to merge, oldest first.
 * @returns an object holding the concatenated `values` and the `NewValuesSameSchema` type tag.
 */
const reduceNewValuesSameSchemaMessages = (
  packets: Array<InternalStreamMessage<InternalStreamMessageType.NewValuesSameSchema>>
) => {
  const merged: unknown[][] = [];

  for (const packet of packets) {
    for (const [fieldIndex, column] of packet.values.entries()) {
      // Create the target array the first time this index is seen.
      let target = merged[fieldIndex];
      if (!target) {
        target = [];
        merged[fieldIndex] = target;
      }

      for (const value of column) {
        target.push(value);
      }
    }
  }

  return {
    values: merged,
    type: InternalStreamMessageType.NewValuesSameSchema,
  };
};
/**
 * Picks the messages of one type out of a mixed batch, preserving their order.
 *
 * @param packets - batch of internal stream messages.
 * @param type - message type to keep.
 * @returns a new array containing only the messages whose `type` equals `type`.
 */
const filterMessages = <T extends InternalStreamMessageType>(
  packets: InternalStreamMessage[],
  type: T
): Array<InternalStreamMessage<T>> => {
  const hasRequestedType = (packet: InternalStreamMessage): boolean => packet.type === type;
  const matching = packets.filter((packet) => hasRequestedType(packet));

  // The runtime check above guarantees the narrower element type.
  return matching as Array<InternalStreamMessage<T>>;
};
/**
 * Shares one live channel subscription between any number of query subscribers.
 *
 * Channel messages are pushed into a `StreamingDataFrame` buffer and announced on an internal
 * `ReplaySubject(1)`. {@link LiveDataStream.get} turns that internal stream into
 * `StreamingDataQueryResponse`s, sending either full frames or only the newly appended values.
 *
 * The stream shuts down when the channel errors or completes, or `shutdownDelayInMs` after the
 * last subscriber has gone, provided nobody subscribed again in the meantime.
 */
export class LiveDataStream<T = unknown> {
  /** Accumulates the data received from the channel. */
  private frameBuffer: StreamingDataFrame;
  /** Subscription to the channel events, created in the constructor. */
  private liveEventsSubscription: Subscription;
  /** Internal message bus; replays the latest message to late subscribers. */
  private stream: Subject<InternalStreamMessage> = new ReplaySubject(1);
  /** Handle of the pending delayed shutdown check, if one is scheduled. */
  private shutdownTimeoutId: ReturnType<typeof setTimeout> | undefined;
  /** Set once `shutdown` has run, so that `deps.onShutdown` is invoked at most once. */
  private isShutdown = false;

  /**
   * @param deps - channel events, readiness signal, buffer defaults and lifecycle callbacks.
   */
  constructor(private deps: DataStreamHandlerDeps<T>) {
    this.frameBuffer = StreamingDataFrame.empty(deps.defaultStreamingFrameOptions);
    this.liveEventsSubscription = deps.liveEventsObservable.subscribe({
      error: this.onError,
      complete: this.onComplete,
      next: this.onNext,
    });
  }

  /**
   * Completes the internal stream, drops the channel subscription and notifies the owner.
   *
   * Idempotent: completing the internal stream tears down the current subscribers, and that
   * teardown (or an already pending timer) could otherwise trigger a second shutdown and a
   * second `onShutdown` call.
   */
  private shutdown = () => {
    if (this.isShutdown) {
      return;
    }
    // Flag first: completing the stream below synchronously runs the subscribers' teardown.
    this.isShutdown = true;
    this.clearShutdownTimeout();

    this.stream.complete();
    this.liveEventsSubscription.unsubscribe();
    this.deps.onShutdown();
  };

  /** Timer callback: shuts down unless somebody subscribed again in the meantime. */
  private shutdownIfNoSubscribers = () => {
    if (!this.stream.observed) {
      this.shutdown();
    }
  };

  /** Channel error: publish it as an error message, then shut down. */
  private onError = (err: any) => {
    console.log('LiveQuery [error]', { err }, this.deps.channelId);
    this.stream.next({
      type: InternalStreamMessageType.Error,
      error: toDataQueryError(err),
    });
    this.shutdown();
  };

  /** Channel completion: shut down. */
  private onComplete = () => {
    console.log('LiveQuery [complete]', this.deps.channelId);
    this.shutdown();
  };

  /**
   * Handles one channel event.
   *
   * Message events are processed directly. Status events carrying an error are published as
   * error messages. Status events in the `Connected` or `Pending` state that carry a message
   * have that message processed. Everything else is ignored.
   */
  private onNext = (evt: LiveChannelEvent) => {
    if (isLiveChannelMessageEvent(evt)) {
      this.process(evt.message);
      return;
    }

    const liveChannelStatusEvent = isLiveChannelStatusEvent(evt);
    if (liveChannelStatusEvent && evt.error) {
      this.stream.next({
        type: InternalStreamMessageType.Error,
        error: {
          ...toDataQueryError(evt.error),
          message: `Streaming channel error: ${evt.error.message}`,
        },
      });
      return;
    }

    if (
      liveChannelStatusEvent &&
      (evt.state === LiveChannelConnectionState.Connected || evt.state === LiveChannelConnectionState.Pending) &&
      evt.message
    ) {
      this.process(evt.message);
    }
  };

  /**
   * Pushes a frame into the buffer and announces the result on the internal stream: a schema
   * change notification, or the values of the packet that was just pushed.
   */
  private process = (msg: DataFrameJSON) => {
    const packetInfo = this.frameBuffer.push(msg);

    if (packetInfo.schemaChanged) {
      this.stream.next({
        type: InternalStreamMessageType.ChangedSchema,
      });
    } else {
      this.stream.next({
        type: InternalStreamMessageType.NewValuesSameSchema,
        values: this.frameBuffer.getValuesFromLastPacket(),
      });
    }
  };

  /** Resizes the frame buffer when the given options require it. */
  private resizeBuffer = (bufferOptions: StreamingFrameOptions) => {
    if (bufferOptions && this.frameBuffer.needsResizing(bufferOptions)) {
      this.frameBuffer.resize(bufferOptions);
    }
  };

  /** Seeds the buffer with the subscriber's initial frame if the buffer has no packet yet. */
  private prepareInternalStreamForNewSubscription = (options: LiveDataStreamOptions): void => {
    if (!this.frameBuffer.hasAtLeastOnePacket() && options.frame) {
      // will skip initial frames from subsequent subscribers
      this.process(options.frame);
    }
  };

  /** Cancels the pending delayed shutdown check, if any. */
  private clearShutdownTimeout = () => {
    if (this.shutdownTimeoutId) {
      clearTimeout(this.shutdownTimeoutId);
      this.shutdownTimeoutId = undefined;
    }
  };

  /**
   * Creates the response observable for one query.
   *
   * The first batch handled for this call is answered with a full frame. Later batches are
   * answered with a full frame when they contain an error or a schema change, and with just
   * the new values otherwise. While `deps.subscriberReadiness` reports `false`, messages are
   * buffered and handled as one batch once it reports `true` again.
   *
   * @param options - buffer settings, optional field filter and optional initial frame.
   * @param subKey - key copied into every response.
   * @returns an observable of streaming responses; unsubscribing the last subscriber schedules
   *   a delayed shutdown check.
   */
  get = (options: LiveDataStreamOptions, subKey: DataStreamSubscriptionKey): Observable<StreamingDataQueryResponse> => {
    this.clearShutdownTimeout();
    const buffer = getStreamingFrameOptions(options.buffer);

    this.resizeBuffer(buffer);
    this.prepareInternalStreamForNewSubscription(options);

    const shouldSendLastPacketOnly = options?.buffer?.action === StreamingFrameAction.Replace;
    const fieldsNamesFilter = options.filter?.fields;
    const dataNeedsFiltering = fieldsNamesFilter?.length;
    const fieldFilterPredicate = dataNeedsFiltering ? ({ name }: Field) => fieldsNamesFilter.includes(name) : undefined;
    // Refreshed on every full-frame response and reused to filter the value-only responses that follow.
    let matchingFieldIndexes: number[] | undefined = undefined;

    const getFullFrameResponseData = (
      messages: InternalStreamMessage[],
      error?: DataQueryError
    ): StreamingDataQueryResponse => {
      matchingFieldIndexes = fieldFilterPredicate
        ? this.frameBuffer.getMatchingFieldIndexes(fieldFilterPredicate)
        : undefined;

      if (!shouldSendLastPacketOnly) {
        return {
          key: subKey,
          state: error ? LoadingState.Error : LoadingState.Streaming,
          data: [
            {
              type: StreamingResponseDataType.FullFrame,
              frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer),
            },
          ],
          error,
        };
      }

      if (error) {
        // send empty frame with error
        return {
          key: subKey,
          state: LoadingState.Error,
          data: [
            {
              type: StreamingResponseDataType.FullFrame,
              frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer, { maxLength: 0 }),
            },
          ],
          error,
        };
      }

      if (!messages.length) {
        console.warn(`expected to find at least one non error message ${messages.map(({ type }) => type)}`);
        // send empty frame
        return {
          key: subKey,
          state: LoadingState.Streaming,
          data: [
            {
              type: StreamingResponseDataType.FullFrame,
              frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer, { maxLength: 0 }),
            },
          ],
          error,
        };
      }

      // "Replace" mode: limit the serialized frame to the length of the latest packet.
      return {
        key: subKey,
        state: LoadingState.Streaming,
        data: [
          {
            type: StreamingResponseDataType.FullFrame,
            frame: this.frameBuffer.serialize(fieldFilterPredicate, buffer, {
              maxLength: this.frameBuffer.packetInfo.length,
            }),
          },
        ],
        error,
      };
    };

    const getNewValuesSameSchemaResponseData = (
      messages: Array<InternalStreamMessage<InternalStreamMessageType.NewValuesSameSchema>>
    ): StreamingDataQueryResponse => {
      const lastMessage = messages.length ? messages[messages.length - 1] : undefined;
      // "Replace" mode only needs the newest packet; otherwise merge the whole batch.
      const values =
        shouldSendLastPacketOnly && lastMessage
          ? lastMessage.values
          : reduceNewValuesSameSchemaMessages(messages).values;

      const filteredValues = matchingFieldIndexes
        ? values.filter((v, i) => (matchingFieldIndexes as number[]).includes(i))
        : values;

      return {
        key: subKey,
        state: LoadingState.Streaming,
        data: [
          {
            type: StreamingResponseDataType.NewValuesSameSchema,
            values: filteredValues,
          },
        ],
      };
    };

    let shouldSendFullFrame = true;
    const transformedInternalStream = this.stream.pipe(
      bufferIfNot(this.deps.subscriberReadiness),
      map((messages) => {
        const errors = filterMessages(messages, InternalStreamMessageType.Error);
        const lastError = errors.length ? errors[errors.length - 1].error : undefined;

        if (shouldSendFullFrame) {
          shouldSendFullFrame = false;
          return getFullFrameResponseData(messages, lastError);
        }

        if (errors.length) {
          // send the latest frame with the last error, discard everything else
          return getFullFrameResponseData(messages, lastError);
        }

        const schemaChanged = messages.some((n) => n.type === InternalStreamMessageType.ChangedSchema);
        if (schemaChanged) {
          // send the latest frame, discard intermediate appends
          return getFullFrameResponseData(messages, undefined);
        }

        const newValueSameSchemaMessages = filterMessages(messages, InternalStreamMessageType.NewValuesSameSchema);
        if (newValueSameSchemaMessages.length !== messages.length) {
          console.warn(`unsupported message type ${messages.map(({ type }) => type)}`);
        }

        return getNewValuesSameSchemaResponseData(newValueSameSchemaMessages);
      })
    );

    return new Observable<StreamingDataQueryResponse>((subscriber) => {
      const sub = transformedInternalStream.subscribe({
        next: (n) => {
          subscriber.next(n);
        },
        error: (err) => {
          subscriber.error(err);
        },
        complete: () => {
          subscriber.complete();
        },
      });

      return () => {
        // TODO: potentially resize (downsize) the buffer on unsubscribe
        sub.unsubscribe();

        // Nothing left to schedule once the stream has shut down (this teardown also runs
        // when the internal stream completes as part of the shutdown itself).
        if (!this.isShutdown && !this.stream.observed) {
          this.clearShutdownTimeout();
          this.shutdownTimeoutId = setTimeout(this.shutdownIfNoSubscribers, this.deps.shutdownDelayInMs);
        }
      };
    });
  };
}
|