Saturday 13 May 2017

Node.js - streaming multiple readable streams to single response

My goal is to stream multiple mongoose cursors (readable) to the same express response (writable).

The problem I'm facing is that after the first cursor finishes, the writable stream emits a 'close' event (not 'finish'!) and I can't write to it anymore.

My mongoose model function, which streams the cursor results to the express response and counts the documents that were streamed, is as follows (don't mind the argument names; they are just for this example):

const Promise = require('bluebird');

MongooseSchema.statics.StreamLimitedValues = function (Param1, DateParam, Limit, ExpressResponse) {
    if (0 >= Limit) {
        return Promise.resolve(0);
    }

    let thisRef = this;

    let query = {
        //.... some query
    };

    return new Promise(function (resolve, reject) {
        let count = 0;

        let stream = thisRef.find(query)
            .sort({Date: -1})
            .limit(Limit)
            .lean()
            .cursor({transform: transformFunc});

        // Serializes each document to a JSON string before it is piped on.
        function transformFunc (Doc) {
            return JSON.stringify(Doc);
        }

        stream.on('data', () => {
            count++;
        }).on('error', (errInst) => {
            reject(errInst);
        }).on('end', () => {
            // Report how many documents this call streamed.
            resolve(count);
        }).pipe(ExpressResponse, {end: false}); // {end: false} keeps the response open for the next cursor
    });
};

I'm passing the express response writable object to this function.

I tried to create the same function with ExpressResponse.write(doc) in the on('data') callback instead of pipe, and the results are the same.
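For illustration, the write() variant described above might look roughly like the following. This is only a sketch under assumptions: `streamWithWrite`, `fakeResponse` and `demo` are hypothetical names, `Readable.from()` stands in for the mongoose cursor, and a plain `Writable` stands in for the express response.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: forwards every chunk from `source` to `sink` with
// write() instead of pipe(), and resolves with the number of chunks seen.
function streamWithWrite(source, sink) {
    return new Promise((resolve, reject) => {
        let count = 0;

        source.on('data', (chunk) => {
            count++;
            // write() returns false when the sink's buffer is full;
            // pause the source until the sink has drained.
            if (!sink.write(chunk)) {
                source.pause();
                sink.once('drain', () => source.resume());
            }
        }).on('error', reject)
          .on('end', () => resolve(count)); // sink.end() is deliberately not called
    });
}

// Stand-in for the express response: collects whatever is written to it.
const received = [];
const fakeResponse = new Writable({
    write(chunk, encoding, callback) {
        received.push(chunk.toString());
        callback();
    }
});

// Stand-in for the cursor: two already-serialized documents.
const demo = streamWithWrite(Readable.from(['{"a":1}', '{"a":2}']), fakeResponse);
demo.then((count) => console.log('chunks written:', count));
```

Because nothing in this sketch calls end(), the stand-in response is still writable once the promise resolves.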

This function is called multiple times (sequentially, not in parallel) with different parameters and a different Limit, based on the result of the previous call (as you can see, the count is returned when the 'end' event fires).
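The calling pattern described here could be sketched as follows. Everything in it is an assumption made for illustration: `streamLimited` is a hypothetical stand-in for StreamLimitedValues that reads from in-memory arrays instead of a mongoose cursor, `streamAll` is a made-up driver, and a plain `Writable` plays the express response. The sketch also assumes the response is ended once, after the last call.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical stand-in for StreamLimitedValues: pipes at most `limit` items
// into `sink` without ending it, and resolves with the number piped.
function streamLimited(items, limit, sink) {
    if (limit <= 0) {
        return Promise.resolve(0);
    }
    return new Promise((resolve, reject) => {
        let count = 0;
        Readable.from(items.slice(0, limit))
            .on('data', () => { count++; })
            .on('error', reject)
            .on('end', () => resolve(count))
            .pipe(sink, { end: false });
    });
}

// Runs the batches one after another; each call only gets what is left
// of the overall budget after the previous calls.
async function streamAll(batches, totalLimit, sink) {
    let remaining = totalLimit;
    for (const items of batches) {
        remaining -= await streamLimited(items, remaining, sink);
    }
    sink.end(); // end the response only after the last source has finished
    return totalLimit - remaining;
}

// Stand-in for the express response: collects whatever is written to it.
const received = [];
const fakeResponse = new Writable({
    write(chunk, encoding, callback) {
        received.push(chunk.toString());
        callback();
    }
});

const demo = streamAll([['a', 'b', 'c'], ['d', 'e', 'f']], 4, fakeResponse);
demo.then((total) => console.log('items streamed:', total));
```

With a budget of 4, the first batch contributes three items and the second call is limited to the one that remains.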

The problem is that the express response emits a 'close' event after the first call to the function above, and I can't write to it anymore.
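One way to confirm which lifecycle events the writable actually emits is to record them in order. In Node.js streams, 'finish' fires after end() has been called and all data has been flushed, while 'close' fires when the stream or its underlying resource has been closed. The sketch below uses a plain `Writable` as a stand-in for the express response and a hypothetical `traceWritable` helper; calling destroy() is only an assumed way to simulate the response being torn down before end().

```javascript
const { Writable } = require('stream');

// Hypothetical helper: records which lifecycle events a writable emits, in order.
function traceWritable(writable, log) {
    for (const name of ['finish', 'close', 'error']) {
        writable.on(name, () => log.push(name));
    }
    return writable;
}

const events = [];
const fakeResponse = traceWritable(new Writable({
    write(chunk, encoding, callback) {
        callback(); // discard the data; only the events matter here
    }
}), events);

fakeResponse.write('first batch');
// Simulates the writable being torn down before end() was ever called.
fakeResponse.destroy();

// The events are emitted asynchronously, so print them on a later tick.
setImmediate(() => console.log(events));
```

Attaching the same kind of listeners to the real response would show whether 'close' arrives without a preceding 'finish', which would suggest that the response was closed by something other than a call to end().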

Any idea what is happening and how to solve this issue?



via TomG
