remoteObservable.ts 1.4 KB

123456789101112131415161718192021222324252627282930313233
  1. import * as comlink from 'comlink';
  2. import { from, Observable, switchMap } from 'rxjs';
  3. export const remoteObservableAsObservable = <T>(remoteObs: comlink.RemoteObject<Observable<T>>): Observable<T> =>
  4. new Observable((subscriber) => {
  5. // Passing the callbacks as 3 separate arguments is deprecated, but it's the only option for now
  6. //
  7. // RxJS recreates the functions via `Function.bind` https://github.com/ReactiveX/rxjs/blob/62aca850a37f598b5db6085661e0594b81ec4281/src/internal/Subscriber.ts#L169
  8. // and thus erases the ProxyMarker created via comlink.proxy(fN) when the callbacks
  9. // are grouped together in a Observer object (ie. { next: (v) => ..., error: (err) => ..., complete: () => ... })
  10. //
  11. // solution: TBD (autoproxy all functions?)
  12. const remoteSubPromise = remoteObs.subscribe(
  13. comlink.proxy((nextValueInRemoteObs: T) => {
  14. subscriber.next(nextValueInRemoteObs);
  15. }),
  16. comlink.proxy((err) => {
  17. subscriber.error(err);
  18. }),
  19. comlink.proxy(() => {
  20. subscriber.complete();
  21. })
  22. );
  23. return {
  24. unsubscribe: () => {
  25. remoteSubPromise.then((remoteSub) => remoteSub.unsubscribe());
  26. },
  27. };
  28. });
  29. export const promiseWithRemoteObservableAsObservable = <T>(
  30. promiseWithProxyObservable: Promise<comlink.RemoteObject<Observable<T>>>
  31. ): Observable<T> => from(promiseWithProxyObservable).pipe(switchMap((val) => remoteObservableAsObservable(val)));