index.js 822 B

123456789101112131415161718192021222324252627282930
  1. var Readable = require('readable-stream').Readable;
  2. module.exports = function (stream) {
  3. var opts = stream._readableState;
  4. if (typeof stream.read !== 'function') {
  5. stream = new Readable(opts).wrap(stream);
  6. }
  7. var ro = new Readable({ objectMode: opts && opts.objectMode });
  8. var waiting = false;
  9. stream.on('readable', function () {
  10. if (waiting) {
  11. waiting = false;
  12. ro._read();
  13. }
  14. });
  15. ro._read = function () {
  16. var buf, reads = 0;
  17. while ((buf = stream.read()) !== null) {
  18. ro.push(buf);
  19. reads ++;
  20. }
  21. if (reads === 0) waiting = true;
  22. };
  23. stream.once('end', function () { ro.push(null) });
  24. stream.on('error', function (err) { ro.emit('error', err) });
  25. return ro;
  26. };