I have millions of rows in my Cassandra database that I want to stream to the client as a gzip-compressed file. I am using the stream() function from the Node.js Cassandra driver, piping it to a Transform stream which extracts the one field I care about from each row and appends a newline, then piping that to gzip, and finally piping to the Express response object. This seems to work fine, but I can't figure out how to properly handle errors during streaming. I have to set the appropriate headers/status for the client before streaming, but if there is an error during the streaming (on the dbStream, for example), I want to clean up all of the pipes and reset the response status to something like 404. But if I try to reset the status after the headers are set and the streaming has started, I get "Can't set headers after they are sent."
I've looked all over and can't find how to properly handle errors in Node when piping/streaming to the response object. Can anyone help?
streamVotesToWriteStream(query, res, options) {
return new Promise((resolve, reject) => {
let success = true;
const dbStream = db.client.stream(query);
const rowTransformer = new Transform({
objectMode: true,
transform(row, encoding, callback) {
try {
const vote = row.vote + '\n';
callback(null, vote);
} catch (err) {
callback(null, err.message + '\n');
}
}
});
// Handle res events
res.on('error', (err) => {
logger.error(`res ${res} error`);
return reject(err);
});
dbStream.on('error', function(err) {
res.status(404).send() // Can't set headers after they are sent.
logger.debug(`dbStream error: ${err}`);
success = false;
//res.end();
//return reject(err);
});
res.writeHead(200, { 'content-encoding': 'gzip' });
const gzip = zlib.createGzip();
dbStream.pipe(rowTransformer).pipe(gzip).pipe(res);
});
}
via gcosta
No comments:
Post a Comment