Tuesday, 14 March 2017

RxJS | sequencing | out of memory

I need to process 1 million lines of records, transform each of them, and save them to multiple files binned by the hour (one file for each hour). I'm thinking of splitting them with "filter".
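For the hourly binning, one option is to use the date plus the hour of each record's timestamp as the bin key. This sketch assumes every record carries an ISO-8601 UTC timestamp like the ones produced by toISOString(). The helpers hourKey and binFileName, and the one-file-per-(event type, hour) naming scheme, are hypothetical.

```javascript
// Hypothetical helper: the hourly bin key is the date plus the hour of an
// ISO-8601 UTC timestamp, e.g. "2017-03-14T09:41:07.123Z" -> "2017-03-14T09".
function hourKey(isoDatetime) {
  return isoDatetime.slice(0, 13);
}

// Hypothetical naming scheme (an assumption): one file per (event type, hour) pair.
function binFileName(eventJson) {
  return `${eventJson.event}-${hourKey(eventJson.datetime)}.jsonl`;
}

console.log(binFileName({ event: 'intv', datetime: '2017-03-14T09:41:07.123Z' }));
// intv-2017-03-14T09.jsonl
```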

For some reason, I need those lines to be processed strictly sequentially. That is, if line #450000 takes longer to process and save (this is the tricky part, because fs is asynchronous and callback-based), processing must not jump ahead to #450001; it has to wait until #450000 is finished.
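The strict ordering described above can be expressed with plain callbacks, without creating a promise per line: start the next line only inside the previous line's completion callback. This is a minimal sketch; saveLine is a hypothetical stand-in for a callback-style fs call such as fs.appendFile(path, data, callback), and processSequentially is likewise an illustrative name.

```javascript
// Hypothetical stand-in for an fs-style async save: it calls back after a
// random delay, the way fs.appendFile(path, data, callback) would.
function saveLine(line, callback) {
  setTimeout(() => callback(null, line), Math.random() * 10);
}

// Process lines strictly one after another: line i + 1 is not started
// until the callback for line i has fired.
function processSequentially(lines, save, done) {
  const finished = [];
  function step(i) {
    if (i === lines.length) return done(null, finished);
    save(lines[i], (err, saved) => {
      if (err) return done(err);
      finished.push(saved);
      step(i + 1); // only now move on to the next line
    });
  }
  step(0);
}

processSequentially(['#449999', '#450000', '#450001'], saveLine, (err, order) => {
  if (err) throw err;
  console.log(order); // [ '#449999', '#450000', '#450001' ]
});
```

Because each callback fires from a timer, the recursion in step() does not grow the call stack, even for a million lines.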

Previously (with plain Promises, no RxJS), I would create N promises, one for each line, keep them in an array, and chain them with a reduce operation, as explained here: https://github.com/kriskowal/q
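For reference, the reduce-based chaining pattern from the Q README looks roughly like this with native Promises. It is a sketch; runInSequence and makeTask are hypothetical names. Note that reduce builds the entire chain of .then() promises up front, which is where the one-promise-per-line cost comes from, even though the tasks themselves still run one at a time.

```javascript
// Each task is a factory that starts its work only when called, so the
// work itself runs one at a time. The reduce, however, eagerly builds a
// chain of tasks.length .then() promises before the first task finishes.
function runInSequence(tasks, initialValue) {
  return tasks.reduce(
    (soFar, task) => soFar.then(task),
    Promise.resolve(initialValue)
  );
}

// Hypothetical task: append a number to the accumulated log after a random delay.
const makeTask = (n) => (log) =>
  new Promise((resolve) => setTimeout(() => resolve(log.concat(n)), Math.random() * 10));

runInSequence([1, 2, 3].map(makeTask), []).then((log) => console.log(log)); // [ 1, 2, 3 ]
```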

But I don't want to create 1 million Promise instances. So I looked into ReactiveX, hoping it would work like "pass the buck": it wouldn't wait, processing would start as soon as an event pops up, and the resources used by the processing (I assume the processing block is basically a promise behind the scenes) would be released as soon as possible.
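One way to get this "release as soon as possible" behavior without RxJS is to pull records one at a time and await each before pulling the next, so only one promise is pending at any moment. This is a different technique from the one in the question, swapped in here for illustration. The sketch assumes Node.js with async/await support; generateRecords and transformAndSave are hypothetical placeholders for the real input and the real transform-and-save step.

```javascript
// Hypothetical lazy source: yields one record at a time instead of
// pushing all of them into a Subject up front.
function* generateRecords(count) {
  for (let i = 0; i < count; i++) {
    yield { id: i, type: Math.random() < 0.5 ? 'interview' : 'checkin' };
  }
}

// Hypothetical async transform-and-save step; setImmediate stands in for
// real asynchronous I/O.
function transformAndSave(record) {
  return new Promise((resolve) => {
    setImmediate(() => {
      resolve(Object.assign({}, record, { procDatetime: new Date().toISOString() }));
    });
  });
}

// Pull a record, wait until it is fully handled, then pull the next one.
// At most one handle() promise is pending at a time, so finished records
// and their promises become eligible for garbage collection.
async function processAll(source, handle) {
  let processed = 0;
  for (const record of source) {
    await handle(record);
    processed++;
  }
  return processed;
}

processAll(generateRecords(100000), transformAndSave)
  .then((n) => console.log(`processed ${n} records`)); // processed 100000 records
```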

I tried to verify that with this code:

import Rx from 'rxjs-es6/Rx';
import Q from 'q';

let subject = new Rx.Subject();

// Processes one filtered stream: each event gets a random delay (simulating
// async work) and is then logged. flatMap (an alias of mergeMap) subscribes
// to each inner observable as soon as its event arrives.
let processEventJsons = function(observable) {
  observable.flatMap(eventJson => {
    let deferred = Q.defer();

    setTimeout(() => {
      eventJson.procDatetime = new Date().toISOString();
      deferred.resolve(eventJson);
    }, Math.random() * 5000);

    return Rx.Observable.fromPromise(deferred.promise);
  })
  .subscribe({
    next: enrichedEventJson => {
      console.log(JSON.stringify(enrichedEventJson));
    },
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done'),
  });
};

// "interview" events go to one pipeline...
processEventJsons(
  subject.filter(dataJson => dataJson.type === "interview").map(dataJson => {
    return {event: "intv", datetime: dataJson.datetime};
  })
);

// ...and "checkin" events to another.
processEventJsons(
  subject.filter(dataJson => dataJson.type === "checkin").map(dataJson => {
    return {event: "chki", datetime: dataJson.datetime};
  })
);

// Emit 1 million events synchronously, then complete the subject.
for (let i = 0; i < 1000000; i++) {
  if (Math.random() < 0.5) {
    subject.next({id: i, type: "interview", datetime: new Date().toISOString()});
  } else {
    subject.next({id: i, type: "checkin", datetime: new Date().toISOString()});
  }
}
subject.complete();

But... I kept getting FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory. The output of console.log(JSON.stringify(enrichedEventJson)); is not printed until the for-loop at the end of the code has completed.
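The delayed printing is consistent with how the Node.js event loop works: in the code above, each subject.next() call runs the filter, map and flatMap callbacks synchronously, so all 1,000,000 deferreds and timers are created inside the for-loop, and none of those timers can fire until the loop returns. The small demo below (plain Node.js, independent of RxJS) illustrates that ordering; the events array is just an illustrative log.

```javascript
// Records the order in which things happen.
const events = [];

// Schedule a callback with a 0 ms delay, then run a long synchronous loop.
setTimeout(() => events.push('timer fired'), 0);

for (let i = 0; i < 1e6; i++) {
  if (i === 1e6 - 1) events.push('loop finished');
}

// Still inside the same synchronous run: the timer has not fired yet.
events.push('after loop');

setTimeout(() => console.log(events), 10);
// [ 'loop finished', 'after loop', 'timer fired' ]
```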

This makes me think switching to RxJS is not really improving the situation; it still queues up promises behind the scenes.

Or... am I using the API incorrectly? Can you help me figure out what's wrong?

Thanks, Raka



via Cokorda Raka
