I am writing a stream processor that reads a file line by line and ultimately calls a REST API (PUT) with each object read from the file.
I am using the "event-stream" package to do this, writing my processor with es.map (which event-stream pulls in from the map-stream package).
The problem I'm running into is the lack of control over the flow of data. I can set a highWaterMark (measured in bytes) on the read stream, but the size of each object varies greatly, so a byte count is a poor proxy for an object count: I either over-constrain and process too slowly, or under-constrain and overwhelm my server's ability to process the PUTs.
My code looks like this:
const fs = require('fs');
const es = require('event-stream');

fs.createReadStream('data.json', { flags: 'r', highWaterMark: 256 })
  .pipe(es.split())
  .pipe(es.map(function (line, cb) {
    // es.split() can emit an empty final chunk when the file ends with a
    // newline; skip it so JSON.parse doesn't throw.
    if (!line) return cb(null, line);
    let obj = JSON.parse(line);
    myuploaderthingy.put('/url', obj)
      .then(() => {
        cb(null, line); // pass the line through once the PUT succeeds
      })
      .catch(error => {
        console.error(error);
        cb(error);
      });
  }));
What I really want is to limit the number of outstanding REST calls. What's the easiest way to do this?
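To make that concrete, here is a rough, untested sketch of the behavior I have in mind, written with a plain Node Transform in place of es.map (reusing fs, es, and myuploaderthingy from above). throttle, doWork, and maxInFlight are names I made up for illustration; the idea is to withhold the transform callback once maxInFlight PUTs are pending, so backpressure propagates back to the file read:

const { Transform } = require('stream');

// Sketch: cap concurrent async work by holding back the transform
// callback once `maxInFlight` operations are pending. `doWork(line)`
// must return a Promise. All names here are hypothetical.
function throttle(doWork, maxInFlight) {
  let inFlight = 0;
  let resume = null;   // the withheld transform callback, if any
  let flushed = null;  // the flush callback, once the input has ended

  function settle() {
    inFlight -= 1;
    if (resume) { const next = resume; resume = null; next(); }
    if (flushed && inFlight === 0) flushed();
  }

  return new Transform({
    objectMode: true,
    transform(line, _enc, next) {
      if (!line) return next(); // skip the empty chunk after a trailing newline
      inFlight += 1;
      doWork(line).then(settle, err => { settle(); this.destroy(err); });
      // Only ask for more input while we're under the cap; otherwise
      // park `next` until an outstanding operation finishes.
      if (inFlight < maxInFlight) next();
      else resume = next;
    },
    flush(done) {
      // Don't signal end-of-stream until every pending PUT has settled.
      if (inFlight === 0) done();
      else flushed = done;
    },
  });
}

fs.createReadStream('data.json')
  .pipe(es.split())
  .pipe(throttle(line => myuploaderthingy.put('/url', JSON.parse(line)), 8))
  .on('error', err => console.error(err));

An alternative I've seen suggested is wrapping each PUT in a promise limiter like p-limit, but as far as I can tell that queues pending calls without applying backpressure to the stream, so a large file could end up buffered in memory.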
via Jim Baldwin