123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- import Centrifuge, {
- JoinLeaveContext,
- PublicationContext,
- SubscribeErrorContext,
- SubscribeSuccessContext,
- SubscriptionEvents,
- UnsubscribeContext,
- } from 'centrifuge/dist/centrifuge';
- import { Subject, of, Observable } from 'rxjs';
- import {
- LiveChannelStatusEvent,
- LiveChannelEvent,
- LiveChannelEventType,
- LiveChannelConnectionState,
- LiveChannelPresenceStatus,
- LiveChannelAddress,
- DataFrameJSON,
- isValidLiveChannelAddress,
- } from '@grafana/data';
- /**
- * Internal class that maps Centrifuge support to GrafanaLive
- */
- export class CentrifugeLiveChannel<T = any> {
- readonly currentStatus: LiveChannelStatusEvent;
- readonly opened = Date.now();
- readonly id: string;
- readonly addr: LiveChannelAddress;
- readonly stream = new Subject<LiveChannelEvent<T>>();
- // Hold on to the last header with schema
- lastMessageWithSchema?: DataFrameJSON;
- subscription?: Centrifuge.Subscription;
- shutdownCallback?: () => void;
- initalized?: boolean;
- constructor(id: string, addr: LiveChannelAddress) {
- this.id = id;
- this.addr = addr;
- this.currentStatus = {
- type: LiveChannelEventType.Status,
- id,
- timestamp: this.opened,
- state: LiveChannelConnectionState.Pending,
- };
- if (!isValidLiveChannelAddress(addr)) {
- this.currentStatus.state = LiveChannelConnectionState.Invalid;
- this.currentStatus.error = 'invalid channel address';
- }
- }
- // This should only be called when centrifuge is connected
- initalize(): SubscriptionEvents {
- if (this.initalized) {
- throw new Error('Channel already initalized: ' + this.id);
- }
- this.initalized = true;
- const events: SubscriptionEvents = {
- // Called when a message is received from the socket
- publish: (ctx: PublicationContext) => {
- try {
- if (ctx.data) {
- if (ctx.data.schema) {
- this.lastMessageWithSchema = ctx.data as DataFrameJSON;
- }
- this.stream.next({
- type: LiveChannelEventType.Message,
- message: ctx.data,
- });
- }
- // Clear any error messages
- if (this.currentStatus.error) {
- this.currentStatus.timestamp = Date.now();
- delete this.currentStatus.error;
- this.sendStatus();
- }
- } catch (err) {
- console.log('publish error', this.addr, err);
- this.currentStatus.error = err;
- this.currentStatus.timestamp = Date.now();
- this.sendStatus();
- }
- },
- error: (ctx: SubscribeErrorContext) => {
- this.currentStatus.timestamp = Date.now();
- this.currentStatus.error = ctx.message;
- this.sendStatus();
- },
- subscribe: (ctx: SubscribeSuccessContext) => {
- this.currentStatus.timestamp = Date.now();
- this.currentStatus.state = LiveChannelConnectionState.Connected;
- delete this.currentStatus.error;
- if (ctx.data?.schema) {
- this.lastMessageWithSchema = ctx.data as DataFrameJSON;
- }
- this.sendStatus(ctx.data);
- },
- unsubscribe: (ctx: UnsubscribeContext) => {
- this.currentStatus.timestamp = Date.now();
- this.currentStatus.state = LiveChannelConnectionState.Disconnected;
- this.sendStatus();
- },
- };
- events.join = (ctx: JoinLeaveContext) => {
- this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user });
- };
- events.leave = (ctx: JoinLeaveContext) => {
- this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user });
- };
- return events;
- }
- private sendStatus(message?: any) {
- const copy = { ...this.currentStatus };
- if (message) {
- copy.message = message;
- }
- this.stream.next(copy);
- }
- disconnectIfNoListeners = () => {
- const count = this.stream.observers.length;
- if (count === 0) {
- this.disconnect();
- }
- };
- /**
- * Get the stream of events and
- */
- getStream() {
- return new Observable((subscriber) => {
- const initialMessage = { ...this.currentStatus };
- if (this.lastMessageWithSchema?.schema) {
- // send just schema instead of schema+data to avoid having data gaps
- initialMessage.message = { schema: this.lastMessageWithSchema?.schema };
- }
- subscriber.next({ ...this.currentStatus, message: this.lastMessageWithSchema });
- const sub = this.stream.subscribe(subscriber);
- return () => {
- sub.unsubscribe();
- const count = this.stream.observers.length;
- // Wait 1/4 second to fully disconnect
- if (count === 0) {
- setTimeout(this.disconnectIfNoListeners, 250);
- }
- };
- }) as Observable<LiveChannelEvent<T>>;
- }
- /**
- * This is configured by the server when the config supports presence
- */
- async getPresence(): Promise<LiveChannelPresenceStatus> {
- if (!this.subscription) {
- return Promise.reject('not subscribed');
- }
- return this.subscription!.presence().then((v) => {
- return {
- users: Object.keys(v.presence),
- };
- });
- }
- /**
- * This will close and terminate all streams for this channel
- */
- disconnect() {
- this.currentStatus.state = LiveChannelConnectionState.Shutdown;
- this.currentStatus.timestamp = Date.now();
- if (this.subscription) {
- this.subscription.unsubscribe();
- this.subscription.removeAllListeners(); // they keep all listeners attached after unsubscribe
- this.subscription = undefined;
- }
- this.stream.complete();
- this.stream.next({ ...this.currentStatus });
- this.stream.complete();
- if (this.shutdownCallback) {
- this.shutdownCallback();
- }
- }
- shutdownWithError(err: string) {
- this.currentStatus.error = err;
- this.sendStatus();
- this.disconnect();
- }
- }
- export function getErrorChannel<TMessage>(msg: string, id: string, addr: LiveChannelAddress) {
- return {
- id,
- opened: Date.now(),
- addr,
- // return an error
- getStream: () =>
- of({
- type: LiveChannelEventType.Status,
- id,
- timestamp: Date.now(),
- state: LiveChannelConnectionState.Invalid,
- error: msg,
- }),
- // already disconnected
- disconnect: () => {},
- };
- }
|