index.js 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. 'use strict';
  2. var through = require('through2');
  3. function forward(chunk, enc, cb) {
  4. cb(null, chunk);
  5. }
  6. function toThrough(readable) {
  7. var opts = {
  8. objectMode: readable._readableState.objectMode,
  9. highWaterMark: readable._readableState.highWaterMark,
  10. };
  11. function flush(cb) {
  12. var self = this;
  13. readable.on('readable', onReadable);
  14. readable.on('end', cb);
  15. function onReadable() {
  16. var chunk;
  17. while (chunk = readable.read()) {
  18. self.push(chunk);
  19. }
  20. }
  21. }
  22. var wrapper = through(opts, forward, flush);
  23. var shouldFlow = true;
  24. wrapper.once('pipe', onPipe);
  25. wrapper.on('newListener', onListener);
  26. readable.on('error', wrapper.emit.bind(wrapper, 'error'));
  27. function onListener(event) {
  28. // Once we've seen the data or readable event, check if we need to flow
  29. if (event === 'data' || event === 'readable') {
  30. maybeFlow();
  31. this.removeListener('newListener', onListener);
  32. }
  33. }
  34. function onPipe() {
  35. // If the wrapper is piped, disable flow
  36. shouldFlow = false;
  37. }
  38. function maybeFlow() {
  39. // If we need to flow, end the stream which triggers flush
  40. if (shouldFlow) {
  41. wrapper.end();
  42. }
  43. }
  44. return wrapper;
  45. }
  46. module.exports = toThrough;