MyRepo-Ums/node_modules/@ljharb/through/index.js
2024-01-19 11:09:11 +01:00

120 lines
2.7 KiB
JavaScript

'use strict';
var Stream = require('stream').Stream;
var callBind = require('call-bind');
// create a readable writable stream.
function through(write, end, opts) {
var writeBound = callBind(write || function (data) { this.queue(data); });
var endBound = callBind(end || function () { this.queue(null); });
var ended = false;
var destroyed = false;
var buffer = [];
var _ended = false;
var stream = new Stream();
stream.readable = true;
stream.writable = true;
stream.paused = false;
// stream.autoPause = !(opts && opts.autoPause === false)
stream.autoDestroy = !(opts && opts.autoDestroy === false);
stream.write = function (data) {
writeBound(this, data);
return !stream.paused;
};
function drain() {
while (buffer.length && !stream.paused) {
var data = buffer.shift();
if (data === null) { return stream.emit('end'); }
stream.emit('data', data);
}
}
stream.queue = function (data) {
// console.error(ended)
if (_ended) { return stream; }
if (data === null) { _ended = true; }
buffer.push(data);
drain();
return stream;
};
stream.push = stream.queue;
/*
* this will be registered as the first 'end' listener
* must call destroy next tick, to make sure we're after any
* stream piped from here.
* this is only a problem if end is not emitted synchronously.
* a nicer way to do this is to make sure this is the last listener for 'end'
*/
stream.on('end', function () {
stream.readable = false;
if (!stream.writable && stream.autoDestroy) {
process.nextTick(function () {
stream.destroy();
});
}
});
function _end() {
stream.writable = false;
endBound(stream);
if (!stream.readable && stream.autoDestroy) { stream.destroy(); }
}
stream.end = function (data) {
if (ended) { return; }
ended = true;
if (arguments.length) { stream.write(data); }
_end(); // will emit or queue
return stream;
};
stream.destroy = function () {
if (destroyed) { return; }
destroyed = true;
ended = true;
buffer.length = 0;
stream.writable = false;
stream.readable = false;
stream.emit('close');
return stream;
};
stream.pause = function () {
if (stream.paused) { return; }
stream.paused = true;
return stream;
};
stream.resume = function () {
if (stream.paused) {
stream.paused = false;
stream.emit('resume');
}
drain();
/*
* may have become paused again,
* as drain emits 'data'.
*/
if (!stream.paused) { stream.emit('drain'); }
return stream;
};
return stream;
}
/*
* through
*
* a stream that does nothing but re-emit the input.
* useful for aggregating a series of changing but not ending streams into one stream)
*/
module.exports = through;
through.through = through;