123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- 'use strict';
- var through = require('through2');
- function forward(chunk, enc, cb) {
- cb(null, chunk);
- }
- function toThrough(readable) {
- var opts = {
- objectMode: readable._readableState.objectMode,
- highWaterMark: readable._readableState.highWaterMark,
- };
- function flush(cb) {
- var self = this;
- readable.on('readable', onReadable);
- readable.on('end', cb);
- function onReadable() {
- var chunk;
- while (chunk = readable.read()) {
- self.push(chunk);
- }
- }
- }
- var wrapper = through(opts, forward, flush);
- var shouldFlow = true;
- wrapper.once('pipe', onPipe);
- wrapper.on('newListener', onListener);
- readable.on('error', wrapper.emit.bind(wrapper, 'error'));
- function onListener(event) {
- // Once we've seen the data or readable event, check if we need to flow
- if (event === 'data' || event === 'readable') {
- maybeFlow();
- this.removeListener('newListener', onListener);
- }
- }
- function onPipe() {
- // If the wrapper is piped, disable flow
- shouldFlow = false;
- }
- function maybeFlow() {
- // If we need to flow, end the stream which triggers flush
- if (shouldFlow) {
- wrapper.end();
- }
- }
- return wrapper;
- }
- module.exports = toThrough;
|