Friday 9 June 2017

Node Stream - Output multiple Transform streams to single PassThrough stream

I periodically have to download and parse a large amount of JSON data, anywhere from 1,000 to 1,000,000 lines.

Each request has a chunk limit of 5000, so I would like to fire off a batch of requests at a time, stream each response through its own Transform stream to filter out the keys/values, and then write to a combined stream that writes its output to the database.

But every attempt either doesn't work or gives errors because too many event listeners are set. That seems correct if I understand that the 'last pipe' is always the reference to the next stream in the chain.

Here is some code (I have changed it a lot of times, so it may make little sense).

The question is: is it bad practice to join multiple streams into one? Google doesn't show a whole lot about it either.

Thanks!

brokerApi/getCandles.js

// The 'combined output' stream
let passStream = new Stream.PassThrough();

countChunks.forEach(chunk => {
    let arr = [];
    let leftOver = '';
    let startFound = false;
    let lastPiece = false;
    let firstByte = false;
    let now = Date.now();

    let transformStream = this._client

        // Returns PassThrough stream
        .getCandles(instrument, chunk.from, chunk.until, timeFrame, chunk.count)
        .on('error', err => console.error(err) || passStream.emit('error', err))
        .on('end', () => {
            if (++finished === countChunks.length)
                passStream.end();
        })
        .pipe(passStream);

    transformStream._transform = function(data, type, done) {
        /** Transform to typedArray **/

        this.push(/** Transformed value **/)
    }
});
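
For reference, here is a minimal sketch of one way the "many sources, one output" wiring could look. It is not a definitive fix. `mergeStreams` and `makeTransform` are hypothetical names that do not appear in the code above. The sketch relies on two documented behaviours of Node streams: `pipe()` returns its destination (so in the snippet above `transformStream` ends up being `passStream` itself), and `pipe(dest, { end: false })` keeps the destination open when one source finishes. Raising the listener limit is only there to silence the warning that shows up once more than 10 sources are piped into the same stream.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper: sends every source through its own Transform and
// merges the results into one PassThrough. The output is ended only after
// every per-source Transform has emitted 'end'.
function mergeStreams(sources, makeTransform) {
  const output = new PassThrough();

  // Every pipe() into `output` registers listeners on it, so raise the
  // default limit of 10 to avoid MaxListenersExceededWarning.
  output.setMaxListeners(sources.length + 10);

  let pending = sources.length;
  if (pending === 0) {
    output.end();
    return output;
  }

  for (const source of sources) {
    const transform = makeTransform();

    // pipe() does not forward errors, so pass them on by hand.
    source.on('error', err => output.destroy(err));
    transform.on('error', err => output.destroy(err));

    // 'end' fires after the transform's last chunk has been written to output.
    transform.on('end', () => {
      if (--pending === 0) output.end();
    });

    // { end: false } stops the first finished source from closing the output.
    source.pipe(transform).pipe(output, { end: false });
  }

  return output;
}
```

With something like this, each call to `getCandles(...)` would be one entry in `sources`, and `makeTransform` would return a fresh Transform that does the typed-array conversion. Chunks from different sources can interleave in the output, so this only fits if the consumer does not depend on their order.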

Extra - Other file that 'consumes' the stream (writes to DB)

DataLayer.js

    brokerApi.getCandles(instrument, timeFrame, from, until, count)
            .on('data', async (buf: NodeBuffer) => {
                this._dataLayer.write(instrument, timeFrame, buf);

                if (from && until) {
                    await this._mapper.update(instrument, timeFrame, from, until, buf.length / (10 * Float64Array.BYTES_PER_ELEMENT));
                } else {
                    if (buf.length) {
                        if (!from)
                            from = buf.readDoubleLE(0);

                        if (!until) {
                            until = buf.readDoubleLE(buf.length - (10 * Float64Array.BYTES_PER_ELEMENT));
                            console.log('UNTIL TUNIL', until);
                        }

                        if (from && until)
                            await this._mapper.update(instrument, timeFrame, from, until, buf.length / (10 * Float64Array.BYTES_PER_ELEMENT));
                    }
                }

            })
            .on('end', () => {
                winston.info(`Cache: Fetching ${instrument} took ${Date.now() - now} ms`);
                resolve()
            })
            .on('error', reject)
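
A side note on the consumer above: the stream does not wait for an `async` function passed to `.on('data', ...)`, so the `_mapper.update` calls can overlap and `'end'` can fire while some are still pending. Below is a minimal sketch of a sequential alternative, assuming a Node.js version where readable streams are async iterable. `consumeSequentially` and `handleChunk` are hypothetical names, not part of the code above.

```javascript
// Hypothetical helper: processes one chunk at a time and resolves only after
// the last handler call has finished. A stream error rejects the promise.
async function consumeSequentially(stream, handleChunk) {
  for await (const chunk of stream) {
    // The loop does not pull the next chunk until this promise settles,
    // so the stream's own buffering provides the backpressure.
    await handleChunk(chunk);
  }
}
```

The body of the existing `'data'` handler would become `handleChunk`, and the surrounding `resolve`/`reject` pair could be replaced by awaiting the returned promise.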



via DutchKev
