// pipeline.js (2.4 KB)
  // Ported from https://github.com/mafintosh/pump with
  // permission from the author, Mathias Buus (@mafintosh).
  'use strict';

  // Holds the './end-of-stream' module once it has been loaded; it stays
  // undefined until destroyer() requires it on first use.
  var eos;
  /**
   * Wraps `callback` so that only its first invocation has any effect.
   *
   * @param {Function} callback - Function to guard.
   * @returns {Function} Wrapper that forwards its arguments to `callback` on
   *   the first call (passing `undefined` as the `this` argument) and does
   *   nothing on later calls. The wrapper itself always returns `undefined`.
   */
  function once(callback) {
    var called = false;
    return function () {
      if (called) return;
      // Flip the flag before calling so a re-entrant call is ignored as well.
      called = true;
      callback.apply(void 0, arguments);
    };
  }
  // Error constructors used below, taken from the shared `codes` table.
  var _require$codes = require('../../../errors').codes,
    ERR_MISSING_ARGS = _require$codes.ERR_MISSING_ARGS,
    ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED;
  /**
   * Fallback completion callback returned by popCallback() when the caller
   * did not supply one. Despite the name it is not a pure no-op: a truthy
   * `err` is thrown.
   *
   * @param {*} [err] - Error reported by the pipeline, if any.
   * @throws {*} `err` itself, whenever it is truthy.
   */
  function noop(err) {
    // Rethrow the error if it exists to avoid swallowing it
    if (err) throw err;
  }
  /**
   * Duck-type check for a request-like stream: one that has a truthy
   * `setHeader` property and an `abort()` method.
   *
   * @param {Object} stream - Stream to inspect.
   * @returns {boolean} `true` only when both conditions hold; always a strict
   *   boolean, never the raw `setHeader` value or `undefined`.
   */
  function isRequest(stream) {
    return Boolean(stream.setHeader) && typeof stream.abort === 'function';
  }
  /**
   * Watches `stream` for completion and returns a function that tears the
   * stream down.
   *
   * @param {Object} stream - Stream to watch; must expose `.on()`.
   * @param {boolean} reading - Forwarded to end-of-stream as its `readable`
   *   option.
   * @param {boolean} writing - Forwarded to end-of-stream as its `writable`
   *   option.
   * @param {Function} callback - Invoked at most once: with the error that
   *   end-of-stream reports, with no arguments when end-of-stream reports
   *   success, or with an error from the returned function when the stream
   *   can be neither aborted nor destroyed.
   * @returns {Function} `destroy(err)`. It does nothing once the stream has
   *   closed/finished or after it has already run once. Otherwise it calls
   *   `stream.abort()` for request-like streams, else `stream.destroy()`
   *   when available, else reports `err` (or a new ERR_STREAM_DESTROYED) via
   *   `callback`.
   */
  function destroyer(stream, reading, writing, callback) {
    callback = once(callback);
    var closed = false;
    stream.on('close', function () {
      closed = true;
    });
    // Load end-of-stream lazily, the first time any destroyer is created.
    if (eos === undefined) eos = require('./end-of-stream');
    eos(stream, {
      readable: reading,
      writable: writing
    }, function (err) {
      if (err) return callback(err);
      // A clean finish counts as closed, so a later destroy call is a no-op.
      closed = true;
      callback();
    });
    var destroyed = false;
    return function (err) {
      if (closed) return;
      if (destroyed) return;
      destroyed = true;
      // request.destroy just do .end - .abort is what we want
      if (isRequest(stream)) return stream.abort();
      if (typeof stream.destroy === 'function') return stream.destroy();
      // No way to tear the stream down: report the failure directly instead.
      callback(err || new ERR_STREAM_DESTROYED('pipe'));
    };
  }
  /**
   * Invokes `fn` with no arguments and discards its result. Used as a
   * `forEach` callback so the destroy functions do not receive the
   * element/index/array arguments that `forEach` would otherwise pass.
   *
   * @param {Function} fn - Function to invoke.
   */
  function call(fn) {
    fn();
  }
  /**
   * Reducer that connects two adjacent streams.
   *
   * @param {Object} from - Source; must expose `.pipe()`.
   * @param {Object} to - Destination handed to `from.pipe()`.
   * @returns {*} Whatever `from.pipe(to)` returns.
   */
  function pipe(from, to) {
    return from.pipe(to);
  }
  /**
   * Extracts the trailing completion callback from the argument list.
   *
   * @param {Array} streams - Arguments given to pipeline(); mutated (its last
   *   element is removed) when that element is a function.
   * @returns {Function} The removed trailing function, or `noop` when the
   *   list is empty or does not end with a function.
   */
  function popCallback(streams) {
    if (!streams.length) return noop;
    if (typeof streams[streams.length - 1] !== 'function') return noop;
    return streams.pop();
  }
  /**
   * Pipes the given streams together in order and tears all of them down
   * when any one of them reports an error or when the last one completes.
   *
   * Accepts either `pipeline(a, b, ..., [callback])` or
   * `pipeline([a, b, ...], [callback])`.
   *
   * @param {...*} streams - Two or more streams (or one array of them),
   *   optionally followed by a completion callback. The callback receives
   *   the first error recorded, or `undefined`. Without a callback, `noop`
   *   is used, which throws a truthy error.
   * @returns {*} The value returned by the final `.pipe()` call in the chain.
   * @throws {ERR_MISSING_ARGS} When fewer than two streams are supplied.
   */
  function pipeline() {
    // Copy `arguments` into a real array so it can be popped and mapped.
    for (var _len = arguments.length, streams = new Array(_len), _key = 0; _key < _len; _key++) {
      streams[_key] = arguments[_key];
    }
    var callback = popCallback(streams);
    // Also accept the streams as a single array argument.
    if (Array.isArray(streams[0])) streams = streams[0];
    if (streams.length < 2) {
      throw new ERR_MISSING_ARGS('streams');
    }
    // First error reported by any stream; later errors do not overwrite it.
    var error;
    // NOTE(review): the callback below reads `destroys`, which is only
    // assigned after map() returns - this assumes end-of-stream never invokes
    // its callback synchronously. Confirm against './end-of-stream'.
    var destroys = streams.map(function (stream, i) {
      // Every stream except the last is read from; every stream except the
      // first is written to.
      var reading = i < streams.length - 1;
      var writing = i > 0;
      return destroyer(stream, reading, writing, function (err) {
        if (!error) error = err;
        // Any failure tears down the whole chain.
        if (err) destroys.forEach(call);
        // Only the last stream (the one not being read from) ends the
        // pipeline and triggers the user callback.
        if (reading) return;
        destroys.forEach(call);
        callback(error);
      });
    });
    return streams.reduce(pipe);
  }
  // The module's sole export is the pipeline() function.
  module.exports = pipeline;