index.js 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. var Duplex = require('readable-stream').Duplex;
  2. var PassThrough = require('readable-stream').PassThrough;
  3. var Readable = require('readable-stream').Readable;
  4. var inherits = require('inherits');
  // Scheduler used to defer a callback: setImmediate when the runtime
  // provides it, process.nextTick otherwise.
  var nextTick;
  if (typeof setImmediate === 'undefined') {
    nextTick = process.nextTick;
  }
  else {
    nextTick = setImmediate;
  }

  // The constructor itself is the module's export, and it extends Duplex.
  module.exports = Pipeline;
  inherits(Pipeline, Duplex);
  /**
   * Create a Pipeline whose options have `objectMode` forced to true.
   *
   * Accepts the same argument shapes as the Pipeline constructor:
   * `obj(streams, opts)`, `obj(streams)` or `obj(opts)`.
   *
   * @param {Array} [streams] - initial streams for the pipeline
   * @param {Object} [opts] - options handed on to the Pipeline constructor
   * @returns {Pipeline} the new pipeline
   */
  module.exports.obj = function (streams, opts) {
    // A lone non-array argument is the options object.
    if (!opts && !Array.isArray(streams)) {
      opts = streams;
      streams = [];
    }
    if (!streams) streams = [];

    // Copy the options instead of writing `objectMode` into the caller's
    // object, so an options object shared with other constructors is not
    // changed behind the caller's back. Inherited enumerable properties are
    // copied too, so every option that was readable before still is.
    var objOpts = {};
    if (opts) {
      for (var key in opts) objOpts[key] = opts[key];
    }
    objOpts.objectMode = true;

    return new Pipeline(streams, objOpts);
  };
  /**
   * Duplex stream built from an ordered list of streams.
   *
   * May be called with or without `new`, as `Pipeline(streams, opts)`,
   * `Pipeline(streams)` or `Pipeline(opts)`.
   *
   * @param {Array} [streams] - initial streams, inserted through splice()
   * @param {Object} [opts] - options passed to the Duplex constructor and
   *   kept on `this._options`
   */
  function Pipeline (streams, opts) {
    if (!(this instanceof Pipeline)) {
      return new Pipeline(streams, opts);
    }

    // A lone non-array argument is the options object.
    var optionsOnly = !opts && !Array.isArray(streams);
    if (optionsOnly) {
      opts = streams;
      streams = [];
    }
    streams = streams || [];
    opts = opts || {};

    Duplex.call(this, opts);

    this._options = opts;
    // Options for wrapping streams that lack read(): object mode unless the
    // caller explicitly passed `objectMode: false`.
    this._wrapOptions = { objectMode: opts.objectMode !== false };
    this._streams = [];

    var initialArgs = [ 0, 0 ].concat(streams);
    this.splice.apply(this, initialArgs);

    // Once the writable side finishes, end the first stream (creating a
    // placeholder first if the pipeline is empty).
    var pipeline = this;
    this.once('finish', function () {
      pipeline._notEmpty();
      var head = pipeline._streams[0];
      head.end();
    });
  }
  /**
   * Readable-side hook: forward everything currently readable from the last
   * stream to this pipeline's own readable side.
   *
   * If nothing could be read, try again once the last stream becomes
   * readable or the pipeline's list of streams is mutated.
   */
  Pipeline.prototype._read = function () {
    this._notEmpty();

    var pipeline = this;
    var tail = this._streams[this._streams.length - 1];
    var forwarded = 0;

    for (var chunk = tail.read(); chunk !== null; chunk = tail.read()) {
      // Duplex's push is used explicitly: Pipeline.prototype.push is
      // redefined to append streams.
      Duplex.prototype.push.call(this, chunk);
      forwarded += 1;
    }
    if (forwarded !== 0) return;

    // Whichever event fires first retries the read; both listeners are
    // removed so the retry runs only once.
    function retry () {
      tail.removeListener('readable', retry);
      pipeline.removeListener('_mutate', retry);
      pipeline._read();
    }
    tail.once('readable', retry);
    pipeline.once('_mutate', retry);
  };
  /**
   * Writable-side hook: hand the chunk to the first stream's `_write`.
   *
   * @param {*} buf - chunk being written
   * @param {string} enc - encoding of the chunk
   * @param {Function} next - callback passed through to the first stream
   */
  Pipeline.prototype._write = function (buf, enc, next) {
    // Guarantee there is a first stream to write into.
    this._notEmpty();
    var head = this._streams[0];
    head._write(buf, enc, next);
  };
  /**
   * Ensure the pipeline holds at least one stream by adding a PassThrough
   * (built from the pipeline's options) when it is empty.
   */
  Pipeline.prototype._notEmpty = function () {
    if (this._streams.length !== 0) return;

    var pipeline = this;
    var placeholder = new PassThrough(this._options);

    // When the placeholder ends while it is still the last stream, end the
    // pipeline's readable side.
    placeholder.once('end', function () {
      var position = pipeline._streams.indexOf(placeholder);
      var isTail = position === pipeline._streams.length - 1;
      if (position >= 0 && isTail) {
        Duplex.prototype.push.call(pipeline, null);
      }
    });

    this._streams.push(placeholder);
    this.length = this._streams.length;
  };
  /**
   * Append one or more streams to the end of the pipeline.
   *
   * @param {...*} stream - streams to append (every argument is appended)
   * @returns {number} number of streams in the pipeline afterwards
   */
  Pipeline.prototype.push = function (stream) {
    var added = Array.prototype.slice.call(arguments);
    var spliceArgs = [ this._streams.length, 0 ].concat(added);
    this.splice.apply(this, spliceArgs);
    return this._streams.length;
  };
  /**
   * Remove the last stream from the pipeline.
   *
   * @returns {*} the removed stream, or undefined if nothing was removed
   */
  Pipeline.prototype.pop = function () {
    var lastIndex = this._streams.length - 1;
    var removed = this.splice(lastIndex, 1);
    return removed[0];
  };
  /**
   * Remove the first stream from the pipeline.
   *
   * @returns {*} the removed stream, or undefined if nothing was removed
   */
  Pipeline.prototype.shift = function () {
    var removed = this.splice(0, 1);
    return removed[0];
  };
  /**
   * Insert one or more streams at the front of the pipeline.
   *
   * @returns {number} number of streams in the pipeline afterwards
   */
  Pipeline.prototype.unshift = function () {
    var added = Array.prototype.slice.call(arguments);
    var spliceArgs = [ 0, 0 ].concat(added);
    this.splice.apply(this, spliceArgs);
    return this._streams.length;
  };
  /**
   * Remove and/or insert streams at `start`, re-piping the neighbours so the
   * pipeline stays connected. Modelled on Array.prototype.splice.
   *
   * Every argument after `removeLen` is a stream to insert. An array
   * argument is turned into a nested Pipeline built with this pipeline's
   * options. Errors emitted by an inserted stream are re-emitted on the
   * pipeline with `err.stream` set to the stream that raised them.
   *
   * @param {number} start - index to operate at; a negative value counts
   *   back from the end, and a value past the end is treated as the end
   * @param {number} [removeLen] - how many streams to remove; defaults to
   *   everything from `start` onwards
   * @returns {Array} the streams that were removed
   */
  Pipeline.prototype.splice = function (start, removeLen) {
    var self = this;
    var streams = this._streams;
    var len = streams.length;

    // Normalise `start` the way Array.prototype.splice does. The previous
    // `len - start` pushed negative indexes past the end of the list, so
    // e.g. splice(-1, 1) removed nothing.
    if (start < 0) start = Math.max(0, len + start);
    // Clamp so streams appended "past the end" are still piped from the
    // current last stream.
    else if (start > len) start = len;

    if (removeLen === undefined) removeLen = len - start;
    removeLen = Math.max(0, Math.min(len - start, removeLen));
    var end = start + removeLen;

    // Disconnect every link that touches the removed range, including the
    // links into its first element and out of its last element.
    for (var i = start; i <= end; i++) {
      if (streams[i - 1] && streams[i]) {
        streams[i - 1].unpipe(streams[i]);
      }
    }

    // Turn one inserted argument into a pipeline-ready stream.
    function prepare (stream) {
      if (Array.isArray(stream)) {
        stream = new Pipeline(stream, self._options);
      }
      stream.on('error', function (err) {
        err.stream = this;
        self.emit('error', err);
      });
      stream = self._wrapStream(stream);
      // When this stream ends while it is the last one, end the pipeline's
      // readable side.
      stream.once('end', function () {
        var ix = self._streams.indexOf(stream);
        if (ix >= 0 && ix === self._streams.length - 1) {
          Duplex.prototype.push.call(self, null);
        }
      });
      return stream;
    }

    var reps = [];
    for (var j = 2; j < arguments.length; j++) {
      reps.push(prepare(arguments[j]));
    }

    // Chain the inserted streams to one another.
    for (var k = 0; k < reps.length - 1; k++) {
      reps[k].pipe(reps[k + 1]);
    }

    var before = streams[start - 1];
    var after = streams[end];
    if (reps.length) {
      if (after) reps[reps.length - 1].pipe(after);
      if (before) before.pipe(reps[0]);
    }
    else if (before && after) {
      // Nothing was inserted: join the two sides of the gap again, since
      // the link between them was removed above.
      before.pipe(after);
    }

    var sargs = [ start, removeLen ].concat(reps);
    var removed = streams.splice.apply(streams, sargs);

    // Zero-length read on each inserted stream, as the original code did.
    for (var m = 0; m < reps.length; m++) {
      reps[m].read(0);
    }

    // Lets a pending _read() re-target the (possibly new) last stream.
    this.emit('_mutate');
    this.length = streams.length;
    return removed;
  };
  /**
   * Look up a stream by index, descending into nested pipelines when more
   * than one index is given: get(i, j) is element j of element i.
   *
   * Negative indexes count back from the end of the list they apply to.
   *
   * @param {...number} index - one index per nesting level
   * @returns {*} the stream found, or undefined when no index is given, an
   *   index is out of range, or an intermediate element is not a pipeline
   */
  Pipeline.prototype.get = function () {
    if (arguments.length === 0) return undefined;
    var base = this;
    for (var i = 0; i < arguments.length; i++) {
      var streams = base._streams;
      // An element without a stream list cannot be descended into; report
      // "not found" rather than throwing on `undefined.length`.
      if (!Array.isArray(streams)) return undefined;

      var index = arguments[i];
      base = streams[index < 0 ? streams.length + index : index];
      if (!base) return undefined;
    }
    return base;
  };
  /**
   * Position of `stream` in this pipeline's list of streams.
   *
   * @param {*} stream - the stream to look for
   * @returns {number} its index, or -1 when it is not in the list
   */
  Pipeline.prototype.indexOf = function (stream) {
    var streams = this._streams;
    return streams.indexOf(stream);
  };
  /**
   * Return a stream the pipeline can read from. A stream that already has a
   * read() method is returned unchanged; anything else is wrapped in a
   * Readable created with `this._wrapOptions`.
   *
   * @param {*} stream - stream to adapt
   * @returns {*} `stream` itself, or the Readable wrapper around it
   */
  Pipeline.prototype._wrapStream = function (stream) {
    var hasRead = typeof stream.read === 'function';
    if (hasRead) return stream;

    var wrapper = new Readable(this._wrapOptions).wrap(stream);

    // Forward writes to the wrapped stream, waiting for 'drain' when it
    // reports back-pressure.
    // NOTE(review): the wrapper is built from Readable, and the only caller
    // of `_write` visible in this file is Pipeline#_write on the first
    // stream; confirm that wrapped streams placed later in a pipeline
    // receive data as intended.
    wrapper._write = function (buf, enc, next) {
      var accepted = stream.write(buf);
      if (accepted !== false) {
        nextTick(next);
        return;
      }
      stream.once('drain', next);
    };

    return wrapper;
  };
  169. };