Monday 29 May 2017

Node event-stream: flow control based on max concurrent events?

I am writing a stream processor that reads a file line by line and ultimately issues a REST PUT for each JSON object parsed from the file.

I am using the "event-stream" package to do this, writing my processor with its map-stream-based es.map().

The problem I'm running into is the lack of control over the flow of data. I can set a highWaterMark (a byte count) on the read stream, but the size of each serialized object varies greatly, so a byte limit is a poor proxy for the number of requests in flight. I either over-constrain it and process too slowly, or under-constrain it and overwhelm my server's ability to process the PUTs.

My code looks like this:

const fs = require('fs');
const es = require('event-stream');

fs.createReadStream('data.json', {flags: 'r', highWaterMark: 256 })
.pipe(es.split())
.pipe(es.map(function (line, cb) {
    // es.split() emits an empty chunk for a trailing newline;
    // skip it so JSON.parse doesn't throw on ''.
    if (!line) return cb(null, line);

    let obj;
    try {
        obj = JSON.parse(line);
    } catch (err) {
        return cb(err); // surface malformed lines instead of crashing
    }

    myuploaderthingy.put('/url', obj)
    .then(() => {
        cb(null, line);
    })
    .catch(error => {
        console.error(error);
        cb(error);
    });
}));

What I really want is to limit the number of outstanding REST calls. What's the easiest way to do this?
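
If pulling in one more dependency is acceptable, one way to get that limit (a suggestion on my part, not something event-stream provides) is to replace es.map() with a transform stream that takes an explicit concurrency cap, such as the parallel-transform package. It runs up to N transform callbacks at once, stops accepting upstream writes while all N slots are busy, and still emits results in input order. A minimal sketch, with the cap of 10 chosen arbitrarily and myuploaderthingy kept as the stand-in client from above:

const fs = require('fs');
const es = require('event-stream');
const parallel = require('parallel-transform');

const MAX_IN_FLIGHT = 10; // arbitrary cap on concurrent PUTs

fs.createReadStream('data.json', {flags: 'r'})
.pipe(es.split())
.pipe(parallel(MAX_IN_FLIGHT, function (line, cb) {
    // Skip the empty chunk es.split() emits for a trailing newline.
    if (!line) return cb(null, line);

    let obj;
    try {
        obj = JSON.parse(line);
    } catch (err) {
        return cb(err);
    }

    myuploaderthingy.put('/url', obj)
    .then(() => cb(null, line))
    .catch(error => {
        console.error(error);
        cb(error);
    });
}));

Because the transform refuses further writes while its slots are full, backpressure propagates through pipe() back to the read stream, and highWaterMark can go back to being an ordinary buffer-size knob instead of a stand-in for concurrency control.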

via Jim Baldwin
