Let me state that I'm relatively new to NodeJS and may have misunderstood some concepts. Apologies for my bad English as well; I'm not a native speaker.
So, basically what I want to do is
- Read from a MySQL stream
- Pipe it to a transform stream, so that I end up pushing buffered strings containing 5 SQL update operations each ("UPDATE 1;UPDATE 2...")
- Pipe it to a write stream that performs an async MySQL update
When running the code, it starts generating the UPDATE strings but then freezes for about 30 seconds; then all of a sudden it starts updating like crazy for 3 or 4 seconds, and it crashes with an "Error: Connection lost: The server closed the connection."
The weird thing is that if I swap the update command for a setTimeout (to fake an async operation), it responds as expected. That made me think about MySQL performance issues or connection problems.
const cfg = require("my-config-module");
const mysql = require('mysql');
const BATCH_SIZE = 5;
const stream = require("stream");
const Transform = stream.Transform;
const Writable = stream.Writable;
// Settings for the shared connection pool; credentials and the target
// database are read from the config module.
const poolOptions = {
  host: cfg.sql.host,
  user: cfg.sql.user,
  password: cfg.sql.password,
  database: cfg.sql.db,
  connectionLimit: 50,
  // Lets a single query() call carry several ";"-separated statements.
  multipleStatements: true
};
let db = mysql.createPool(poolOptions);
// Query whose result rows are streamed into the batching pipeline.
let sql = 'SELECT id FROM items';
/**
 * Transform stream that turns each incoming row into an UPDATE statement and
 * emits the statements downstream as ";"-joined batches of BATCH_SIZE.
 * A final, smaller batch is emitted when the input ends.
 */
class BatchTransform extends Transform {
  /**
   * @param {object} [opts] - Stream options, handed to the Transform constructor as-is.
   */
  constructor(opts) {
    super(opts);
    // Statements collected since the last batch was emitted.
    this.buffer = [];
  }

  /**
   * Queue one UPDATE statement for the row and emit a batch once BATCH_SIZE
   * statements have accumulated.
   *
   * @param {{id: *}} data - Incoming row; only its `id` property is read.
   * @param {string} encoding - Unused.
   * @param {Function} done - Invoked once the row has been handled.
   */
  _transform(data, encoding, done) {
    const { id } = data;
    const updated = 'test';
    // NOTE(review): both values are interpolated unescaped. Presumably `id` is
    // a numeric key coming straight from the SELECT; confirm, or escape it.
    const sql = `UPDATE items SET field = '${updated}' WHERE id = ${id}`;
    this.buffer.push(sql);
    console.log('Chunk recieved');
    if (this.buffer.length >= BATCH_SIZE) {
      this._emitBatch();
    }
    done();
  }

  /**
   * Emit whatever is still buffered when the input ends. Without this, the
   * trailing rows of an input whose length is not a multiple of BATCH_SIZE
   * would never reach the writer.
   *
   * @param {Function} done - Invoked once the remainder has been pushed.
   */
  _flush(done) {
    if (this.buffer.length > 0) {
      this._emitBatch();
    }
    done();
  }

  /** Push the buffered statements as one ";"-joined string and reset the buffer. */
  _emitBatch() {
    console.log('Chunk transformed!');
    this.push(this.buffer.join(";"));
    this.buffer = [];
  }
}
/**
 * Writable stream that executes each incoming chunk as a query on the shared
 * `db` pool and only accepts the next chunk once that query has completed.
 */
class BatchWrite extends Writable {
  /**
   * Run one chunk against the database.
   *
   * @param {string} data - SQL text passed to db.query() unchanged.
   * @param {string} encoding - Unused.
   * @param {Function} done - Called with the query error on failure, or with
   *   no arguments on success.
   */
  _write(data, encoding, done) {
    // Fake async operation used while debugging in place of the real query:
    /*setTimeout(() => {
    console.log('Chunk saved');
    done();
    }, 650);*/
    db.query(data, (err, res) => {
      if (err) {
        // Surface the failure on the stream instead of reporting a save that
        // never happened.
        done(err);
        return;
      }
      console.log('Chunk saved!');
      done();
    });
  }
}
// Wire the pipeline: streamed SELECT rows -> batched UPDATE strings -> database writes.
// Every stage runs with highWaterMark 1 so that back-pressure kicks in after a
// single buffered item.
const transformStream = new BatchTransform({objectMode: true, highWaterMark: 1});
const writeStream = new BatchWrite({objectMode: true, highWaterMark: 1});
// Named queryStream because `stream` is already bound to the stream module
// required at the top of the file; declaring it again is a SyntaxError.
const queryStream = db.query(sql).stream({highWaterMark: 1});
const logError = (err) => console.log('err', err);
queryStream.on('error', logError);
// pipe() does not forward errors downstream, so each stage needs its own
// listener; otherwise a failure in a later stage is an unhandled 'error' event.
transformStream.on('error', logError);
writeStream.on('error', logError);
queryStream.on('end', () => console.log("END!"));
queryStream.pipe(transformStream).pipe(writeStream);
Any ideas about what I am missing?
Thanks a lot in advance for your time.
via manelgarcia
No comments:
Post a Comment