increasingInterval.ts 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import { SchedulerLike, Observable, SchedulerAction, Subscriber, asyncScheduler } from 'rxjs';
  2. /**
  3. * Creates an Observable that emits sequential numbers after increasing intervals of time
  4. * starting with `startPeriod`, ending with `endPeriod` and incrementing by `step`.
  5. */
  6. export const increasingInterval = (
  7. { startPeriod = 0, endPeriod = 5000, step = 1000 },
  8. scheduler: SchedulerLike = asyncScheduler
  9. ): Observable<number> => {
  10. return new Observable<number>((subscriber) => {
  11. const state: IntervalState = {
  12. subscriber,
  13. counter: 0,
  14. period: startPeriod,
  15. step,
  16. endPeriod,
  17. };
  18. subscriber.add(scheduler.schedule(dispatch, startPeriod, state));
  19. return subscriber;
  20. });
  21. };
  22. function dispatch(this: SchedulerAction<IntervalState>, state?: IntervalState) {
  23. if (!state) {
  24. return;
  25. }
  26. const { subscriber, counter, period, step, endPeriod } = state;
  27. subscriber.next(counter);
  28. const newPeriod = Math.min(period + step, endPeriod);
  29. this.schedule({ subscriber, counter: counter + 1, period: newPeriod, step, endPeriod }, newPeriod);
  30. }
  31. interface IntervalState {
  32. subscriber: Subscriber<number>;
  33. counter: number;
  34. period: number;
  35. endPeriod: number;
  36. step: number;
  37. }