Sunday, 23 April 2017

Node streams ending randomly

I'm writing a program in Node.js that scans a disk, extracts some metadata from the files and stores the results in CouchDB.

I'm trying to use Node streams, but my program stops at a random point, long before all files are processed. Sometimes a few hundred files get done, sometimes more than a thousand, but the end is always premature and the number is not stable. I also do not get any error message, and the 'Finished' line is not printed.

Code below

var miss = require('mississippi');
var gs = require('glob-stream');
var map = require('map-stream');
var _ = require('lodash');
const md5File = require('md5-file/promise')
const Writable = require('stream').Writable;
// One glob pattern per image extension, each rooted at the directory passed
// as the first command-line argument (process.argv[2]).
var patterns = ['jpg', 'png', 'JPG'].map((extension) => `${process.argv[2]}/**/*.${extension}`);
var PouchDB = require('pouchdb-http');
// Handle on the remote database that receives one document per image checksum.
var db = new PouchDB('http://10.10.10.4:5984/images/');
var eos = require('end-of-stream');
/**
 * Sink stream built with `miss.to.obj` that stores each incoming item in `db`.
 *
 * For every item it tries to create a document whose `_id` is the item's
 * `md5` and whose `instances` array holds the item's `path`. If the write is
 * rejected with an error named 'conflict', the existing document is fetched,
 * the path is appended to its `instances`, and the document is written back.
 *
 * @param {{path: string, md5: string}} object - item to persist.
 * @param {string} encoding - unused.
 * @param {function(Error=): void} callback - invoked exactly once per item:
 *   with no argument on success, or with the error on failure.
 */
const pouchStream = miss.to.obj(
    (object, encoding, callback) => {
        console.log(`saving ${object.path}`);
        const instance = {
            path: object.path
        };

        db.put({
            _id: object.md5,
            instances: [instance]
        }).catch((error) => {
            // Anything other than a conflict is a genuine failure; let it
            // propagate to the single rejection handler below.
            if (error.name !== 'conflict') {
                throw error;
            }
            // A document for this checksum already exists: add this path to it.
            return db.get(object.md5).then((existing) => {
                existing.instances.push(instance);
                return db.put(existing);
            });
        }).then(
            () => {
                callback();
            },
            // Passed as the second argument of `then` (not a trailing `catch`)
            // so that an exception thrown by `callback()` itself can never
            // cause `callback` to be invoked a second time.
            (error) => {
                console.error(error);
                callback(error);
            }
        );
    });
/**
 * Stream created with `map` that adds an `md5` property to every item.
 *
 * The checksum comes from `md5File(item.path)`. On success the same item
 * object (now carrying `md5`) is handed to `cb`; on failure the error is
 * handed to `cb` instead.
 *
 * NOTE(review): the mapper always returns `false`. How `map` interprets that
 * return value is not visible in this file - confirm against the map-stream
 * documentation before changing it.
 */
var enrichStream = map(function addChecksum(item, cb) {
    console.log(`enriching ${item.path}`);
    const checksum = md5File(item.path);
    checksum
        .then((md5) => {
            item.md5 = md5;
            cb(null, item);
        })
        .catch((error) => cb(error));
    return false;
});

/**
 * Completion callback for the pipeline: logs the error if one is passed,
 * otherwise logs 'Finished'.
 *
 * @param {Error=} err - error handed over by `miss.pipe`, if any.
 */
function reportOutcome(err) {
    if (err) {
        console.error('Error!', err);
        return;
    }
    console.log('Finished');
}

// Source of matched files -> checksum enrichment -> database sink.
var stream = gs(patterns);
miss.pipe(stream, enrichStream, pouchStream, reportOutcome);

It looks like the problem is with pouchStream: if I replace it with a simple writable stream that just logs to the console, everything works OK. However, I do not know how to fix it.



via AGrzes

No comments:

Post a Comment