I have three streams (A, B, C) that are piped into one another (A -> B -> C). When I start my program, B's _read gets called immediately because B is piped to C. However, there is no data in B yet, because A fetches its data asynchronously. Once B receives data, which is passed to B's _write method, it transforms the data and emits a 'readable' event (which I fire manually; is that the intended way to do it?).
However, nothing happens: the data in B is not consumed by anyone (hence B's _read is not called again). I can work around this by calling this._read() on B at the end of my _write() method. But couldn't this also push data to the consumer even though its queue is full?
Basically I want to send a larger chunk of data into the B-stream, split it into smaller ones, and then pass those on to C one by one. So I want to have some kind of buffer in B.
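_read(size) {
  // Push buffered lines until the buffer is empty or push() returns false
  // (false signals that the consumer's queue is full).
  while (this._lineBuffer.length > 0) {
    if (!this.push(this._lineBuffer.shift())) {
      break;
    }
  }
  // Acknowledge a deferred write so that A can send the next chunk.
  if (this._pendingWriteAck) {
    const ack = this._pendingWriteAck;
    this._pendingWriteAck = null;
    ack();
  }
}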
_write(chunk, encoding, callback) {
  console.log("New chunk for line splitter received");
  // Append the new chunk to any leftover partial line.
  if (this._buffer) {
    this._buffer = Buffer.concat([this._buffer, chunk]);
  } else {
    this._buffer = chunk;
  }
  // Scan for newlines and move every complete line into the line buffer.
  for (; this._offset < this._buffer.length; this._offset++) {
    if (this._buffer[this._offset] === 0x0a) { // 0x0a is a newline
      this._lineBuffer.push(this._buffer.slice(0, this._offset).toString());
      this._buffer = this._buffer.slice(this._offset + 1);
      this._offset = -1; // the loop increment brings this back to 0
    }
  }
  if (this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) {
    // Hold the callback back until _read() has drained some lines.
    console.log("Line Split buffer has reached capacity. Waiting...");
    this._pendingWriteAck = callback;
  } else {
    callback();
  }
  // Manual workaround described above: announce data and trigger a read.
  setImmediate(() => {
    this.emit('readable');
    this._read();
  });
}
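For comparison, the goal described above (take a large chunk in B, split it into smaller pieces, and hand those to C one at a time) might also be sketched with Node's built-in Transform class instead of a hand-written _read/_write pair. This is only a sketch under assumptions: the names LineSplitter and splitLines are made up for illustration, the input is assumed to be UTF-8 text, and lines are emitted as strings in object mode on the readable side. As far as I understand it, Transform emits 'readable' itself when push() is called and holds back the next write while the readable side is full, so no manual emit or this._read() call should be needed.

```javascript
const { Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: joins the leftover text with the new text and
// returns the complete lines plus the trailing partial line.
function splitLines(buffered, text) {
  const parts = (buffered + text).split('\n');
  const rest = parts.pop(); // the last element is the incomplete tail (may be '')
  return { lines: parts, rest };
}

// Hypothetical replacement for stream B.
class LineSplitter extends Transform {
  constructor(options) {
    // Each pushed line is delivered to the consumer as a separate string.
    super({ ...options, readableObjectMode: true });
    this._decoder = new StringDecoder('utf8'); // assumption: UTF-8 input
    this._rest = '';
  }

  _transform(chunk, encoding, callback) {
    // The decoder keeps multi-byte characters intact across chunk boundaries.
    const text = this._decoder.write(chunk);
    const { lines, rest } = splitLines(this._rest, text);
    this._rest = rest;
    for (const line of lines) {
      this.push(line);
    }
    callback();
  }

  _flush(callback) {
    // Emit whatever is left when the input ends without a final newline.
    const tail = this._rest + this._decoder.end();
    if (tail.length > 0) {
      this.push(tail);
    }
    callback();
  }
}
```

With something like this, the pipeline would be wired as A.pipe(new LineSplitter()).pipe(C). Note that the sketch pushes every line of one incoming chunk at once, so the internal buffer can briefly grow by one chunk's worth of lines before the next write is deferred.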
via newBee