stream.js 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import EE from 'events';
  2. import {inherits} from 'util';
  3. import {Duplex} from './readable-stream/duplex.js';
  4. import {Readable} from './readable-stream/readable.js';
  5. import {Writable} from './readable-stream/writable.js';
  6. import {Transform} from './readable-stream/transform.js';
  7. import {PassThrough} from './readable-stream/passthrough.js';
  8. inherits(Stream, EE);
  9. Stream.Readable = Readable;
  10. Stream.Writable = Writable;
  11. Stream.Duplex = Duplex;
  12. Stream.Transform = Transform;
  13. Stream.PassThrough = PassThrough;
  14. // Backwards-compat with node 0.4.x
  15. Stream.Stream = Stream;
  16. export default Stream;
  17. export {Readable,Writable,Duplex,Transform,PassThrough,Stream}
  18. // old-style streams. Note that the pipe method (the only relevant
  19. // part of this class) is overridden in the Readable class.
  20. function Stream() {
  21. EE.call(this);
  22. }
  23. Stream.prototype.pipe = function(dest, options) {
  24. var source = this;
  25. function ondata(chunk) {
  26. if (dest.writable) {
  27. if (false === dest.write(chunk) && source.pause) {
  28. source.pause();
  29. }
  30. }
  31. }
  32. source.on('data', ondata);
  33. function ondrain() {
  34. if (source.readable && source.resume) {
  35. source.resume();
  36. }
  37. }
  38. dest.on('drain', ondrain);
  39. // If the 'end' option is not supplied, dest.end() will be called when
  40. // source gets the 'end' or 'close' events. Only dest.end() once.
  41. if (!dest._isStdio && (!options || options.end !== false)) {
  42. source.on('end', onend);
  43. source.on('close', onclose);
  44. }
  45. var didOnEnd = false;
  46. function onend() {
  47. if (didOnEnd) return;
  48. didOnEnd = true;
  49. dest.end();
  50. }
  51. function onclose() {
  52. if (didOnEnd) return;
  53. didOnEnd = true;
  54. if (typeof dest.destroy === 'function') dest.destroy();
  55. }
  56. // don't leave dangling pipes when there are errors.
  57. function onerror(er) {
  58. cleanup();
  59. if (EE.listenerCount(this, 'error') === 0) {
  60. throw er; // Unhandled stream error in pipe.
  61. }
  62. }
  63. source.on('error', onerror);
  64. dest.on('error', onerror);
  65. // remove all the event listeners that were added.
  66. function cleanup() {
  67. source.removeListener('data', ondata);
  68. dest.removeListener('drain', ondrain);
  69. source.removeListener('end', onend);
  70. source.removeListener('close', onclose);
  71. source.removeListener('error', onerror);
  72. dest.removeListener('error', onerror);
  73. source.removeListener('end', cleanup);
  74. source.removeListener('close', cleanup);
  75. dest.removeListener('close', cleanup);
  76. }
  77. source.on('end', cleanup);
  78. source.on('close', cleanup);
  79. dest.on('close', cleanup);
  80. dest.emit('pipe', source);
  81. // Allow for unix-like usage: A.pipe(B).pipe(C)
  82. return dest;
  83. };