123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- var Duplex = require('readable-stream').Duplex;
- var PassThrough = require('readable-stream').PassThrough;
- var Readable = require('readable-stream').Readable;
- var inherits = require('inherits');
// Deferral primitive used when forwarding writes to wrapped streams:
// setImmediate when the runtime defines it, process.nextTick otherwise.
var nextTick;
if (typeof setImmediate === 'undefined') {
    nextTick = process.nextTick;
}
else {
    nextTick = setImmediate;
}

// The constructor itself is the module's export, and it extends Duplex.
module.exports = Pipeline;
inherits(Pipeline, Duplex);
/**
 * Create a Pipeline with `objectMode` forced to true.
 *
 * Accepts the same argument shapes as the Pipeline constructor: when the
 * first argument is not an array and no second argument is given, the first
 * argument is treated as the options object.
 *
 * @param {Array} [streams] - streams to place in the pipeline.
 * @param {Object} [opts] - options handed to the Pipeline constructor.
 * @returns {Pipeline} a new pipeline whose options have `objectMode: true`.
 */
module.exports.obj = function (streams, opts) {
    if (!opts && !Array.isArray(streams)) {
        opts = streams;
        streams = [];
    }
    if (!streams) streams = [];

    // Work on a copy so that forcing objectMode never modifies an options
    // object the caller still owns (and may reuse for other streams).
    var objOpts = {};
    for (var key in opts) {
        objOpts[key] = opts[key];
    }
    objOpts.objectMode = true;
    return new Pipeline(streams, objOpts);
};
/**
 * Duplex stream that holds an ordered list of inner streams.
 *
 * May be called with or without `new`. When the first argument is not an
 * array and no second argument is given, the first argument is treated as
 * the options object.
 *
 * @param {Array} [streams] - initial streams, inserted through splice().
 * @param {Object} [opts] - options passed to the Duplex constructor and kept
 *   as `_options`. Wrapped streams use object mode unless `objectMode` is
 *   exactly `false`.
 */
function Pipeline (streams, opts) {
    if (!(this instanceof Pipeline)) {
        return new Pipeline(streams, opts);
    }

    var gotStreamList = Array.isArray(streams);
    if (!opts && !gotStreamList) {
        opts = streams;
        streams = [];
    }
    streams = streams || [];
    opts = opts || {};
    Duplex.call(this, opts);

    var pipeline = this;
    pipeline._options = opts;
    pipeline._wrapOptions = { objectMode: opts.objectMode !== false };
    pipeline._streams = [];

    // Insert the initial streams at index 0 without removing anything.
    pipeline.splice.apply(pipeline, [ 0, 0 ].concat(streams));

    // Once the writable side finishes, end the first inner stream (creating
    // a placeholder first if the pipeline is empty).
    pipeline.once('finish', function onFinish () {
        pipeline._notEmpty();
        pipeline._streams[0].end();
    });
}
/**
 * Readable-side hook: drain whatever the last inner stream currently has
 * buffered and push each chunk out of the pipeline.
 *
 * If nothing could be read, the call is retried once, either when the last
 * stream emits 'readable' or when the pipeline emits '_mutate', whichever
 * comes first.
 */
Pipeline.prototype._read = function () {
    var pipeline = this;
    pipeline._notEmpty();

    var tail = pipeline._streams[pipeline._streams.length - 1];
    var delivered = 0;
    for (var chunk = tail.read(); chunk !== null; chunk = tail.read()) {
        // Call Duplex's push explicitly: Pipeline.prototype.push is the
        // array-like "append a stream" method.
        Duplex.prototype.push.call(pipeline, chunk);
        delivered += 1;
    }
    if (delivered > 0) return;

    // Both listeners share one handler; it detaches itself from both
    // emitters before retrying so only a single retry happens.
    function retry () {
        tail.removeListener('readable', retry);
        pipeline.removeListener('_mutate', retry);
        pipeline._read();
    }
    tail.once('readable', retry);
    pipeline.once('_mutate', retry);
};
/**
 * Writable-side hook: hand the chunk to the first inner stream's `_write`,
 * creating a placeholder stream first if the pipeline is empty.
 *
 * @param {*} buf - chunk being written.
 * @param {string} enc - encoding reported for the chunk.
 * @param {Function} next - completion callback, passed through unchanged.
 */
Pipeline.prototype._write = function (buf, enc, next) {
    this._notEmpty();
    var head = this._streams[0];
    head._write(buf, enc, next);
};
/**
 * Guarantee that the pipeline holds at least one stream.
 *
 * Does nothing when a stream is already present. Otherwise a PassThrough
 * built from the pipeline's options is appended and `length` is updated.
 * When that PassThrough ends while it is still the last stream, the
 * readable side of the pipeline is ended as well.
 */
Pipeline.prototype._notEmpty = function () {
    var pipeline = this;
    if (pipeline._streams.length !== 0) return;

    var placeholder = new PassThrough(pipeline._options);
    placeholder.once('end', function onPlaceholderEnd () {
        var position = pipeline._streams.indexOf(placeholder);
        var isTail = position >= 0
            && position === pipeline._streams.length - 1;
        if (isTail) {
            // Duplex's push(null) signals end-of-stream to readers.
            Duplex.prototype.push.call(pipeline, null);
        }
    });

    pipeline._streams.push(placeholder);
    pipeline.length = pipeline._streams.length;
};
/**
 * Append one or more streams to the end of the pipeline.
 *
 * Every argument is forwarded to splice() at the current end position.
 *
 * @param {...*} stream - streams to append.
 * @returns {number} the number of streams in the pipeline afterwards.
 */
Pipeline.prototype.push = function (stream) {
    var appended = Array.prototype.slice.call(arguments);
    var spliceArgs = [ this._streams.length, 0 ].concat(appended);
    this.splice.apply(this, spliceArgs);
    return this._streams.length;
};
/**
 * Remove the last stream from the pipeline.
 *
 * @returns {*} the first element of what splice() reports as removed
 *   (undefined when nothing was removed).
 */
Pipeline.prototype.pop = function () {
    var lastIndex = this._streams.length - 1;
    var removed = this.splice(lastIndex, 1);
    return removed[0];
};
/**
 * Remove the first stream from the pipeline.
 *
 * @returns {*} the first element of what splice() reports as removed
 *   (undefined when nothing was removed).
 */
Pipeline.prototype.shift = function () {
    var removed = this.splice(0, 1);
    return removed[0];
};
/**
 * Insert one or more streams at the front of the pipeline.
 *
 * Every argument is forwarded to splice() at index 0.
 *
 * @returns {number} the number of streams in the pipeline afterwards.
 */
Pipeline.prototype.unshift = function () {
    var spliceArgs = [ 0, 0 ];
    for (var k = 0; k < arguments.length; k++) {
        spliceArgs.push(arguments[k]);
    }
    this.splice.apply(this, spliceArgs);
    return this._streams.length;
};
/**
 * Remove and/or insert streams at a position and rewire the pipes between
 * neighbours, following the conventions of Array.prototype.splice.
 *
 * Arguments after `removeLen` are the streams to insert. An array argument
 * becomes a nested Pipeline built with this pipeline's options. Each
 * inserted stream has its errors re-emitted on the pipeline (with
 * `err.stream` set to the emitting stream), is passed through
 * `_wrapStream`, and ends the pipeline's readable side if it ends while it
 * is the last stream.
 *
 * Emits '_mutate' and refreshes `length` on every call.
 *
 * @param {number} start - index at which to start. A negative value counts
 *   back from the end; values beyond either end are clamped.
 * @param {number} [removeLen] - number of streams to remove; when undefined,
 *   everything from `start` to the end is removed.
 * @returns {Array} the streams that were removed.
 */
Pipeline.prototype.splice = function (start, removeLen) {
    var self = this;
    var streams = self._streams;
    var len = streams.length;

    // Negative indexes count back from the end (len + start), and anything
    // out of range is clamped to [0, len], so the rewiring below looks at the
    // same neighbours that the array splice further down will produce.
    start = start < 0 ? Math.max(0, len + start) : Math.min(start, len);
    if (removeLen === undefined) removeLen = len - start;
    removeLen = Math.max(0, Math.min(len - start, removeLen));
    var end = start + removeLen;

    // Detach every link that touches the affected range, including the link
    // into its first stream and the link out of its last stream.
    for (var i = start; i <= end; i++) {
        if (streams[i-1] && streams[i]) {
            streams[i-1].unpipe(streams[i]);
        }
    }

    // Prepare one incoming argument for membership in the pipeline.
    function adopt (stream) {
        if (Array.isArray(stream)) {
            stream = new Pipeline(stream, self._options);
        }
        stream.on('error', function (err) {
            err.stream = this;
            self.emit('error', err);
        });
        stream = self._wrapStream(stream);
        stream.once('end', function () {
            var ix = self._streams.indexOf(stream);
            if (ix >= 0 && ix === self._streams.length - 1) {
                // Duplex's push(null) ends the pipeline's readable side.
                Duplex.prototype.push.call(self, null);
            }
        });
        return stream;
    }

    var reps = [];
    for (var j = 2; j < arguments.length; j++) {
        reps.push(adopt(arguments[j]));
    }

    for (var k = 0; k < reps.length - 1; k++) {
        reps[k].pipe(reps[k+1]);
    }

    if (reps.length) {
        if (streams[end]) reps[reps.length-1].pipe(streams[end]);
        if (streams[start-1]) streams[start-1].pipe(reps[0]);
    }
    else if (streams[start-1] && streams[end]) {
        // Nothing was inserted, so the two streams on either side of the
        // gap must be joined directly or the chain would stay broken.
        streams[start-1].pipe(streams[end]);
    }

    var removed = streams.splice.apply(streams, [ start, removeLen ].concat(reps));

    // A zero-length read on each new stream, as the original code did
    // (presumably to kick off buffering without consuming data).
    for (var m = 0; m < reps.length; m++) {
        reps[m].read(0);
    }

    self.emit('_mutate');
    self.length = streams.length;
    return removed;
};
/**
 * Look up a stream by index, descending into nested pipelines.
 *
 * Each argument is an index into the `_streams` list of the result of the
 * previous step; a negative index counts back from the end of that list.
 *
 * @param {...number} index - one index per nesting level.
 * @returns {*} the stream found, or undefined when called without
 *   arguments, when an index has no entry, or when a step tries to descend
 *   into something that has no `_streams` list.
 */
Pipeline.prototype.get = function () {
    if (arguments.length === 0) return undefined;

    var base = this;
    for (var i = 0; i < arguments.length; i++) {
        // A stream without a `_streams` array has nothing to descend into;
        // report "not found" instead of throwing on the property access.
        var children = base._streams;
        if (!Array.isArray(children)) return undefined;

        var index = arguments[i];
        base = children[index < 0 ? children.length + index : index];
        if (!base) return undefined;
    }
    return base;
};
/**
 * Find the position of a stream in this pipeline's stream list.
 *
 * @param {*} stream - the stream to look for.
 * @returns {number} its index in `_streams`, or -1 when it is not present.
 */
Pipeline.prototype.indexOf = function (stream) {
    var members = this._streams;
    return members.indexOf(stream);
};
/**
 * Adapt a stream so it can be used as a pipeline member.
 *
 * A stream that already has a `read` function is returned untouched. Any
 * other stream is wrapped with `Readable#wrap` (using `_wrapOptions`) and
 * the wrapper is given a `_write` that forwards chunks to the original
 * stream's `write`.
 *
 * @param {Object} stream - the stream to adapt.
 * @returns {Object} `stream` itself, or the wrapper around it.
 */
Pipeline.prototype._wrapStream = function (stream) {
    var hasRead = typeof stream.read === 'function';
    if (hasRead) return stream;

    var wrapped = new Readable(this._wrapOptions).wrap(stream);
    wrapped._write = function forward (buf, enc, next) {
        var accepted = stream.write(buf) !== false;
        if (accepted) {
            // Complete asynchronously via the deferral helper.
            nextTick(next);
        }
        else {
            // write() returned false: wait for 'drain' before continuing.
            stream.once('drain', next);
        }
    };
    return wrapped;
};
|