writable.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. // A bit simpler than readable streams.
  2. // Implement an async ._write(chunk, encoding, cb), and it'll handle all
  3. // the drain event emission and buffering.
  4. import {inherits, deprecate} from 'util';
  5. import {Buffer} from 'buffer';
  6. Writable.WritableState = WritableState;
  7. import {EventEmitter} from 'events';
  8. import {Duplex} from './duplex';
  9. import {nextTick} from 'process';
  10. inherits(Writable, EventEmitter);
  /** Placeholder callback that accepts anything and does nothing. */
  function nop() {
    // intentionally empty
  }
  /**
   * One queued write: the chunk, its encoding, the completion callback and a
   * `next` pointer (initially null) for chaining requests together.
   *
   * @param {*} chunk - data to be written.
   * @param {string} encoding - encoding label stored alongside the chunk.
   * @param {Function} cb - callback stored as `callback`.
   */
  function WriteReq(chunk, encoding, cb) {
    Object.assign(this, {
      chunk,
      encoding,
      callback: cb,
      next: null
    });
  }
  /**
   * Per-stream bookkeeping for the write side: configuration read from
   * `options` plus the flags, counters and queue pointers the write path
   * mutates.
   *
   * @param {Object} [options] - stream options; a falsy value is treated as {}.
   * @param {Object} stream - the owning stream, captured by `this.onwrite`.
   */
  function WritableState(options, stream) {
    // Deprecated accessor: reading `buffer` forwards to getBuffer().
    Object.defineProperty(this, 'buffer', {
      get: deprecate(function () {
        return this.getBuffer();
      }, '_writableState.buffer is deprecated. Use _writableState.getBuffer instead.')
    });

    const opts = options || {};

    // Whether the stream carries arbitrary objects rather than buffers/strings.
    // For Duplex streams `writableObjectMode` can switch it on as well.
    let objectMode = Boolean(opts.objectMode);
    if (stream instanceof Duplex && !objectMode) {
      objectMode = Boolean(opts.writableObjectMode);
    }
    this.objectMode = objectMode;

    // Threshold at which write() starts returning false. 0 is a valid value:
    // write() then returns false unless everything was flushed immediately.
    const hwm = opts.highWaterMark;
    const defaultHwm = this.objectMode ? 16 : 16 * 1024;
    const hwmProvided = Boolean(hwm) || hwm === 0;
    // Double bitwise NOT casts the value to an integer.
    this.highWaterMark = ~~(hwmProvided ? hwm : defaultHwm);

    this.needDrain = false;
    // Set at the start of end().
    this.ending = false;
    // Set once end() has been called and has returned.
    this.ended = false;
    // Set when 'finish' is emitted.
    this.finished = false;

    // Strings are converted to buffers before _write unless the caller passed
    // `decodeStrings: false` explicitly.
    this.decodeStrings = opts.decodeStrings !== false;

    // Encoding used when write() is not given one; configurable, 'utf8' by default.
    this.defaultEncoding = opts.defaultEncoding || 'utf8';

    // Not a real buffer: a measure of how much data is still waiting to be
    // handed off.
    this.length = 0;

    // True while a _write/_writev call is outstanding.
    this.writing = false;

    // While non-zero, writes are buffered until uncork() brings it back to 0.
    this.corked = 0;

    // Tells onwrite whether the callback fired in the same tick as the write.
    // Starts true: anything that must wait until "later" should also not
    // happen before the first write call.
    this.sync = true;

    // True while previously buffered requests are being replayed, so that a
    // same-tick _write callback does not start an overlapping replay.
    this.bufferProcessing = false;

    // Callback handed to _write/_writev.
    this.onwrite = (er) => {
      onwrite(stream, er);
    };

    // User callback of the write currently in flight.
    this.writecb = null;

    // Amount being written by the _write call currently in flight.
    this.writelen = 0;

    // Head and tail of the linked list of buffered write requests.
    this.bufferedRequest = null;
    this.lastBufferedRequest = null;

    // Number of user callbacks not yet invoked; must reach 0 before 'finish'.
    this.pendingcb = 0;

    // Set when 'prefinish' has been emitted (relevant for synchronous
    // Transform streams that only wait on _write callbacks).
    this.prefinished = false;

    // True once an error was emitted, so it is not raised a second time.
    this.errorEmitted = false;

    // Number of requests currently in the buffered list.
    this.bufferedRequestCount = 0;

    // One CorkedRequest is always kept allocated and free for reuse; at most
    // two exist per stream.
    this.corkedRequestsFree = new CorkedRequest(this);
  }
  /**
   * Snapshots the linked list of buffered write requests into an array,
   * head first.
   *
   * @returns {Array} the queued request objects in list order (empty if none).
   */
  WritableState.prototype.getBuffer = function writableStateGetBuffer() {
    const requests = [];
    for (let node = this.bufferedRequest; node; node = node.next) {
      requests.push(node);
    }
    return requests;
  };
  export default Writable;

  /**
   * Writable stream constructor. Can be called without `new`.
   *
   * @param {Object} [options] - forwarded to WritableState; function-valued
   *   `write` / `writev` properties are installed as `_write` / `_writev`.
   */
  export function Writable(options) {
    // The constructor is also applied to Duplex instances, which are not
    // instanceof Writable; only re-enter with `new` when `this` is neither.
    const calledOnStream = this instanceof Writable || this instanceof Duplex;
    if (!calledOnStream) {
      return new Writable(options);
    }

    this._writableState = new WritableState(options, this);

    // Legacy flag.
    this.writable = true;

    if (options) {
      if (typeof options.write === 'function') {
        this._write = options.write;
      }
      if (typeof options.writev === 'function') {
        this._writev = options.writev;
      }
    }

    EventEmitter.call(this);
  }
  /**
   * Piping *from* a write-only stream makes no sense, so any attempt is
   * reported as an 'error' event instead of being carried out.
   */
  Writable.prototype.pipe = function () {
    const notReadable = new Error('Cannot pipe, not readable');
    this.emit('error', notReadable);
  };
  /**
   * Reports a write attempted after the stream ended: emits 'error' on the
   * stream immediately and passes the same error to `cb` on the next tick.
   *
   * @param {Object} stream - emitter that receives the 'error' event.
   * @param {Function} cb - callback invoked asynchronously with the error.
   */
  function writeAfterEnd(stream, cb) {
    const lateWrite = new Error('write after end');
    // TODO: defer error events consistently everywhere, not just the cb
    stream.emit('error', lateWrite);
    nextTick(cb, lateWrite);
  }
  /**
   * Checks that `chunk` may be written to the stream.
   *
   * null is always rejected. Outside object mode, anything that is not a
   * Buffer, a string or undefined is rejected too. In object mode every
   * non-null value is accepted.
   *
   * On rejection a TypeError is emitted as 'error' on the stream and passed
   * to `cb` on the next tick.
   *
   * @param {Object} stream - emitter that receives the 'error' event.
   * @param {Object} state - writable state; only `objectMode` is read.
   * @param {*} chunk - value under test.
   * @param {Function} cb - callback that receives the error asynchronously.
   * @returns {boolean} true when the chunk is acceptable, false otherwise.
   */
  function validChunk(stream, state, chunk, cb) {
    let problem = null;

    if (chunk === null) {
      problem = new TypeError('May not write null values to stream');
    } else {
      const acceptable =
        Buffer.isBuffer(chunk) ||
        typeof chunk === 'string' ||
        chunk === undefined ||
        state.objectMode;
      if (!acceptable) {
        problem = new TypeError('Invalid non-string/buffer chunk');
      }
    }

    if (problem === null) {
      return true;
    }

    stream.emit('error', problem);
    nextTick(cb, problem);
    return false;
  }
  /**
   * Queues or writes one chunk.
   *
   * @param {*} chunk - data to write.
   * @param {string|Function} [encoding] - encoding of a string chunk, or the
   *   callback when no encoding is given.
   * @param {Function} [cb] - called once the chunk has been handled.
   * @returns {boolean} false when the stream has ended or the chunk is
   *   invalid; otherwise whatever writeOrBuffer() returns.
   */
  Writable.prototype.write = function (chunk, encoding, cb) {
    const state = this._writableState;
    let enc = encoding;
    let done = cb;

    // write(chunk, cb) form: the second argument is the callback.
    if (typeof enc === 'function') {
      done = enc;
      enc = null;
    }

    if (Buffer.isBuffer(chunk)) {
      enc = 'buffer';
    } else if (!enc) {
      enc = state.defaultEncoding;
    }

    if (typeof done !== 'function') {
      done = nop;
    }

    if (state.ended) {
      writeAfterEnd(this, done);
      return false;
    }

    if (!validChunk(this, state, chunk, done)) {
      return false;
    }

    state.pendingcb++;
    return writeOrBuffer(this, state, chunk, enc, done);
  };
  /** Increments the cork counter on the writable state. */
  Writable.prototype.cork = function () {
    this._writableState.corked++;
  };
  /**
   * Undoes one cork() call. When the counter reaches zero and the stream is
   * idle (not writing, not finished, not already replaying its buffer), any
   * buffered requests are flushed through clearBuffer().
   */
  Writable.prototype.uncork = function () {
    const state = this._writableState;
    if (!state.corked) {
      return;
    }

    state.corked--;

    const idle =
      !state.writing &&
      !state.corked &&
      !state.finished &&
      !state.bufferProcessing;
    if (idle && state.bufferedRequest) {
      clearBuffer(this, state);
    }
  };
  /**
   * Sets the encoding used for string chunks written without an explicit one.
   *
   * @param {string} encoding - encoding name; string input is lower-cased
   *   before it is checked and stored.
   * @returns {Writable} this stream, for chaining.
   * @throws {TypeError} when the encoding is not in the supported list.
   */
  Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
    // node::ParseEncoding() requires lower case.
    const normalized = typeof encoding === 'string' ? encoding.toLowerCase() : encoding;

    const supported = [
      'hex',
      'utf8',
      'utf-8',
      'ascii',
      'binary',
      'base64',
      'ucs2',
      'ucs-2',
      'utf16le',
      'utf-16le',
      'raw'
    ];
    const isSupported = supported.includes((normalized + '').toLowerCase());
    if (!isSupported) {
      throw new TypeError('Unknown encoding: ' + normalized);
    }

    this._writableState.defaultEncoding = normalized;
    return this;
  };
  /**
   * Converts a string chunk to a Buffer unless the stream is in object mode
   * or `decodeStrings` is exactly false. Every other chunk is returned as is.
   *
   * @param {Object} state - writable state (`objectMode`, `decodeStrings`).
   * @param {*} chunk - chunk about to be written.
   * @param {string} encoding - encoding used for the string conversion.
   * @returns {*} the Buffer built from the string, or the original chunk.
   */
  function decodeChunk(state, chunk, encoding) {
    const mustDecode =
      !state.objectMode &&
      state.decodeStrings !== false &&
      typeof chunk === 'string';
    return mustDecode ? Buffer.from(chunk, encoding) : chunk;
  }
  /**
   * Either hands the chunk to doWrite() right away or, when a write is
   * already in flight or the stream is corked, appends it to the buffered
   * request list.
   *
   * @param {Object} stream - the writable stream.
   * @param {Object} state - its writable state.
   * @param {*} chunk - chunk to write (decoded first via decodeChunk()).
   * @param {string} encoding - encoding of the chunk.
   * @param {Function} cb - completion callback for this chunk.
   * @returns {boolean} true while the accounted length stays below the high
   *   water mark; false means the caller should wait for 'drain'.
   */
  function writeOrBuffer(stream, state, chunk, encoding, cb) {
    const payload = decodeChunk(state, chunk, encoding);
    const payloadEncoding = Buffer.isBuffer(payload) ? 'buffer' : encoding;

    // In object mode every chunk counts as one unit.
    const len = state.objectMode ? 1 : payload.length;
    state.length += len;

    const belowHighWaterMark = state.length < state.highWaterMark;
    // Only ever raise needDrain here; an earlier true must not be reset.
    if (!belowHighWaterMark) {
      state.needDrain = true;
    }

    if (!state.writing && !state.corked) {
      doWrite(stream, state, false, len, payload, payloadEncoding, cb);
      return belowHighWaterMark;
    }

    // Busy or corked: append to the tail of the queue.
    const previousTail = state.lastBufferedRequest;
    const request = new WriteReq(payload, payloadEncoding, cb);
    state.lastBufferedRequest = request;
    if (previousTail) {
      previousTail.next = request;
    } else {
      state.bufferedRequest = request;
    }
    state.bufferedRequestCount += 1;

    return belowHighWaterMark;
  }
  /**
   * Starts one underlying write: records its length and callback on the
   * state, marks the stream as writing, then calls `_writev` or `_write`.
   * `state.sync` is true only for the duration of that call, so the
   * completion callback can tell whether it ran synchronously.
   *
   * @param {Object} stream - stream providing `_write` / `_writev`.
   * @param {Object} state - writable state to update.
   * @param {boolean} writev - true to call `_writev(chunk, onwrite)`.
   * @param {number} len - amount accounted to this write.
   * @param {*} chunk - a chunk, or the batch passed to `_writev`.
   * @param {string} encoding - encoding passed to `_write`.
   * @param {Function} cb - callback stored as `state.writecb`.
   */
  function doWrite(stream, state, writev, len, chunk, encoding, cb) {
    state.writelen = len;
    state.writecb = cb;
    state.writing = true;

    state.sync = true;
    if (!writev) {
      stream._write(chunk, encoding, state.onwrite);
    } else {
      stream._writev(chunk, state.onwrite);
    }
    state.sync = false;
  }
  /**
   * Failure path of a completed write: releases one pending callback, passes
   * the error to the write's callback (deferred to the next tick when the
   * write completed synchronously), flags the error as emitted and emits
   * 'error' on the stream.
   *
   * @param {Object} stream - stream that emits the error.
   * @param {Object} state - writable state whose `pendingcb` is decremented.
   * @param {boolean} sync - whether the write completed in the same tick.
   * @param {*} er - the error reported by the underlying write.
   * @param {Function} cb - the write's completion callback.
   */
  function onwriteError(stream, state, sync, er, cb) {
    state.pendingcb--;

    if (sync) {
      nextTick(cb, er);
    } else {
      cb(er);
    }

    stream._writableState.errorEmitted = true;
    stream.emit('error', er);
  }
  /**
   * Resets the in-flight write bookkeeping after a write completes: clears
   * the writing flag and stored callback, and subtracts the finished write's
   * length from the accounted total.
   *
   * @param {Object} state - writable state to update in place.
   */
  function onwriteStateUpdate(state) {
    state.writing = false;
    state.writecb = null;
    state.length = state.length - state.writelen;
    state.writelen = 0;
  }
  /**
   * Completion handler for an underlying write.
   *
   * @param {Object} stream - the writable stream.
   * @param {*} er - error reported by the underlying write, if any.
   */
  function onwrite(stream, er) {
    const state = stream._writableState;
    // Capture these before the bookkeeping below resets them.
    const completedSync = state.sync;
    const callback = state.writecb;

    onwriteStateUpdate(state);

    if (er) {
      onwriteError(stream, state, completedSync, er, callback);
      return;
    }

    // Work out whether the stream is ready to finish, without emitting yet.
    const finished = needFinish(state);

    const shouldReplayBuffer =
      !finished &&
      !state.corked &&
      !state.bufferProcessing &&
      state.bufferedRequest;
    if (shouldReplayBuffer) {
      clearBuffer(stream, state);
    }

    // A write that completed in the same tick gets its follow-up deferred;
    // an asynchronous completion runs it immediately.
    if (completedSync) {
      nextTick(afterWrite, stream, state, finished, callback);
    } else {
      afterWrite(stream, state, finished, callback);
    }
  }
  /**
   * Follow-up to a successful write: possibly emits 'drain', releases one
   * pending callback, invokes the write's callback and then checks whether
   * the stream can finish.
   *
   * @param {Object} stream - the writable stream.
   * @param {Object} state - its writable state.
   * @param {*} finished - truthy when the stream was ready to finish at the
   *   time the write completed; suppresses the drain check.
   * @param {Function} cb - the write's completion callback.
   */
  function afterWrite(stream, state, finished, cb) {
    if (!finished) {
      onwriteDrain(stream, state);
    }
    state.pendingcb--;
    cb();
    finishMaybe(stream, state);
  }
  /**
   * Emits 'drain' when nothing is left in the accounted length and the
   * `needDrain` flag is set; the flag is cleared before the event fires.
   *
   * @param {Object} stream - emitter for the 'drain' event.
   * @param {Object} state - writable state (`length`, `needDrain`).
   */
  function onwriteDrain(stream, state) {
    const shouldDrain = state.length === 0 && state.needDrain;
    if (!shouldDrain) {
      return;
    }
    state.needDrain = false;
    stream.emit('drain');
  }
  /**
   * Replays the buffered write requests.
   *
   * When the stream implements `_writev` and at least two requests are
   * queued, the whole queue is handed over in a single `_writev` call.
   * Otherwise requests are written one at a time until a write stays in
   * flight, and the rest remain queued.
   *
   * `state.bufferedRequestCount` is kept equal to the number of requests
   * still queued: it is decremented per request in the one-by-one path and
   * reset to zero only when the whole queue went out through `_writev`.
   *
   * @param {Object} stream - the writable stream.
   * @param {Object} state - its writable state; mutated in place.
   */
  function clearBuffer(stream, state) {
    state.bufferProcessing = true;
    var entry = state.bufferedRequest;

    if (stream._writev && entry && entry.next) {
      // Fast case, write everything using _writev()
      var l = state.bufferedRequestCount;
      var buffer = new Array(l);
      var holder = state.corkedRequestsFree;
      // The holder keeps the head of the chain so that its finish() can
      // invoke every queued callback once the batch completes.
      holder.entry = entry;

      var count = 0;
      while (entry) {
        buffer[count] = entry;
        entry = entry.next;
        count += 1;
      }

      doWrite(stream, state, true, state.length, buffer, '', holder.finish);

      // doWrite is almost always async, defer these to save a bit of time
      // as the hot path ends with doWrite
      state.pendingcb++;
      state.lastBufferedRequest = null;
      if (holder.next) {
        state.corkedRequestsFree = holder.next;
        holder.next = null;
      } else {
        state.corkedRequestsFree = new CorkedRequest(state);
      }
      // The entire queue was consumed by the batch.
      state.bufferedRequestCount = 0;
    } else {
      // Slow case, write chunks one-by-one
      while (entry) {
        var chunk = entry.chunk;
        var encoding = entry.encoding;
        var cb = entry.callback;
        var len = state.objectMode ? 1 : chunk.length;

        doWrite(stream, state, false, len, chunk, encoding, cb);
        entry = entry.next;
        // One request left the queue. Decrement rather than zeroing the
        // counter afterwards, because the loop may stop with requests
        // still queued.
        state.bufferedRequestCount--;
        // If onwrite was not called immediately, the write is still in
        // flight and the remaining requests have to wait for it.
        if (state.writing) {
          break;
        }
      }

      if (entry === null) state.lastBufferedRequest = null;
    }

    state.bufferedRequest = entry;
    state.bufferProcessing = false;
  }
  /**
   * Default `_write`: reports through the callback that no implementation
   * was supplied.
   *
   * @param {*} chunk - ignored.
   * @param {string} encoding - ignored.
   * @param {Function} cb - receives the 'not implemented' error.
   */
  Writable.prototype._write = function (chunk, encoding, cb) {
    const notImplemented = new Error('not implemented');
    cb(notImplemented);
  };

  // No batched write implementation by default.
  Writable.prototype._writev = null;
  /**
   * Ends the stream, optionally writing one last chunk first.
   *
   * Accepts end(cb), end(chunk, cb) and end(chunk, encoding, cb).
   *
   * @param {*} [chunk] - final chunk, or the callback.
   * @param {string|Function} [encoding] - encoding of the chunk, or the callback.
   * @param {Function} [cb] - passed on to endWritable().
   */
  Writable.prototype.end = function (chunk, encoding, cb) {
    const state = this._writableState;
    let finalChunk = chunk;
    let finalEncoding = encoding;
    let done = cb;

    if (typeof finalChunk === 'function') {
      done = finalChunk;
      finalChunk = null;
      finalEncoding = null;
    } else if (typeof finalEncoding === 'function') {
      done = finalEncoding;
      finalEncoding = null;
    }

    const hasFinalChunk = finalChunk !== null && finalChunk !== undefined;
    if (hasFinalChunk) {
      this.write(finalChunk, finalEncoding);
    }

    // .end() fully uncorks: collapse any nested corks into a single level
    // and release it.
    if (state.corked) {
      state.corked = 1;
      this.uncork();
    }

    // Ignore unnecessary end() calls.
    const alreadyEnding = state.ending || state.finished;
    if (!alreadyEnding) {
      endWritable(this, state, done);
    }
  };
  /**
   * Tells whether the stream is ready to finish: end() has started, nothing
   * is accounted or queued, no write is in flight and it has not finished yet.
   *
   * @param {Object} state - writable state to inspect.
   * @returns {*} truthy when all conditions hold (the result of the `&&`
   *   chain, so a falsy `state.ending` is returned unchanged).
   */
  function needFinish(state) {
    return (
      state.ending &&
      state.length === 0 &&
      state.bufferedRequest === null &&
      !state.finished &&
      !state.writing
    );
  }
  /**
   * Emits 'prefinish' at most once per state; the `prefinished` flag is set
   * before the event is emitted.
   *
   * @param {Object} stream - emitter for the 'prefinish' event.
   * @param {Object} state - writable state carrying the `prefinished` flag.
   */
  function prefinish(stream, state) {
    if (state.prefinished) {
      return;
    }
    state.prefinished = true;
    stream.emit('prefinish');
  }
  /**
   * Finishes the stream if it is ready: always emits 'prefinish' (once) when
   * needFinish() holds, and additionally marks the stream finished and emits
   * 'finish' when no user callbacks were pending.
   *
   * @param {Object} stream - the writable stream.
   * @param {Object} state - its writable state.
   * @returns {*} the result of needFinish(state).
   */
  function finishMaybe(stream, state) {
    const ready = needFinish(state);
    if (!ready) {
      return ready;
    }

    // Sample the counter before 'prefinish' listeners get a chance to run.
    const nothingPending = state.pendingcb === 0;
    prefinish(stream, state);
    if (nothingPending) {
      state.finished = true;
      stream.emit('finish');
    }
    return ready;
  }
  /**
   * Puts the stream into its ending state and tries to finish it.
   *
   * @param {Object} stream - the writable stream.
   * @param {Object} state - its writable state.
   * @param {Function} [cb] - invoked on the next tick if the stream finished
   *   right away, otherwise registered as a one-time 'finish' listener.
   */
  function endWritable(stream, state, cb) {
    state.ending = true;
    finishMaybe(stream, state);

    if (cb) {
      if (state.finished) {
        nextTick(cb);
      } else {
        stream.once('finish', cb);
      }
    }

    state.ended = true;
    stream.writable = false;
  }
  /**
   * Holder for one batch of requests written through `_writev`.
   *
   * It has a `next` pointer but is not used as a general linked list: only
   * two of these exist per stream, and they are recycled through
   * `state.corkedRequestsFree`.
   *
   * @param {Object} state - writable state whose `pendingcb` and
   *   `corkedRequestsFree` are updated by `finish`.
   */
  function CorkedRequest(state) {
    this.next = null;
    this.entry = null;

    // Completion callback for the batch: calls every queued request's
    // callback with `err`, then returns this holder to the free slot.
    this.finish = (err) => {
      let pending = this.entry;
      this.entry = null;

      while (pending) {
        const callback = pending.callback;
        state.pendingcb--;
        callback(err);
        pending = pending.next;
      }

      const free = state.corkedRequestsFree;
      if (free) {
        free.next = this;
      } else {
        state.corkedRequestsFree = this;
      }
    };
  }