// async_iterator.js (6.3 KB)
  'use strict';

  // Scratch binding (presumably emitted by a transpiler) that holds the
  // iterator prototype object literal while its Symbol.asyncIterator and
  // "return" members are attached through _defineProperty, before the object
  // is passed to Object.setPrototypeOf.
  var _Object$setPrototypeO;
  /**
   * Stores `value` on `obj` under `key` and returns `obj`.
   *
   * The key is first normalised with _toPropertyKey. When the normalised key
   * is not yet reachable on `obj` (checked with `in`), a plain assignment is
   * used; otherwise an enumerable, configurable, writable own data property
   * is defined with Object.defineProperty.
   *
   * @param {object} obj - Target object; it is mutated.
   * @param {*} key - Property key (string, symbol, or a value coercible to one).
   * @param {*} value - Value to store.
   * @returns {object} The same `obj`.
   */
  function _defineProperty(obj, key, value) {
    var propertyKey = _toPropertyKey(key);
    if (!(propertyKey in obj)) {
      obj[propertyKey] = value;
      return obj;
    }
    Object.defineProperty(obj, propertyKey, {
      value: value,
      enumerable: true,
      configurable: true,
      writable: true
    });
    return obj;
  }
  /**
   * Converts an arbitrary value into a property key.
   *
   * The value is reduced to a primitive with a "string" hint; symbols are
   * returned untouched and every other primitive is passed through String().
   *
   * @param {*} arg - Candidate key.
   * @returns {string|symbol} The normalised key.
   */
  function _toPropertyKey(arg) {
    var primitive = _toPrimitive(arg, "string");
    if (typeof primitive === "symbol") {
      return primitive;
    }
    return String(primitive);
  }
  /**
   * Reduces `input` to a primitive value.
   *
   * Anything whose typeof is not "object" (and null) is returned unchanged.
   * For objects, a Symbol.toPrimitive method is preferred and is called with
   * `hint` (or "default" when the hint is falsy); without one the object is
   * converted with String() when the hint is "string" and Number() otherwise.
   *
   * @param {*} input - Value to convert.
   * @param {string} [hint] - Preferred primitive type.
   * @returns {*} The primitive value.
   * @throws {TypeError} If the Symbol.toPrimitive method returns a value
   *   whose typeof is "object".
   */
  function _toPrimitive(input, hint) {
    var isObject = typeof input === "object" && input !== null;
    if (!isObject) {
      return input;
    }
    var exoticToPrim = input[Symbol.toPrimitive];
    if (exoticToPrim === undefined) {
      var convert = hint === "string" ? String : Number;
      return convert(input);
    }
    var result = exoticToPrim.call(input, hint || "default");
    if (typeof result === "object") {
      throw new TypeError("@@toPrimitive must return a primitive value.");
    }
    return result;
  }
  6. var finished = require('./end-of-stream');
  // Symbol keys for the per-iterator bookkeeping slots.

  // resolve callback of the next() promise that is still waiting for data
  // (null when nothing is pending).
  var kLastResolve = Symbol('lastResolve');
  // reject callback of that same pending promise (null when nothing is pending).
  var kLastReject = Symbol('lastReject');
  // Error reported by the end-of-stream callback; next() rejects with it.
  var kError = Symbol('error');
  // Flag telling next() to answer { value: undefined, done: true }.
  var kEnded = Symbol('ended');
  // Most recent unsettled promise returned by next(); later next() calls
  // chain behind it.
  var kLastPromise = Symbol('lastPromise');
  // Per-iterator promise executor that reads from the stream or parks the
  // resolve/reject pair.
  var kHandlePromise = Symbol('handlePromise');
  // The stream being iterated.
  var kStream = Symbol('stream');
  /**
   * Builds an iterator result record.
   *
   * @param {*} value - Payload of this iteration step.
   * @param {boolean} done - Whether the iteration is finished.
   * @returns {{value: *, done: boolean}} A fresh result object.
   */
  function createIterResult(value, done) {
    var result = {};
    result.value = value;
    result.done = done;
    return result;
  }
  /**
   * Tries to satisfy the iterator's pending next() promise with one chunk.
   *
   * Does nothing when no promise is waiting. When the stream's read() yields
   * null the promise is left pending, because an 'end' or 'error' may still
   * arrive; otherwise the pending slots are cleared and the promise is
   * resolved with the chunk.
   *
   * @param {object} iter - Iterator whose symbol-keyed state is consulted.
   */
  function readAndResolve(iter) {
    var pendingResolve = iter[kLastResolve];
    if (pendingResolve === null) {
      return;
    }
    var chunk = iter[kStream].read();
    if (chunk === null) {
      // nothing to hand out yet; keep waiting for 'end' or 'error'
      return;
    }
    iter[kLastPromise] = null;
    iter[kLastResolve] = null;
    iter[kLastReject] = null;
    pendingResolve(createIterResult(chunk, false));
  }
  /**
   * 'readable' handler: schedules a read attempt for the given iterator.
   *
   * The attempt is postponed to the next tick because the stream might emit
   * an error through process.nextTick in the meantime.
   *
   * @param {object} iter - Iterator to service.
   */
  function onReadable(iter) {
    process.nextTick(function () {
      readAndResolve(iter);
    });
  }
  /**
   * Produces a promise executor that waits for `lastPromise` before serving
   * the next chunk.
   *
   * Once `lastPromise` fulfils, the executor resolves with a "done" result if
   * the iterator has ended, and otherwise delegates to the iterator's
   * kHandlePromise executor. A rejection of `lastPromise` is forwarded to
   * `reject`.
   *
   * @param {Promise} lastPromise - Promise returned by the previous next().
   * @param {object} iter - Iterator the new promise belongs to.
   * @returns {function(Function, Function): void} Executor for `new Promise`.
   */
  function wrapForNext(lastPromise, iter) {
    function executor(resolve, reject) {
      function afterPrevious() {
        if (!iter[kEnded]) {
          iter[kHandlePromise](resolve, reject);
          return;
        }
        resolve(createIterResult(undefined, true));
      }
      lastPromise.then(afterPrevious, reject);
    }
    return executor;
  }
  // Object used as the [[Prototype]] of ReadableStreamAsyncIteratorPrototype.
  // NOTE(review): as written this evaluates to Function.prototype (the
  // prototype of a plain function expression), not the intrinsic
  // %AsyncIteratorPrototype% that the name suggests. Presumably an artifact of
  // transpiling an `async function*` expression; confirm before relying on
  // the prototype chain of iterators built on top of it.
  var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});
  // Shared prototype for the iterators created by
  // createReadableStreamAsyncIterator. It exposes a `stream` getter, next(),
  // [Symbol.asyncIterator]() and return(); all per-iterator state lives in
  // symbol-keyed slots on the instance.
  _Object$setPrototypeO = {
    /** The stream this iterator reads from. */
    get stream() {
      return this[kStream];
    },

    /**
     * Requests the next chunk.
     *
     * @returns {Promise<{value: *, done: boolean}>} Rejects with the stored
     *   stream error, resolves with done=true once the stream has ended, and
     *   otherwise resolves with the next chunk.
     */
    next: function next() {
      var self = this;

      // an error detected in the meanwhile wins over everything else
      var storedError = self[kError];
      if (storedError !== null) {
        return Promise.reject(storedError);
      }

      if (self[kEnded]) {
        return Promise.resolve(createIterResult(undefined, true));
      }

      if (self[kStream].destroyed) {
        // Defer via nextTick: if .destroy(err) was called, the error is
        // emitted via nextTick as well, so one may still be lingering around
        // waiting to be emitted.
        return new Promise(function (resolve, reject) {
          process.nextTick(function () {
            if (!self[kError]) {
              resolve(createIterResult(undefined, true));
              return;
            }
            reject(self[kError]);
          });
        });
      }

      // Overlapping next() calls are serialised behind the previous promise.
      // The common case (for await loops) calls next() one at a time and
      // takes the branch below instead.
      var previous = self[kLastPromise];
      if (previous) {
        var queued = new Promise(wrapForNext(previous, self));
        self[kLastPromise] = queued;
        return queued;
      }

      // fast path: serve data that is already readable, which supports
      // several this.push() calls without going through the next() queue
      var chunk = self[kStream].read();
      if (chunk !== null) {
        return Promise.resolve(createIterResult(chunk, false));
      }

      var pending = new Promise(self[kHandlePromise]);
      self[kLastPromise] = pending;
      return pending;
    }
  };

  // An async iterator is its own async iterable.
  _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
    return this;
  });

  // return(): destroys the underlying stream and settles once destroy reports
  // back - rejecting with its error, otherwise resolving with done=true.
  _defineProperty(_Object$setPrototypeO, "return", function _return() {
    var self = this;
    // destroy(err, cb) is a private API; it is guaranteed to exist here
    // because the Readable class this is attached to is under our control
    return new Promise(function (resolve, reject) {
      self[kStream].destroy(null, function (err) {
        if (!err) {
          resolve(createIterResult(undefined, true));
          return;
        }
        reject(err);
      });
    });
  });

  var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf(_Object$setPrototypeO, AsyncIteratorPrototype);
  /**
   * Creates an async iterator that pulls chunks from `stream`.
   *
   * The returned object inherits next(), return() and [Symbol.asyncIterator]
   * from ReadableStreamAsyncIteratorPrototype and keeps its bookkeeping in
   * symbol-keyed slots. It listens for the stream's 'readable' event and
   * registers a `finished(stream, ...)` callback so that the promise handed
   * out by next() gets settled with data, with the end of the stream, or with
   * the stream's error.
   *
   * @param {object} stream - Stream to iterate. This function uses its
   *   read(), on() and `_readableState.endEmitted`; the inherited return()
   *   additionally calls destroy(err, cb).
   * @returns {object} The iterator bound to `stream`.
   */
  var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
    var _Object$create;
    // The descriptors carry only `value` and `writable`, so these slots are
    // created writable but non-enumerable and non-configurable.
    var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, {
      value: stream,
      writable: true
    }), _defineProperty(_Object$create, kLastResolve, {
      value: null,
      writable: true
    }), _defineProperty(_Object$create, kLastReject, {
      value: null,
      writable: true
    }), _defineProperty(_Object$create, kError, {
      value: null,
      writable: true
    }), _defineProperty(_Object$create, kEnded, {
      // a stream that already emitted 'end' starts out as finished
      value: stream._readableState.endEmitted,
      writable: true
    }), _defineProperty(_Object$create, kHandlePromise, {
      // Promise executor used by next(): resolve right away when a chunk is
      // available, otherwise park resolve/reject until 'readable', the end of
      // the stream, or an error settles them.
      value: function value(resolve, reject) {
        var data = iterator[kStream].read();
        // read() signals "nothing available" with null. Compare against null
        // explicitly (as readAndResolve and next() do) so that falsy chunks
        // such as 0, '' or false are delivered instead of being dropped after
        // they were already pulled out of the stream.
        if (data !== null) {
          iterator[kLastPromise] = null;
          iterator[kLastResolve] = null;
          iterator[kLastReject] = null;
          resolve(createIterResult(data, false));
        } else {
          iterator[kLastResolve] = resolve;
          iterator[kLastReject] = reject;
        }
      },
      writable: true
    }), _Object$create));
    iterator[kLastPromise] = null;
    finished(stream, function (err) {
      // A premature close is handled like a regular end of the stream below.
      if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
        var reject = iterator[kLastReject];
        // reject if we are waiting for data in the Promise
        // returned by next() and store the error
        if (reject !== null) {
          iterator[kLastPromise] = null;
          iterator[kLastResolve] = null;
          iterator[kLastReject] = null;
          reject(err);
        }
        iterator[kError] = err;
        return;
      }
      // Finished without a reportable error: complete a waiting next() with
      // done=true and remember the end for later calls.
      var resolve = iterator[kLastResolve];
      if (resolve !== null) {
        iterator[kLastPromise] = null;
        iterator[kLastResolve] = null;
        iterator[kLastReject] = null;
        resolve(createIterResult(undefined, true));
      }
      iterator[kEnded] = true;
    });
    stream.on('readable', onReadable.bind(null, iterator));
    return iterator;
  };
  // The iterator factory is the value exported by this module.
  module.exports = createReadableStreamAsyncIterator;