serviceWorkerProxy.ts 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import './transferHandlers';
  2. import * as comlink from 'comlink';
  3. import { asyncScheduler, Observable, observeOn } from 'rxjs';
  4. import { LiveChannelAddress, LiveChannelEvent } from '@grafana/data';
  5. import { createWorker } from './createCentrifugeServiceWorker';
  6. import { promiseWithRemoteObservableAsObservable } from './remoteObservable';
  7. import { CentrifugeSrv, CentrifugeSrvDeps } from './service';
  8. import { RemoteCentrifugeService } from './service.worker';
  /**
   * Implementation of `CentrifugeSrv` that forwards every call to a
   * `RemoteCentrifugeService` reached through the worker endpoint returned by
   * `createWorker()`, using comlink as the RPC layer.
   *
   * Calls that produce remote observables are converted back into regular RxJS
   * observables with `promiseWithRemoteObservableAsObservable`.
   */
  export class CentrifugeServiceWorkerProxy implements CentrifugeSrv {
    /** comlink handle used to invoke methods on the remote service. */
    private readonly centrifugeWorker: comlink.Remote<RemoteCentrifugeService>;

    /**
     * Creates the worker endpoint, wraps it with comlink and kicks off the
     * remote service initialization.
     *
     * @param deps - dependencies handed to the remote service's `initialize`.
     */
    constructor(deps: CentrifugeSrvDeps) {
      this.centrifugeWorker = comlink.wrap<RemoteCentrifugeService>(createWorker() as comlink.Endpoint);

      // Intentionally fire-and-forget: the constructor cannot await, and no rejection
      // handler is attached here. `dataStreamSubscriberReadiness` is additionally passed
      // through `comlink.proxy` so the remote side receives a proxy to it.
      void this.centrifugeWorker.initialize(deps, comlink.proxy(deps.dataStreamSubscriberReadiness));
    }

    /**
     * Returns the connection state reported by the remote service as an observable.
     */
    getConnectionState: CentrifugeSrv['getConnectionState'] = () => {
      return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getConnectionState());
    };

    /**
     * Returns the remote data stream for `options` as an observable whose
     * notifications are re-emitted on the async scheduler.
     */
    getDataStream: CentrifugeSrv['getDataStream'] = (options) => {
      return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getDataStream(options)).pipe(
        // async scheduler splits the synchronous task of deserializing data from web worker and
        // consuming the message (ie. updating react component) into two to avoid blocking the event loop
        observeOn(asyncScheduler)
      );
    };

    /**
     * Query over websocket
     *
     * `options` is round-tripped through JSON first, so only its JSON-representable
     * content reaches the remote service (presumably so it can be transferred to the
     * worker safely - confirm against the worker's transfer handlers).
     */
    getQueryData: CentrifugeSrv['getQueryData'] = async (options) => {
      const optionsAsPlainSerializableObject = JSON.parse(JSON.stringify(options));
      return this.centrifugeWorker.getQueryData(optionsAsPlainSerializableObject);
    };

    /**
     * Forwards the presence request for `address` to the remote service.
     */
    getPresence: CentrifugeSrv['getPresence'] = (address) => {
      return this.centrifugeWorker.getPresence(address);
    };

    /**
     * Returns the remote stream for `address` as an observable of `LiveChannelEvent<T>`.
     *
     * The cast re-attaches the caller's `T` to the value returned by the remote call.
     */
    getStream: CentrifugeSrv['getStream'] = <T>(address: LiveChannelAddress) => {
      return promiseWithRemoteObservableAsObservable(
        this.centrifugeWorker.getStream(address) as Promise<comlink.Remote<Observable<LiveChannelEvent<T>>>>
      );
    };
  }