channel.ts 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. import Centrifuge, {
  2. JoinLeaveContext,
  3. PublicationContext,
  4. SubscribeErrorContext,
  5. SubscribeSuccessContext,
  6. SubscriptionEvents,
  7. UnsubscribeContext,
  8. } from 'centrifuge/dist/centrifuge';
  9. import { Subject, of, Observable } from 'rxjs';
  10. import {
  11. LiveChannelStatusEvent,
  12. LiveChannelEvent,
  13. LiveChannelEventType,
  14. LiveChannelConnectionState,
  15. LiveChannelPresenceStatus,
  16. LiveChannelAddress,
  17. DataFrameJSON,
  18. isValidLiveChannelAddress,
  19. } from '@grafana/data';
  /**
   * Internal class that maps Centrifuge support to GrafanaLive.
   *
   * One instance wraps a single channel: it tracks the current connection
   * status, remembers the last message that carried a schema, and re-publishes
   * everything received from the Centrifuge subscription on an RxJS `stream`.
   */
  export class CentrifugeLiveChannel<T = any> {
    /** Mutable status record; copies of it are pushed to `stream` whenever it changes. */
    readonly currentStatus: LiveChannelStatusEvent;
    /** Timestamp (ms since epoch) taken when this object was created. */
    readonly opened = Date.now();
    readonly id: string;
    readonly addr: LiveChannelAddress;
    /** Hot stream of every status/message/join/leave event for this channel. */
    readonly stream = new Subject<LiveChannelEvent<T>>();

    // Hold on to the last header with schema
    lastMessageWithSchema?: DataFrameJSON;
    subscription?: Centrifuge.Subscription;
    shutdownCallback?: () => void;
    initalized?: boolean;

    /**
     * @param id - identifier copied into every status event
     * @param addr - channel address; when it fails `isValidLiveChannelAddress`
     *   the channel starts in the `Invalid` state with an error message
     */
    constructor(id: string, addr: LiveChannelAddress) {
      this.id = id;
      this.addr = addr;
      this.currentStatus = {
        type: LiveChannelEventType.Status,
        id,
        timestamp: this.opened,
        state: LiveChannelConnectionState.Pending,
      };
      if (!isValidLiveChannelAddress(addr)) {
        this.currentStatus.state = LiveChannelConnectionState.Invalid;
        this.currentStatus.error = 'invalid channel address';
      }
    }

    /**
     * Build the Centrifuge subscription handlers that feed `stream`.
     *
     * This should only be called when centrifuge is connected.
     *
     * @returns the event handlers to pass to the Centrifuge subscription
     * @throws Error when called more than once on the same channel
     */
    initalize(): SubscriptionEvents {
      if (this.initalized) {
        throw new Error('Channel already initalized: ' + this.id);
      }
      this.initalized = true;

      const events: SubscriptionEvents = {
        // Called when a message is received from the socket
        publish: (ctx: PublicationContext) => {
          try {
            if (ctx.data) {
              if (ctx.data.schema) {
                this.lastMessageWithSchema = ctx.data as DataFrameJSON;
              }

              this.stream.next({
                type: LiveChannelEventType.Message,
                message: ctx.data,
              });
            }

            // Clear any error messages
            if (this.currentStatus.error) {
              this.currentStatus.timestamp = Date.now();
              delete this.currentStatus.error;
              this.sendStatus();
            }
          } catch (err) {
            // A failure while forwarding the message is surfaced as a status error
            console.log('publish error', this.addr, err);
            this.currentStatus.error = err;
            this.currentStatus.timestamp = Date.now();
            this.sendStatus();
          }
        },
        error: (ctx: SubscribeErrorContext) => {
          this.currentStatus.timestamp = Date.now();
          this.currentStatus.error = ctx.message;
          this.sendStatus();
        },
        subscribe: (ctx: SubscribeSuccessContext) => {
          this.currentStatus.timestamp = Date.now();
          this.currentStatus.state = LiveChannelConnectionState.Connected;
          delete this.currentStatus.error;

          if (ctx.data?.schema) {
            this.lastMessageWithSchema = ctx.data as DataFrameJSON;
          }

          // Any data delivered with the subscribe response rides along on the status event
          this.sendStatus(ctx.data);
        },
        unsubscribe: (ctx: UnsubscribeContext) => {
          this.currentStatus.timestamp = Date.now();
          this.currentStatus.state = LiveChannelConnectionState.Disconnected;
          this.sendStatus();
        },
      };

      events.join = (ctx: JoinLeaveContext) => {
        this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user });
      };
      events.leave = (ctx: JoinLeaveContext) => {
        this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user });
      };

      return events;
    }

    /**
     * Push a copy of the current status to `stream`.
     *
     * @param message - optional payload attached to the copy as `message`
     */
    private sendStatus(message?: any) {
      // Emit a copy so later mutations of currentStatus do not alter delivered events
      const copy = { ...this.currentStatus };
      if (message) {
        copy.message = message;
      }
      this.stream.next(copy);
    }

    /** Disconnect the channel if `stream` currently has no observers. */
    disconnectIfNoListeners = () => {
      const count = this.stream.observers.length;
      if (count === 0) {
        this.disconnect();
      }
    };

    /**
     * Get the stream of events for this channel.
     *
     * Each subscriber first receives a copy of the current status and is then
     * attached to the shared `stream`. When the last subscriber unsubscribes, a
     * disconnect is scheduled 250ms later and only happens if there are still no
     * observers at that point.
     */
    getStream() {
      return new Observable((subscriber) => {
        const initialMessage = { ...this.currentStatus };
        if (this.lastMessageWithSchema?.schema) {
          // send just schema instead of schema+data to avoid having data gaps
          initialMessage.message = { schema: this.lastMessageWithSchema.schema };
        }

        // Previously the schema-only message was built but a different object
        // (carrying the full last frame) was emitted instead.
        subscriber.next(initialMessage);

        const sub = this.stream.subscribe(subscriber);
        return () => {
          sub.unsubscribe();
          const count = this.stream.observers.length;

          // Wait 1/4 second to fully disconnect
          if (count === 0) {
            setTimeout(this.disconnectIfNoListeners, 250);
          }
        };
      }) as Observable<LiveChannelEvent<T>>;
    }

    /**
     * This is configured by the server when the config supports presence.
     *
     * @returns the user keys found in the subscription's presence response;
     *   rejects with the string 'not subscribed' when there is no subscription
     */
    async getPresence(): Promise<LiveChannelPresenceStatus> {
      if (!this.subscription) {
        return Promise.reject('not subscribed');
      }

      const result = await this.subscription.presence();
      return {
        users: Object.keys(result.presence),
      };
    }

    /**
     * This will close and terminate all streams for this channel.
     *
     * Marks the status as `Shutdown`, drops the Centrifuge subscription, sends
     * the final status to subscribers, completes `stream` and finally invokes
     * `shutdownCallback` when one is set.
     */
    disconnect() {
      this.currentStatus.state = LiveChannelConnectionState.Shutdown;
      this.currentStatus.timestamp = Date.now();

      if (this.subscription) {
        this.subscription.unsubscribe();
        this.subscription.removeAllListeners(); // they keep all listeners attached after unsubscribe
        this.subscription = undefined;
      }

      // The final status must be emitted BEFORE completing: a completed Subject
      // silently ignores any further next() calls.
      this.stream.next({ ...this.currentStatus });
      this.stream.complete();

      if (this.shutdownCallback) {
        this.shutdownCallback();
      }
    }

    /**
     * Report an error to subscribers and then disconnect the channel.
     *
     * @param err - error text stored on the status before shutting down
     */
    shutdownWithError(err: string) {
      this.currentStatus.error = err;
      this.sendStatus();
      this.disconnect();
    }
  }
  /**
   * Create a stand-in channel object that only ever reports an error.
   *
   * @param msg - error text placed on the emitted status event
   * @param id - channel identifier echoed on the object and the status event
   * @param addr - channel address echoed on the returned object
   * @returns an object with `id`, `opened`, `addr`, a `getStream` that emits a
   *   single `Invalid` status carrying `msg`, and a no-op `disconnect`
   */
  export function getErrorChannel<TMessage>(msg: string, id: string, addr: LiveChannelAddress) {
    const opened = Date.now();

    // return an error; the status is rebuilt on every call so its timestamp is fresh
    const getStream = () => {
      const status = {
        type: LiveChannelEventType.Status,
        id,
        timestamp: Date.now(),
        state: LiveChannelConnectionState.Invalid,
        error: msg,
      };
      return of(status);
    };

    // already disconnected
    const disconnect = () => {};

    return { id, opened, addr, getStream, disconnect };
  }