123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- var Stream = require('stream')
// through
//
// A stream that does nothing but re-emit its input.
// Useful for aggregating a series of changing, but not ending, streams into one stream.
// Publish the factory as the module's export and keep the local
// `exports` alias pointing at the same function.
module.exports = through
exports = module.exports
// Self-reference so `require('through').through` also resolves to the factory.
through.through = through
/**
 * Create a stream that is both readable and writable.
 *
 * Data handed to `queue()` / `push()` is buffered and re-emitted as 'data'
 * events whenever the stream is not paused; queueing `null` marks the end
 * of the readable side and results in an 'end' event.
 *
 * @param {function(*)} [write] Called for every `stream.write(data)` with the
 *   receiver of that call (normally the stream) as `this`. Defaults to
 *   re-queueing the data unchanged.
 * @param {function()} [end] Called once, with the stream as `this`, when
 *   `stream.end()` is invoked. Defaults to queueing `null`.
 * @param {{autoDestroy: (boolean|undefined)}} [opts] Pass
 *   `{autoDestroy: false}` to stop the stream from destroying itself once
 *   both the readable and the writable side have finished.
 * @returns {Stream} The new stream. `write()` returns `false` while paused;
 *   `queue`, `push`, `end`, `destroy`, `pause` and `resume` always return
 *   the stream so calls can be chained.
 */
function through (write, end, opts) {
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  var ended = false      // end() was called, or the stream was destroyed
  var destroyed = false  // destroy() already ran
  var endQueued = false  // the `null` end marker has been queued
  var buffer = []        // queued items not yet emitted
  var stream = new Stream()

  stream.readable = stream.writable = true
  stream.paused = false
  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  stream.write = function (data) {
    write.call(this, data)
    // false tells the writer to hold off until 'drain'
    return !stream.paused
  }

  // Emit buffered items until the buffer is empty or a listener pauses us.
  function drain () {
    while (buffer.length && !stream.paused) {
      var data = buffer.shift()
      // `null` is the end marker; nothing can be queued after it
      if (data === null) return stream.emit('end')
      stream.emit('data', data)
    }
  }

  stream.queue = stream.push = function (data) {
    // everything queued after the end marker is dropped
    if (endQueued) return stream
    if (data === null) endQueued = true
    buffer.push(data)
    drain()
    return stream
  }

  // This is registered as the first 'end' listener, so destroy must be
  // deferred to the next tick to run after any stream piped from here.
  // That only matters when 'end' is not emitted synchronously.
  // A nicer way would be to make this the last 'end' listener.
  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable && stream.autoDestroy) {
      process.nextTick(function () {
        stream.destroy()
      })
    }
  })

  // Close the writable side and run the user's end handler.
  function _end () {
    stream.writable = false
    end.call(stream)
    if (!stream.readable && stream.autoDestroy) stream.destroy()
  }

  stream.end = function (data) {
    // repeated end() is a no-op, but still returns the stream for chaining
    if (ended) return stream
    ended = true
    if (arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

  stream.destroy = function () {
    // 'close' is emitted only once; later calls just return the stream
    if (destroyed) return stream
    destroyed = true
    ended = true
    buffer.length = 0
    stream.writable = stream.readable = false
    stream.emit('close')
    return stream
  }

  stream.pause = function () {
    // idempotent; the stream is returned whether or not the state changed
    stream.paused = true
    return stream
  }

  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('resume')
    }
    drain()
    // may have become paused again, as drain emits 'data'
    if (!stream.paused) stream.emit('drain')
    return stream
  }

  return stream
}
|