service.ts 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. import Centrifuge from 'centrifuge/dist/centrifuge';
  2. import { BehaviorSubject, Observable, share, startWith } from 'rxjs';
  3. import {
  4. DataQueryError,
  5. DataQueryResponse,
  6. LiveChannelAddress,
  7. LiveChannelConnectionState,
  8. LiveChannelId,
  9. toLiveChannelId,
  10. } from '@grafana/data';
  11. import { FetchResponse } from '@grafana/runtime/src/services/backendSrv';
  12. import {
  13. GrafanaLiveSrv,
  14. LiveDataStreamOptions,
  15. LiveQueryDataOptions,
  16. StreamingFrameAction,
  17. StreamingFrameOptions,
  18. } from '@grafana/runtime/src/services/live';
  19. import { BackendDataSourceResponse } from '@grafana/runtime/src/utils/queryResponse';
  20. import { StreamingResponseData } from '../data/utils';
  21. import { LiveDataStream } from './LiveDataStream';
  22. import { CentrifugeLiveChannel } from './channel';
  /**
   * Dependencies handed to the `CentrifugeService` constructor.
   */
  export type CentrifugeSrvDeps = {
    /**
     * Base application URL. The service replaces a leading `http` with `ws`
     * (so `https` becomes `wss`) and appends `/api/live/ws` to build the
     * websocket endpoint.
     */
    appUrl: string;
    /** Sent with the connect data and used as the first segment of every channel id. */
    orgId: number;
    /** Role of the current user; an empty string keeps the service from connecting. */
    orgRole: string;
    /** Sent with the connect data. */
    sessionId: string;
    /** The service only opens the connection when this is `true`. */
    liveEnabled: boolean;
    /**
     * Readiness signal that the service shares, seeds with `true`, and passes to
     * every `LiveDataStream` as `subscriberReadiness`.
     */
    dataStreamSubscriberReadiness: Observable<boolean>;
  };
  /**
   * A `DataQueryResponse` whose `data` field is narrowed to a one-element tuple
   * holding a single `StreamingResponseData`.
   */
  export type StreamingDataQueryResponse = Omit<DataQueryResponse, 'data'> & {
    data: [StreamingResponseData];
  };
  /**
   * The subset of `GrafanaLiveSrv` served by the Centrifuge client.
   *
   * `publish` is left out entirely; `getDataStream` and `getQueryData` are
   * re-declared here with the signatures below.
   */
  export type CentrifugeSrv = Omit<GrafanaLiveSrv, 'publish' | 'getDataStream' | 'getQueryData'> & {
    /** Returns an observable of streaming query responses for the channel in `options`. */
    getDataStream: (options: LiveDataStreamOptions) => Observable<StreamingDataQueryResponse>;
    /** Runs a query and resolves with one of the three result shapes listed in the return type. */
    getQueryData: (
      options: LiveQueryDataOptions
    ) => Promise<
      | { data: BackendDataSourceResponse | undefined }
      | FetchResponse<BackendDataSourceResponse | undefined>
      | DataQueryError
    >;
  };
  /**
   * Key identifying one data-stream subscription. It is either supplied by the
   * caller through `options.key` or generated as `xstr/<counter>`.
   */
  export type DataStreamSubscriptionKey = string;
  /**
   * Frame options handed to every `LiveDataStream` as its
   * `defaultStreamingFrameOptions`.
   * NOTE(review): presumably used when a subscriber supplies no options of its
   * own — confirm in `LiveDataStream`.
   */
  const defaultStreamingFrameOptions: Readonly<StreamingFrameOptions> = {
    maxLength: 100,
    maxDelta: Infinity,
    action: StreamingFrameAction.Append,
  };
  /** Delay, in milliseconds, passed to every `LiveDataStream` as `shutdownDelayInMs`. */
  const dataStreamShutdownDelayInMs = 5000;
  /**
   * Live service backed by a single Centrifuge websocket client.
   *
   * The instance owns the connection, caches open channels by id, and keeps one
   * `LiveDataStream` per live channel id.
   */
  export class CentrifugeService implements CentrifugeSrv {
    /** Channels currently open, keyed by `${orgId}/${scope}/${namespace}/${path}`. */
    readonly open = new Map<string, CentrifugeLiveChannel>();

    /** One data stream per live channel id; an entry is deleted by that stream's `onShutdown` callback. */
    private readonly liveDataStreamByChannelId: Record<LiveChannelId, LiveDataStream> = {};

    readonly centrifuge: Centrifuge;

    /** Emits `true` on every `connect` event and `false` on every `disconnect` event. */
    readonly connectionState: BehaviorSubject<boolean>;

    /**
     * Resolves as soon as the client is connected for the first time. The promise
     * is created once in the constructor and is never replaced afterwards.
     */
    readonly connectionBlocker: Promise<void>;

    private readonly dataStreamSubscriberReadiness: Observable<boolean>;

    /**
     * Builds the websocket client, connects when live is enabled and a role is
     * present, and registers the global connection listeners.
     *
     * @param deps - URL, identity and feature-flag values the service depends on.
     */
    constructor(private readonly deps: CentrifugeSrvDeps) {
      // share() multicasts the upstream signal; startWith(true) sits after it, so
      // each subscriber receives `true` before any upstream value.
      this.dataStreamSubscriberReadiness = deps.dataStreamSubscriberReadiness.pipe(share(), startWith(true));

      // Only the leading "http" is replaced: http -> ws, https -> wss.
      const liveUrl = `${deps.appUrl.replace(/^http/, 'ws')}/api/live/ws`;
      this.centrifuge = new Centrifuge(liveUrl, {
        timeout: 30000,
      });
      this.centrifuge.setConnectData({
        sessionId: deps.sessionId,
        orgId: deps.orgId,
      });

      // orgRole is set when logged in *or* when anonymous users can use grafana
      if (deps.liveEnabled && deps.orgRole !== '') {
        this.centrifuge.connect();
      }

      this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
      this.connectionBlocker = new Promise<void>((resolve) => {
        if (this.centrifuge.isConnected()) {
          resolve();
          return;
        }

        // One-shot listener: resolve on the first connect, then detach itself.
        const connectListener = () => {
          resolve();
          this.centrifuge.removeListener('connect', connectListener);
        };
        this.centrifuge.addListener('connect', connectListener);
      });

      // Register global listeners
      this.centrifuge.on('connect', this.onConnect);
      this.centrifuge.on('disconnect', this.onDisconnect);
      this.centrifuge.on('publish', this.onServerSideMessage);
    }

    //----------------------------------------------------------
    // Internal functions
    //----------------------------------------------------------

    /** `connect` listener: publishes the connected state. */
    private onConnect = () => {
      this.connectionState.next(true);
    };

    /** `disconnect` listener: publishes the disconnected state. */
    private onDisconnect = () => {
      this.connectionState.next(false);
    };

    /** `publish` listener: logs publications that arrive on server-side channels. */
    private onServerSideMessage = (context: unknown) => {
      console.log('Publication from server-side channel', context);
    };

    /**
     * Get a channel. If the scope, namespace, or path is invalid, a shutdown
     * channel will be returned with an error state indicated in its status.
     *
     * Valid channels are cached in `open`, and their initialization runs in the
     * background, so the returned channel may not be subscribed yet.
     *
     * @param addr - Address of the channel to open.
     * @returns The cached channel for this address, or a newly created one.
     */
    private getChannel<TMessage>(addr: LiveChannelAddress): CentrifugeLiveChannel<TMessage> {
      const id = `${this.deps.orgId}/${addr.scope}/${addr.namespace}/${addr.path}`;

      const existing = this.open.get(id);
      if (existing != null) {
        return existing;
      }

      const channel: CentrifugeLiveChannel = new CentrifugeLiveChannel(id, addr);
      if (channel.currentStatus.state === LiveChannelConnectionState.Invalid) {
        // Invalid channels are returned as-is and never cached.
        return channel;
      }

      channel.shutdownCallback = () => {
        this.open.delete(id); // remove it from the list of open channels
      };
      this.open.set(id, channel);

      // Initialize the channel in the background
      this.initChannel(channel).catch((err) => {
        channel.currentStatus.state = LiveChannelConnectionState.Invalid;
        channel.shutdownWithError(err);
        this.open.delete(id);
      });

      // return the not-yet initialized channel
      return channel;
    }

    /**
     * Subscribes `channel` on the websocket, waiting for the first connection
     * when the client is not connected yet.
     *
     * @param channel - Channel whose subscription should be created.
     */
    private async initChannel(channel: CentrifugeLiveChannel): Promise<void> {
      // `initalize` (sic) is the method name exposed by CentrifugeLiveChannel.
      const events = channel.initalize();
      if (!this.centrifuge.isConnected()) {
        await this.connectionBlocker;
      }
      channel.subscription = this.centrifuge.subscribe(channel.id, events, { data: channel.addr.data });
    }

    //----------------------------------------------------------
    // Exported functions
    //----------------------------------------------------------

    /**
     * Listen for changes to the connection state
     */
    getConnectionState = () => {
      return this.connectionState.asObservable();
    };

    /**
     * Watch for messages in a channel
     */
    getStream: CentrifugeSrv['getStream'] = <T>(address: LiveChannelAddress) => {
      return this.getChannel<T>(address).getStream();
    };

    /** Uses the caller-supplied `options.key`, otherwise generates `xstr/<counter>`. */
    private createSubscriptionKey = (options: LiveDataStreamOptions): DataStreamSubscriptionKey =>
      options.key ?? `xstr/${streamCounter++}`;

    /**
     * Returns the cached `LiveDataStream` for the channel in `options`, creating
     * and caching one when none exists yet.
     */
    private getLiveDataStream = (options: LiveDataStreamOptions): LiveDataStream => {
      const channelId = toLiveChannelId(options.addr);
      const existingStream = this.liveDataStreamByChannelId[channelId];
      if (existingStream) {
        return existingStream;
      }

      const channel = this.getChannel(options.addr);
      const stream = new LiveDataStream({
        channelId,
        onShutdown: () => {
          delete this.liveDataStreamByChannelId[channelId];
        },
        liveEventsObservable: channel.getStream(),
        subscriberReadiness: this.dataStreamSubscriberReadiness,
        defaultStreamingFrameOptions,
        shutdownDelayInMs: dataStreamShutdownDelayInMs,
      });
      this.liveDataStreamByChannelId[channelId] = stream;
      return stream;
    };

    /**
     * Connect to a channel and return results as DataFrames
     */
    getDataStream: CentrifugeSrv['getDataStream'] = (options) => {
      const subscriptionKey = this.createSubscriptionKey(options);
      const stream = this.getLiveDataStream(options);
      return stream.get(options, subscriptionKey);
    };

    /**
     * Executes a query over the live websocket. Query response can contain live channels we can subscribe to for further updates
     *
     * Since the initial request and subscription are on the same socket, this will support HA setups
     */
    getQueryData: CentrifugeSrv['getQueryData'] = async (options) => {
      if (!this.centrifuge.isConnected()) {
        await this.connectionBlocker;
      }
      return this.centrifuge.namedRPC('grafana.query', options.body);
    };

    /**
     * For channels that support presence, this will request the current state from the server.
     *
     * Join and leave messages will be sent to the open stream
     */
    getPresence: CentrifugeSrv['getPresence'] = (address) => {
      return this.getChannel(address).getPresence();
    };
  }
  // Counter used to build a unique `xstr/<n>` subscription key for each stream
  // when the caller supplies no key. Only uniqueness matters, not the value.
  let streamCounter = 0;