_stream_writable.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. // Copyright Joyent, Inc. and other Node contributors.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a
  4. // copy of this software and associated documentation files (the
  5. // "Software"), to deal in the Software without restriction, including
  6. // without limitation the rights to use, copy, modify, merge, publish,
  7. // distribute, sublicense, and/or sell copies of the Software, and to permit
  8. // persons to whom the Software is furnished to do so, subject to the
  9. // following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included
  12. // in all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  15. // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  16. // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
  17. // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
  18. // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
  19. // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
  20. // USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. // A bit simpler than readable streams.
  22. // Implement an async ._write(chunk, encoding, cb), and it'll handle all
  23. // the drain event emission and buffering.
  24. 'use strict';
  25. module.exports = Writable;
  26. /* <replacement> */
  /**
   * Plain record describing a single queued write.
   *
   * @param {*} chunk - the data that was passed to write()
   * @param {string} encoding - encoding associated with the chunk
   * @param {Function} cb - callback to run once the chunk has been handled
   */
  function WriteReq(chunk, encoding, cb) {
    var req = this;
    req.chunk = chunk;
    req.encoding = encoding;
    req.callback = cb;
    // link to the following request, filled in by whoever queues it
    req.next = null;
  }
  // This looks like a linked-list node, but it is not used as a general
  // list: there will be only 2 of these for each stream.
  /**
   * Holder for the batch of buffered write requests handed to _writev().
   *
   * @param {Object} state - the writable state this holder belongs to
   *
   * Fields:
   *   next   - spare holder chained behind this one (or null)
   *   entry  - first buffered request of the batch currently in flight
   *   finish - callback used as the write callback of the batch; it hands
   *            the batch over to onCorkedFinish()
   */
  function CorkedRequest(state) {
    var _this = this;
    this.next = null;
    this.entry = null;
    // Forward the error (if any) so that every per-chunk write callback of
    // a failed batch receives it; onCorkedFinish() already accepts `err`.
    // Without forwarding, a failing _writev() looked like a success to the
    // individual write() callbacks.
    this.finish = function (err) {
      onCorkedFinish(_this, state, err);
    };
  }
  43. /* </replacement> */
  44. /*<replacement>*/
  45. var Duplex;
  46. /*</replacement>*/
  47. Writable.WritableState = WritableState;
  48. /*<replacement>*/
  49. var internalUtil = {
  50. deprecate: require('util-deprecate')
  51. };
  52. /*</replacement>*/
  53. /*<replacement>*/
  54. var Stream = require('./internal/streams/stream');
  55. /*</replacement>*/
  56. var Buffer = require('buffer').Buffer;
  57. var OurUint8Array = (typeof global !== 'undefined' ? global : typeof window !== 'undefined' ? window : typeof self !== 'undefined' ? self : {}).Uint8Array || function () {};
  /**
   * Turns a Uint8Array-like chunk into a Buffer via Buffer.from().
   *
   * @param {Uint8Array} chunk
   * @returns {Buffer}
   */
  function _uint8ArrayToBuffer(chunk) {
    var asBuffer = Buffer.from(chunk);
    return asBuffer;
  }
  /**
   * Tells whether `obj` is a Buffer or an instance of the environment's
   * Uint8Array constructor (captured in OurUint8Array).
   *
   * @param {*} obj
   * @returns {boolean}
   */
  function _isUint8Array(obj) {
    if (Buffer.isBuffer(obj)) {
      return true;
    }
    return obj instanceof OurUint8Array;
  }
  64. var destroyImpl = require('./internal/streams/destroy');
  65. var _require = require('./internal/streams/state'),
  66. getHighWaterMark = _require.getHighWaterMark;
  67. var _require$codes = require('../errors').codes,
  68. ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
  69. ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
  70. ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK,
  71. ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE,
  72. ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED,
  73. ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES,
  74. ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END,
  75. ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING;
  76. var errorOrDestroy = destroyImpl.errorOrDestroy;
  77. require('inherits')(Writable, Stream);
  // No-op; write() falls back to it when the caller supplies no callback.
  function nop() {
    // intentionally empty
  }
  /**
   * Bookkeeping object attached to every writable stream as _writableState.
   *
   * @param {Object} [options] - stream options (shared with the readable
   *   side when the stream is a Duplex)
   * @param {Object} stream - the stream that owns this state
   * @param {boolean} [isDuplex] - whether `stream` is a Duplex; derived with
   *   `instanceof Duplex` when not passed as a boolean
   */
  function WritableState(options, stream, isDuplex) {
    // Duplex is loaded lazily, on first use.
    if (!Duplex) Duplex = require('./_stream_duplex');
    if (!options) options = {};

    // A Duplex shares one options object between its two sides, yet some
    // settings need a different value per side (for instance
    // options.readableObjectMode vs. options.writableObjectMode).
    if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex;

    // Object-mode flag: does the stream carry arbitrary objects rather than
    // buffers/strings?
    var objectMode = !!options.objectMode;
    if (isDuplex && !objectMode) objectMode = !!options.writableObjectMode;
    this.objectMode = objectMode;

    // Threshold at which write() starts returning false.
    // 0 is valid: write() then returns false unless the whole buffer is
    // flushed immediately.
    this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex);

    // set once _final has been called
    this.finalCalled = false;

    // a 'drain' event is owed to the consumer
    this.needDrain = false;
    // set at the start of end()
    this.ending = false;
    // set when end() has been called and has returned
    this.ended = false;
    // set when 'finish' is emitted
    this.finished = false;

    // set when the stream has been destroyed
    this.destroyed = false;

    // Whether strings are decoded into buffers before reaching _write().
    // Some node-core streams turn this off to handle strings at a lower
    // level.
    this.decodeStrings = options.decodeStrings !== false;

    // Crypto historically defaults to 'binary', hence the knob; everything
    // else uses 'utf8'.
    this.defaultEncoding = options.defaultEncoding || 'utf8';

    // Not a real buffer: a measurement of how much is still waiting to be
    // pushed to the underlying socket or file.
    this.length = 0;

    // true while a write is in progress
    this.writing = false;

    // while non-zero, every write is buffered until uncork() is called
    this.corked = 0;

    // Tells whether the onwrite callback fires immediately or on a later
    // tick. Starts out true: whatever must wait until "later" should also
    // not happen before the first write call.
    this.sync = true;

    // Set while previously buffered items are being processed; _write() may
    // call back in the same tick and this prevents overlapping onwrite
    // handling.
    this.bufferProcessing = false;

    // callback handed to _write(chunk, cb)
    this.onwrite = function (er) {
      onwrite(stream, er);
    };

    // user callback given to write(chunk, encoding, cb)
    this.writecb = null;

    // amount being written by the current _write() call
    this.writelen = 0;

    // head and tail of the queue of buffered requests
    this.bufferedRequest = null;
    this.lastBufferedRequest = null;

    // number of user write callbacks still pending;
    // must reach 0 before 'finish' can be emitted
    this.pendingcb = 0;

    // 'prefinish' is emitted when only _write callbacks are outstanding;
    // relevant for synchronous Transform streams
    this.prefinished = false;

    // true once the error was emitted, so it is not thrown again
    this.errorEmitted = false;

    // whether 'close' is emitted on destroy (defaults to true)
    this.emitClose = options.emitClose !== false;

    // whether destroy() is called after 'finish' (and potentially 'end')
    this.autoDestroy = !!options.autoDestroy;

    // how many requests are currently buffered
    this.bufferedRequestCount = 0;

    // One CorkedRequest is always allocated and free to use; at most two
    // are maintained.
    this.corkedRequestsFree = new CorkedRequest(this);
  }
  /**
   * Collects the queued write requests into an array, following the `next`
   * links starting at bufferedRequest.
   *
   * @returns {Array} the buffered requests in queue order (empty when none)
   */
  WritableState.prototype.getBuffer = function getBuffer() {
    var requests = [];
    for (var node = this.bufferedRequest; node; node = node.next) {
      requests.push(node);
    }
    return requests;
  };
  // Installs the deprecated `_writableState.buffer` accessor (DEP0003),
  // which simply forwards to getBuffer().
  (function () {
    var message = '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.';
    try {
      var deprecatedGetter = internalUtil.deprecate(function writableStateBufferGetter() {
        return this.getBuffer();
      }, message, 'DEP0003');
      Object.defineProperty(WritableState.prototype, 'buffer', {
        get: deprecatedGetter
      });
    } catch (_) {
      // best effort: failing to install the accessor is ignored
    }
  })();
  // `instanceof Writable` must also succeed for Duplex streams, whose
  // prototype chain only reaches Readable. They are recognised through
  // their _writableState instead.
  var realHasInstance;
  if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') {
    // keep the native check around for the constructor and the override
    realHasInstance = Function.prototype[Symbol.hasInstance];
    Object.defineProperty(Writable, Symbol.hasInstance, {
      value: function value(object) {
        if (!realHasInstance.call(this, object)) {
          // Subclasses keep plain prototype-chain semantics; only Writable
          // itself falls back to the _writableState test.
          return this === Writable ? object && object._writableState instanceof WritableState : false;
        }
        return true;
      }
    });
  } else {
    // no Symbol.hasInstance support: plain instanceof
    realHasInstance = function realHasInstance(object) {
      return object instanceof this;
    };
  }
  /**
   * Writable stream constructor. Can be called without `new`.
   *
   * @param {Object} [options] - stream options; `write`, `writev`,
   *   `destroy` and `final` functions, when present, are installed as
   *   `_write`, `_writev`, `_destroy` and `_final`
   */
  function Writable(options) {
    if (!Duplex) Duplex = require('./_stream_duplex');

    // The Writable ctor is applied to Duplexes, too.
    // `realHasInstance` is needed because plain `instanceof` would return
    // false: no `_writableState` is attached yet. The custom `instanceof`
    // for Writable cannot be used here either, as it would break the
    // Node.js LazyTransform implementation, whose non-trivial
    // `_writableState` getter would recurse forever.
    // Checking for a Duplex instance here rather than inside the
    // WritableState constructor is faster, at least with V8 6.5.
    var isDuplex = this instanceof Duplex;
    var constructed = isDuplex || realHasInstance.call(Writable, this);
    if (!constructed) return new Writable(options);

    this._writableState = new WritableState(options, this, isDuplex);

    // legacy flag
    this.writable = true;

    if (options) {
      // option name -> method installed on the instance
      var hooks = [['write', '_write'], ['writev', '_writev'], ['destroy', '_destroy'], ['final', '_final']];
      for (var i = 0; i < hooks.length; i++) {
        var optionName = hooks[i][0];
        var methodName = hooks[i][1];
        if (typeof options[optionName] === 'function') this[methodName] = options[optionName];
      }
    }

    Stream.call(this);
  }
  // A Writable is not a source: piping from one is reported as an error
  // through errorOrDestroy() instead of being silently allowed.
  Writable.prototype.pipe = function () {
    var cannotPipe = new ERR_STREAM_CANNOT_PIPE();
    errorOrDestroy(this, cannotPipe);
  };
  /**
   * Reports a write() issued after end(): the error goes to the stream
   * right away and to the write callback on the next tick.
   *
   * @param {Object} stream
   * @param {Function} cb - the write callback
   */
  function writeAfterEnd(stream, cb) {
    var afterEndError = new ERR_STREAM_WRITE_AFTER_END();
    // TODO: defer error events consistently everywhere, not just the cb
    errorOrDestroy(stream, afterEndError);
    process.nextTick(cb, afterEndError);
  }
  /**
   * Validates a user-supplied chunk for the mode the stream is in.
   * `null` is never accepted; anything that is not a string is only
   * accepted in object mode.
   *
   * On failure the error is reported through errorOrDestroy() and handed
   * to `cb` on the next tick.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   * @param {*} chunk
   * @param {Function} cb - the write callback
   * @returns {boolean} true when the chunk is acceptable
   */
  function validChunk(stream, state, chunk, cb) {
    var failure = null;
    if (chunk === null) {
      failure = new ERR_STREAM_NULL_VALUES();
    } else if (typeof chunk !== 'string' && !state.objectMode) {
      failure = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
    }
    if (!failure) {
      return true;
    }
    errorOrDestroy(stream, failure);
    process.nextTick(cb, failure);
    return false;
  }
  /**
   * Queues or writes a chunk.
   *
   * @param {*} chunk - data to write
   * @param {string|Function} [encoding] - chunk encoding, or the callback
   * @param {Function} [cb] - called once the chunk has been handled
   * @returns {boolean} false when the chunk was rejected or when the
   *   buffered amount reached the high water mark
   */
  Writable.prototype.write = function (chunk, encoding, cb) {
    var state = this._writableState;

    // Outside object mode, Uint8Arrays are normalised to Buffers.
    var isBuf = !state.objectMode && _isUint8Array(chunk);
    if (isBuf && !Buffer.isBuffer(chunk)) {
      chunk = _uint8ArrayToBuffer(chunk);
    }

    // write(chunk, cb) form
    if (typeof encoding === 'function') {
      cb = encoding;
      encoding = null;
    }

    if (isBuf) {
      encoding = 'buffer';
    } else if (!encoding) {
      encoding = state.defaultEncoding;
    }

    if (typeof cb !== 'function') cb = nop;

    if (state.ending) {
      writeAfterEnd(this, cb);
      return false;
    }
    // Buffers need no validation; everything else goes through validChunk().
    if (!isBuf && !validChunk(this, state, chunk, cb)) {
      return false;
    }

    state.pendingcb++;
    return writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
  };
  // Raises the cork counter; while it is non-zero writes are buffered.
  Writable.prototype.cork = function () {
    var state = this._writableState;
    state.corked++;
  };
  // Lowers the cork counter and, once it reaches zero with nothing in
  // flight, flushes the buffered requests.
  Writable.prototype.uncork = function () {
    var state = this._writableState;
    if (!state.corked) {
      return;
    }
    state.corked--;
    var idle = !state.writing && !state.corked && !state.bufferProcessing;
    if (idle && state.bufferedRequest) {
      clearBuffer(this, state);
    }
  };
  /**
   * Sets the encoding used for string chunks written without one.
   *
   * @param {string} encoding
   * @returns {Writable} this
   * @throws {ERR_UNKNOWN_ENCODING} when the encoding is not in the list of
   *   known names
   */
  Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
    // node::ParseEncoding() requires lower case.
    if (typeof encoding === 'string') encoding = encoding.toLowerCase();
    var knownEncodings = ['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'];
    var candidate = (encoding + '').toLowerCase();
    if (knownEncodings.indexOf(candidate) === -1) {
      throw new ERR_UNKNOWN_ENCODING(encoding);
    }
    this._writableState.defaultEncoding = encoding;
    return this;
  };
  // Read-only view of the buffered requests. Declared non-enumerable on
  // purpose: some userland prototype manipulation fails otherwise.
  Object.defineProperty(Writable.prototype, 'writableBuffer', {
    enumerable: false,
    get: function get() {
      var state = this._writableState;
      // yields the falsy state itself when no state is attached
      return state && state.getBuffer();
    }
  });
  /**
   * Converts a string chunk to a Buffer unless the stream is in object
   * mode or string decoding was switched off; any other chunk is returned
   * as is.
   *
   * @param {Object} state - the stream's writable state
   * @param {*} chunk
   * @param {string} encoding - encoding of a string chunk
   * @returns {*} the Buffer made from the string, or the original chunk
   */
  function decodeChunk(state, chunk, encoding) {
    var mustDecode = !state.objectMode && state.decodeStrings !== false && typeof chunk === 'string';
    return mustDecode ? Buffer.from(chunk, encoding) : chunk;
  }
  // Read-only access to the high water mark of the writable state.
  // Declared non-enumerable on purpose: some userland prototype
  // manipulation fails otherwise.
  Object.defineProperty(Writable.prototype, 'writableHighWaterMark', {
    enumerable: false,
    get: function get() {
      // Guarded like the `writableBuffer` getter: reading the property on
      // an object that carries no _writableState (e.g. the prototype
      // itself) yields the falsy state instead of throwing a TypeError.
      return this._writableState && this._writableState.highWaterMark;
    }
  });
  /**
   * Either hands the chunk to doWrite() or, when a write is already in
   * progress or the stream is corked, appends it to the request queue.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   * @param {boolean} isBuf - whether the chunk is already a Buffer
   * @param {*} chunk
   * @param {string} encoding
   * @param {Function} cb - the write callback
   * @returns {boolean} false when the buffered length reached the high
   *   water mark, in which case a 'drain' event is owed (needDrain is set)
   */
  function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
    if (!isBuf) {
      var decoded = decodeChunk(state, chunk, encoding);
      if (decoded !== chunk) {
        // the string was turned into a Buffer
        isBuf = true;
        encoding = 'buffer';
        chunk = decoded;
      }
    }

    // every chunk counts as 1 in object mode
    var len = state.objectMode ? 1 : chunk.length;
    state.length += len;

    var belowMark = state.length < state.highWaterMark;
    // Only ever raise the flag here, so an earlier needDrain is not reset
    // to false.
    if (!belowMark) state.needDrain = true;

    if (!state.writing && !state.corked) {
      doWrite(stream, state, false, len, chunk, encoding, cb);
      return belowMark;
    }

    // Busy or corked: append to the queue.
    var previous = state.lastBufferedRequest;
    var request = {
      chunk: chunk,
      encoding: encoding,
      isBuf: isBuf,
      callback: cb,
      next: null
    };
    state.lastBufferedRequest = request;
    if (previous) {
      previous.next = request;
    } else {
      state.bufferedRequest = request;
    }
    state.bufferedRequestCount += 1;
    return belowMark;
  }
  /**
   * Starts one underlying write and records it in the state.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   * @param {boolean} writev - true to call _writev() with a batch
   * @param {number} len - amount accounted for by this write
   * @param {*} chunk - a single chunk, or the batch when `writev` is true
   * @param {string} encoding
   * @param {Function} cb - stored as writecb for onwrite() to pick up
   */
  function doWrite(stream, state, writev, len, chunk, encoding, cb) {
    state.writelen = len;
    state.writecb = cb;
    state.writing = true;
    // `sync` stays true for the duration of the call below, which lets
    // onwrite() detect a callback fired synchronously.
    state.sync = true;
    if (state.destroyed) {
      state.onwrite(new ERR_STREAM_DESTROYED('write'));
    } else if (writev) {
      stream._writev(chunk, state.onwrite);
    } else {
      stream._write(chunk, encoding, state.onwrite);
    }
    state.sync = false;
  }
  /**
   * Handles a failed write: releases one pending callback, passes the
   * error to the write callback and to the stream, then re-checks whether
   * the stream can finish.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   * @param {boolean} sync - whether the write called back synchronously
   * @param {Error} er
   * @param {Function} cb - the write callback
   */
  function onwriteError(stream, state, sync, er, cb) {
    --state.pendingcb;

    if (!sync) {
      // async case: the caller expects the callback to run first
      cb(er);
      stream._writableState.errorEmitted = true;
      errorOrDestroy(stream, er);
      // may emit 'finish', which must always follow the error
      finishMaybe(stream, state);
      return;
    }

    // sync case: defer the callback so calls do not pile up on the stack
    process.nextTick(cb, er);
    // may emit 'finish'; being deferred, it always happens after the error
    process.nextTick(finishMaybe, stream, state);
    stream._writableState.errorEmitted = true;
    errorOrDestroy(stream, er);
  }
  /**
   * Clears the "write in progress" bookkeeping and subtracts the amount
   * just written from the buffered length.
   *
   * @param {Object} state - the stream's writable state
   */
  function onwriteStateUpdate(state) {
    var written = state.writelen;
    state.writing = false;
    state.writecb = null;
    state.length -= written;
    state.writelen = 0;
  }
  /**
   * Completion handler for _write()/_writev(), reached through
   * state.onwrite.
   *
   * @param {Object} stream
   * @param {Error} [er] - error reported by the underlying write
   * @throws {ERR_MULTIPLE_CALLBACK} when no write callback is stored, i.e.
   *   the write already completed
   */
  function onwrite(stream, er) {
    var state = stream._writableState;
    // capture these before onwriteStateUpdate() resets the state
    var sync = state.sync;
    var cb = state.writecb;
    if (typeof cb !== 'function') throw new ERR_MULTIPLE_CALLBACK();

    onwriteStateUpdate(state);

    if (er) {
      onwriteError(stream, state, sync, er, cb);
      return;
    }

    // Work out whether we are ready to finish, without emitting yet.
    var finished = needFinish(state) || stream.destroyed;
    var canFlush = !finished && !state.corked && !state.bufferProcessing && state.bufferedRequest;
    if (canFlush) {
      clearBuffer(stream, state);
    }

    // A synchronous completion is deferred to the next tick.
    if (sync) {
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
  /**
   * Wraps up a successful write: possibly emits 'drain', releases one
   * pending callback, runs the write callback and re-checks for finish.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   * @param {*} finished - truthy when the stream was about to finish (or
   *   destroyed); the drain check is skipped in that case
   * @param {Function} cb - the write callback
   */
  function afterWrite(stream, state, finished, cb) {
    var checkDrain = !finished;
    if (checkDrain) {
      onwriteDrain(stream, state);
    }
    state.pendingcb--;
    cb();
    finishMaybe(stream, state);
  }
  // Emits 'drain' once everything buffered has been written and a drain
  // is owed. The caller has to run this on a later tick when the write
  // completed synchronously, so that 'drain' is not emitted before the
  // write() consumer gets the 'false' return value and has a chance to
  // attach a 'drain' listener.
  function onwriteDrain(stream, state) {
    if (state.length !== 0 || !state.needDrain) {
      return;
    }
    state.needDrain = false;
    stream.emit('drain');
  }
  /**
   * Processes whatever is waiting in the request queue.
   *
   * With a _writev() implementation and at least two queued requests the
   * whole queue is written as one batch; otherwise requests are written
   * one at a time until a write stays pending.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   */
  function clearBuffer(stream, state) {
    state.bufferProcessing = true;
    var entry = state.bufferedRequest;

    if (stream._writev && entry && entry.next) {
      // Fast case: everything goes out through a single _writev() call.
      var batch = new Array(state.bufferedRequestCount);
      var holder = state.corkedRequestsFree;
      holder.entry = entry;

      var allBuffers = true;
      for (var index = 0; entry; entry = entry.next, index += 1) {
        batch[index] = entry;
        if (!entry.isBuf) allBuffers = false;
      }
      batch.allBuffers = allBuffers;

      doWrite(stream, state, true, state.length, batch, '', holder.finish);

      // doWrite is almost always async, so the remaining bookkeeping is
      // done after it: the hot path ends with doWrite.
      state.pendingcb++;
      state.lastBufferedRequest = null;
      if (holder.next) {
        // recycle the spare holder
        state.corkedRequestsFree = holder.next;
        holder.next = null;
      } else {
        state.corkedRequestsFree = new CorkedRequest(state);
      }
      state.bufferedRequestCount = 0;
    } else {
      // Slow case: write the chunks one by one.
      while (entry) {
        var data = entry.chunk;
        var enc = entry.encoding;
        var done = entry.callback;
        var size = state.objectMode ? 1 : data.length;

        doWrite(stream, state, false, size, data, enc, done);
        entry = entry.next;
        state.bufferedRequestCount--;

        // Still writing means onwrite was not called immediately, so we
        // have to wait for it. That chunk and its callback are being
        // processed, which is why the queue position already moved past
        // them.
        if (state.writing) {
          break;
        }
      }

      if (entry === null) state.lastBufferedRequest = null;
    }

    state.bufferedRequest = entry;
    state.bufferProcessing = false;
  }
  // Default _write(): reports that no implementation was provided by
  // passing ERR_METHOD_NOT_IMPLEMENTED to the callback.
  Writable.prototype._write = function (chunk, encoding, cb) {
    var notImplemented = new ERR_METHOD_NOT_IMPLEMENTED('_write()');
    cb(notImplemented);
  };

  // No batched write by default; clearBuffer() only batches when a
  // _writev implementation is present.
  Writable.prototype._writev = null;
  /**
   * Signals that no more data will be written, optionally writing one
   * last chunk first.
   *
   * @param {*|Function} [chunk] - final chunk, or the callback
   * @param {string|Function} [encoding] - encoding of the chunk, or the
   *   callback
   * @param {Function} [cb] - invoked once the stream has finished
   * @returns {Writable} this
   */
  Writable.prototype.end = function (chunk, encoding, cb) {
    var state = this._writableState;

    // Sort out the end(cb) and end(chunk, cb) call forms.
    if (typeof chunk === 'function') {
      cb = chunk;
      chunk = null;
      encoding = null;
    } else if (typeof encoding === 'function') {
      cb = encoding;
      encoding = null;
    }

    var hasChunk = chunk !== null && chunk !== undefined;
    if (hasChunk) {
      this.write(chunk, encoding);
    }

    // .end() fully uncorks, whatever the nesting depth
    if (state.corked) {
      state.corked = 1;
      this.uncork();
    }

    // repeated end() calls are ignored
    if (!state.ending) {
      endWritable(this, state, cb);
    }
    return this;
  };
  // Read-only access to the buffered length tracked in the writable state.
  // Declared non-enumerable on purpose: some userland prototype
  // manipulation fails otherwise.
  Object.defineProperty(Writable.prototype, 'writableLength', {
    enumerable: false,
    get: function get() {
      // Guarded like the `writableBuffer` getter: reading the property on
      // an object that carries no _writableState (e.g. the prototype
      // itself) yields the falsy state instead of throwing a TypeError.
      return this._writableState && this._writableState.length;
    }
  });
  /**
   * Tells whether the stream has reached the point where it can finish:
   * end() was called, nothing is buffered or in flight, and 'finish' has
   * not been emitted yet.
   *
   * @param {Object} state - the stream's writable state
   * @returns {*} truthy when the stream can finish (a falsy `ending`
   *   value is returned unchanged)
   */
  function needFinish(state) {
    var ending = state.ending;
    if (!ending) return ending;
    if (state.length !== 0) return false;
    if (state.bufferedRequest !== null) return false;
    if (state.finished) return false;
    return !state.writing;
  }
  /**
   * Runs the stream's _final() hook. When the hook calls back, the pending
   * callback taken for it is released, any error is reported, 'prefinish'
   * is emitted and the finish check runs again.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   */
  function callFinal(stream, state) {
    function onFinalDone(err) {
      state.pendingcb--;
      if (err) {
        errorOrDestroy(stream, err);
      }
      state.prefinished = true;
      stream.emit('prefinish');
      finishMaybe(stream, state);
    }
    stream._final(onFinalDone);
  }
  /**
   * Performs the pre-finish step once: schedules _final() when the stream
   * implements it and is not destroyed, otherwise emits 'prefinish' right
   * away.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   */
  function prefinish(stream, state) {
    if (state.prefinished || state.finalCalled) {
      return;
    }
    if (typeof stream._final === 'function' && !state.destroyed) {
      // hold 'finish' back until _final() has called back
      state.pendingcb++;
      state.finalCalled = true;
      process.nextTick(callFinal, stream, state);
      return;
    }
    state.prefinished = true;
    stream.emit('prefinish');
  }
  /**
   * Emits 'finish' when the stream is ready for it and no callbacks are
   * pending; with autoDestroy it may also destroy the stream.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   * @returns {*} the result of needFinish(state)
   */
  function finishMaybe(stream, state) {
    var need = needFinish(state);
    if (!need) {
      return need;
    }

    prefinish(stream, state);
    if (state.pendingcb !== 0) {
      return need;
    }

    state.finished = true;
    stream.emit('finish');

    if (state.autoDestroy) {
      // For duplex streams the readable side has to be ready for
      // autoDestroy as well.
      var rState = stream._readableState;
      var readableReady = !rState || rState.autoDestroy && rState.endEmitted;
      if (readableReady) {
        stream.destroy();
      }
    }
    return need;
  }
  /**
   * Moves the stream into its ending state and arranges for `cb` to run
   * once it has finished.
   *
   * @param {Object} stream
   * @param {Object} state - the stream's writable state
   * @param {Function} [cb] - called on the next tick when the stream is
   *   already finished, otherwise on the 'finish' event
   */
  function endWritable(stream, state, cb) {
    state.ending = true;
    finishMaybe(stream, state);
    if (cb && state.finished) {
      process.nextTick(cb);
    } else if (cb) {
      stream.once('finish', cb);
    }
    state.ended = true;
    stream.writable = false;
  }
  /**
   * Completes a batch written through a CorkedRequest: every queued write
   * callback is invoked with `err` and releases one pending callback;
   * afterwards the holder is parked behind the current free holder.
   *
   * @param {Object} corkReq - the holder whose batch has completed
   * @param {Object} state - the stream's writable state
   * @param {Error} [err] - passed on to each write callback
   */
  function onCorkedFinish(corkReq, state, err) {
    var pending = corkReq.entry;
    corkReq.entry = null;
    for (; pending; pending = pending.next) {
      var done = pending.callback;
      state.pendingcb--;
      done(err);
    }

    // make the holder available for reuse
    state.corkedRequestsFree.next = corkReq;
  }
  // `destroyed` mirrors _writableState.destroyed. Declared non-enumerable
  // on purpose: some userland prototype manipulation fails otherwise.
  Object.defineProperty(Writable.prototype, 'destroyed', {
    enumerable: false,
    get: function get() {
      // reads as false while no state has been attached
      return this._writableState === undefined ? false : this._writableState.destroyed;
    },
    set: function set(value) {
      // The value is ignored while the stream has not been initialized.
      // Otherwise it is stored as is: backward compatibility for users who
      // explicitly manage `destroyed` themselves.
      if (this._writableState) {
        this._writableState.destroyed = value;
      }
    }
  });
  // destroy() and _undestroy() are taken from the shared destroy
  // implementation module.
  Writable.prototype.destroy = destroyImpl.destroy;
  Writable.prototype._undestroy = destroyImpl.undestroy;

  // Default _destroy() hook: nothing to release, so the error (if any) is
  // passed straight to the callback.
  Writable.prototype._destroy = function (err, cb) {
    cb(err);
  };