index.js 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. 'use strict';
  2. var domain = require('domain');
  3. var eos = require('end-of-stream');
  4. var p = require('process-nextick-args');
  5. var once = require('once');
  6. var exhaust = require('stream-exhaust');
  // Options object passed as the second argument to `eos` when `asyncDone`
  // receives a stream-like return value.
  // NOTE(review): `error: false` presumably tells end-of-stream not to treat
  // the stream's own 'error' event as completion — confirm against its docs.
  var eosConfig = { error: false };
  /**
   * Re-raise `err` outside of the current call stack.
   *
   * The throw is deferred with `process.nextTick`, so a `try`/`catch`
   * surrounding the call to this function cannot intercept it.
   *
   * @param {*} err - Value to throw on the next tick.
   * @returns {undefined}
   */
  function rethrowAsync(err) {
    process.nextTick(function rethrow() {
      throw err;
    });
  }
  /**
   * Call `fn` with the given argument list and a `null` receiver.
   *
   * A synchronous exception from `fn` is not propagated to the caller;
   * it is passed to `rethrowAsync` and this function returns `undefined`.
   *
   * @param {Function} fn - Function to invoke.
   * @param {Array|Arguments} args - Array-like list of arguments for `fn`.
   * @returns {*} Whatever `fn` returned, or `undefined` if it threw.
   */
  function tryCatch(fn, args) {
    var outcome;
    try {
      outcome = fn.apply(null, args);
    } catch (caught) {
      rethrowAsync(caught);
    }
    return outcome;
  }
  /**
   * Run `fn` on the next tick inside a fresh domain and report its
   * completion to `cb`.
   *
   * `fn` is called with a single `done` callback. Completion is signalled
   * either by `fn` calling `done(err, result)` itself, or through the value
   * `fn` returns, checked in this order:
   *   - has an `on` method        -> treated as a node stream,
   *   - has a `subscribe` method  -> treated as an RxJS observable,
   *   - has a `then` method       -> treated as a promise.
   * Any other return value is ignored, in which case `fn` is expected to
   * call `done`.
   *
   * An 'error' event emitted on the domain is forwarded to `cb` as the
   * error argument.
   *
   * @param {Function} fn - Task to run; receives `done`.
   * @param {Function} cb - Completion callback. It is wrapped with `once`
   *   (presumably so that only the first completion signal reaches it).
   * @returns {undefined}
   */
  function asyncDone(fn, cb) {
    var finish = once(cb);

    var d = domain.create();
    d.once('error', onError);
    var domainBoundFn = d.bind(fn);

    // Detach from the domain first, then forward the received arguments
    // unchanged; `tryCatch` keeps a throwing callback from unwinding into
    // whatever invoked `done`.
    function done() {
      d.removeListener('error', onError);
      d.exit();
      return tryCatch(finish, arguments);
    }

    function onSuccess(result) {
      done(null, result);
    }

    // A falsy rejection/error value is replaced so that `cb` always gets a
    // truthy error on failure.
    function onError(error) {
      done(error || new Error('Promise rejected without Error'));
    }

    function asyncRunner() {
      // The latest value emitted by an observable is remembered on the
      // handler itself and delivered once the observable completes.
      function onNext(state) {
        onNext.state = state;
      }

      function onCompleted() {
        onSuccess(onNext.state);
      }

      var result = domainBoundFn(done);

      if (!result) {
        return;
      }

      if (typeof result.on === 'function') {
        // Assume node stream: add it to the domain, then pass it through
        // `exhaust` and let `eos` call `done`.
        d.add(result);
        eos(exhaust(result), eosConfig, done);
      } else if (typeof result.subscribe === 'function') {
        // Assume RxJS observable
        result.subscribe(onNext, onError, onCompleted);
      } else if (typeof result.then === 'function') {
        // Assume promise
        result.then(onSuccess, onError);
      }
    }

    p.nextTick(asyncRunner);
  }
  69. module.exports = asyncDone;