Monday, 3 April 2017

Node Streams: Wait until data is available

I have 3 streams (A,B,C) which are piped into another (A->B->C). When I start my program B's _read get's called immediatly because it's piped to C. However there is no data in the B-stream yet as A fetches data async. Once B receives the data, which is passed to the _write method of B, it transform the data and emits an 'readable' event (which I fire manually - is this the supposed way of doing it?).

However nothing happens and the data from B is not consumed by anyone (hence B's _read is not called). I can workaround this by calling (on B) this._read() at the end of my _write() method. But this could potentially also push data to the consumers although the queue is full, right?

Basically I want to send a larger chunk of data into the B-stream, split it into smaller ones, and then pass those on to C one by one. So I want to have some kind of buffer in B.

_read(size) {
    if(this._lineBuffer.length > 0) {
        var stop = false;
        while(!stop) {
            stop = this.push(this._lineBuffer.splice(0,1));
        }
    }
    if(this._pendingWriteAck) {
        this._pendingWriteAck();
        this._pendingWriteAck = null;
    }
}

_write(chunk, encoding, callback) {
    console.log("New chunk for line splitter received");
    if(this._buffer) {
        this._buffer = Buffer.concat([this._buffer, chunk]);
    } else {
        this._buffer = chunk;
    }
    for (; this._offset < this._buffer.length; this._offset++) {
        if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
            this._lineBuffer.push(this._buffer.slice(0, this._offset).toString());
            this._buffer = this._buffer.slice(this._offset + 1);
            this._offset = 0;
        }
    }

    if(this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) {
        console.log("Line Split buffer has reached capacity. Waiting...");
        this._pendingWriteAck = callback;
    } else {
        callback();
    }

    setImmediate(()=>{
        this.emit('readable');
        this._read();
    })
}



via newBee

No comments:

Post a Comment