I need to process 1 million lines of records, transform each one, and save the results to multiple files binned by the hour (one file for each hour; I'm thinking of splitting them by "filter").
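To make "binned by the hour" concrete, here is a minimal sketch of how a transformed record could be mapped to its output file. The helper names and the file-naming scheme (`<event>-<YYYY-MM-DDTHH>.jsonl`) are hypothetical; the record shape `{event, datetime}` is the one produced by the `map` calls in the code further down, and it assumes `datetime` is an ISO-8601 string as returned by `toISOString()` (UTC).

```javascript
// Hypothetical helper: map an ISO-8601 datetime to its UTC hour bucket.
// toISOString() always yields "YYYY-MM-DDTHH:mm:ss.sssZ", so the first
// 13 characters ("YYYY-MM-DDTHH") identify the hour.
function hourBucket(isoDatetime) {
  return new Date(isoDatetime).toISOString().slice(0, 13);
}

// Hypothetical naming scheme: one output file per event type ("filter")
// and per hour, e.g. "intv-2016-05-01T13.jsonl".
function outputFileFor(record) {
  return `${record.event}-${hourBucket(record.datetime)}.jsonl`;
}
```

Putting the event type in the file name keeps each "filter" branch writing to its own set of hourly files.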
For some reason I need those lines to be processed strictly sequentially. That is, if line #450000 takes longer to process and to save (this is the tricky part, because fs is async with callbacks), processing must not jump ahead to #450001; it has to wait until #450000 is finished.
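The "strictly sequential" requirement can be sketched without RxJS as an async loop that awaits each line before pulling the next one. This is only an illustration under assumptions: `transformAndSave` is a hypothetical stand-in that simulates a variable-length async save with a random delay, and `saved` is an in-memory array instead of real files.

```javascript
// Hypothetical stand-in for "transform + save one line": resolves after a
// random delay, like an async fs write whose duration varies per line.
function transformAndSave(line, saved) {
  return new Promise(resolve => {
    setTimeout(() => {
      saved.push({ ...line, procDatetime: new Date().toISOString() });
      resolve();
    }, Math.random() * 20);
  });
}

// Process lines strictly in order: line i+1 is not started until the
// promise for line i has settled, so at most one is pending at a time.
// `lines` may be an array or any async iterable.
async function processSequentially(lines, saved) {
  for await (const line of lines) {
    await transformAndSave(line, saved);
  }
  return saved;
}
```

In real code, `lines` could be a `readline` interface created over `fs.createReadStream(path)`, which Node documents as async-iterable, so the file would be consumed line by line rather than loaded up front. The save step would wrap the callback-based fs call in a promise (or use `fs.promises`).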
Previously (with plain promises, no RxJS), I would create N promises, one for each line, keep them in an array, and chain them with a reduce operation, as explained here: https://github.com/kriskowal/q
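For reference, the reduce-based chaining mentioned above looks roughly like this with native promises. It is a sketch of the general pattern, not the q library's own code; `runInSequence` is a hypothetical name, and each task is a function that takes the previous result and returns a promise.

```javascript
// Sequence promise-returning functions with reduce():
// the result is tasks[0](init).then(tasks[1]).then(tasks[2])...
// Each task runs only after the previous one resolves, but reduce()
// runs synchronously and creates all N chained promises up front.
function runInSequence(tasks, initialValue) {
  return tasks.reduce(
    (soFar, task) => soFar.then(task),
    Promise.resolve(initialValue)
  );
}
```

The tasks execute one at a time, yet the whole chain of N pending promises exists in memory before the first task finishes, which is the cost in question for 1 million lines.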
But I don't want to create 1 million Promise instances. So I looked into ReactiveX, hoping it would work like "pass the buck": nothing would wait, each piece of processing would start as soon as an event pops up, and the resources used by that processing (I assume the processing block is basically a promise behind the scenes) would be released as soon as possible.
I tried to verify that with this code:
```javascript
import Rx from 'rxjs-es6/Rx';
import Q from 'q';

// Every input record is pushed through this Subject.
let subject = new Rx.Subject();

// Attach an "enrichment" pipeline to an observable of events.
let processEventJsons = function(observable) {
  observable.flatMap(eventJson => {
    // Simulate slow async work (e.g. a file write) with a random delay of up to 5 s.
    let deferred = Q.defer();
    setTimeout(() => {
      eventJson.procDatetime = new Date().toISOString();
      deferred.resolve(eventJson);
    }, Math.random() * 5000);
    return Rx.Observable.fromPromise(deferred.promise);
  })
  .subscribe({
    next: enrichedEventJson => {
      console.log(JSON.stringify(enrichedEventJson));
    },
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done'),
  });
};

// One pipeline per event type: interviews...
processEventJsons(
  subject.filter(dataJson => dataJson.type === "interview").map(dataJson => {
    return {event: "intv", datetime: dataJson.datetime};
  })
);

// ...and check-ins.
processEventJsons(
  subject.filter(dataJson => dataJson.type === "checkin").map(dataJson => {
    return {event: "chki", datetime: dataJson.datetime};
  })
);

// Push 1 million random events into the Subject in a synchronous loop.
for (let i = 0; i < 1000000; i++) {
  if (Math.random() < 0.5) {
    subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
  } else {
    subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
  }
}
subject.complete();
```
But I keep getting "FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory". Also, console.log(JSON.stringify(enrichedEventJson)); doesn't print anything until the for-loop at the end of the code has completed.
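The "nothing prints until the loop ends" symptom can be reproduced in plain JavaScript. The sketch below is a simplified model, not RxJS itself: it assumes that, as in the posted code, each `subject.next()` call synchronously reaches `flatMap` (an alias of `mergeMap`, which by default places no limit on concurrency), which immediately creates a deferred and a timer for that event. Because the for-loop never yields to the event loop, every timer stays pending until the loop returns. `simulatePushLoop` is a hypothetical name.

```javascript
// Plain-JS model of the posted pipeline: every pushed event immediately
// schedules its own timer, the way flatMap subscribes to each inner
// promise as soon as subject.next() is called.
function simulatePushLoop(n, log) {
  for (let i = 0; i < n; i++) {
    setTimeout(() => log.push(`processed ${i}`), 0); // cannot fire yet
    log.push(`pushed ${i}`);
  }
  // The loop is synchronous, so all n timers are still pending here.
  log.push('loop finished');
  return log;
}
```

With n = 1,000,000 this means a million pending timers, deferreds and event objects exist at the same time, which is consistent with the heap running out. Note also that `flatMap` emits results in completion order, so even without the memory problem the output would not follow input order.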
This makes me think that switching to RxJS doesn't really improve the situation; it still queues up promises behind the scenes.
Or am I using the API wrongly? Can you point out what's wrong?
Thanks, Raka
via Cokorda Raka