Wednesday, 12 April 2017

Transforming & performing async operations on a MySQL stream

Let me state first that I'm relatively new to NodeJS and I may have misunderstood some concepts; apologies for my bad English as well — I'm not a native speaker.

So, basically what I want to do is

  • Read from a MySQL stream
  • pipe it to a transform stream, so I end up pushing buffered strings with 5 sql update operations "UPDATE 1;UPDATE 2..."
  • pipe it to a write stream that perform an async MySQL update

When running the code, it starts generating the UPDATE strings but then freezes for about 30 seconds; then, all of a sudden, it starts updating like crazy for 3 or 4 seconds, and it crashes with an Error: Connection lost: The server closed the connection.

Weird thing is that if I swap the update command for a setTimeout (faking an async operation), it responds as expected. That made me think it is a MySQL performance or connection problem.

const cfg = require("my-config-module");
const mysql = require('mysql');
const BATCH_SIZE = 5;
const stream = require("stream");
const Transform = stream.Transform;
const Writable = stream.Writable;

// MySQL connection pool. `multipleStatements` must stay enabled because the
// batch writer sends several semicolon-joined UPDATEs in a single query.
const db = mysql.createPool({
  connectionLimit: 50,
  host: cfg.sql.host,
  user: cfg.sql.user,
  password: cfg.sql.password,
  database: cfg.sql.db,
  multipleStatements: true,
});

// Source query: stream every item id.
const sql = `SELECT id FROM items`;

class BatchTransform extends Transform {
  /**
   * Accumulates incoming row objects ({ id }) and emits one
   * semicolon-joined string of UPDATE statements per batch.
   *
   * @param {object} [opts] - Standard Transform options, plus:
   * @param {number} [opts.batchSize] - Rows per emitted batch
   *   (defaults to the module-level BATCH_SIZE).
   */
  constructor (opts) {
    super(opts);
    this.buffer = [];
    // Generalization: allow the batch size to be set per instance; falls
    // back to the module constant, so existing callers are unaffected.
    this.batchSize = (opts && opts.batchSize) || BATCH_SIZE;
  }
  _transform (data, encoding, done) {
    let { id } = data;
    let updated = 'test';
    // NOTE(review): interpolating `id` straight into the SQL string is
    // injection-prone if ids ever come from untrusted input — prefer a
    // parameterized query. Left as-is since ids come from our own SELECT.
    let sql = `UPDATE items SET field = '${updated}' WHERE id = ${id}`;
    this.buffer.push(sql);
    console.log('Chunk received');
    if (this.buffer.length >= this.batchSize) {
      console.log('Chunk transformed!');
      this.push(this.buffer.join(";"));
      this.buffer = [];
    }
    done();
  }
  // Bug fix: without _flush, any rows still buffered when the source ends
  // (a final partial batch smaller than batchSize) were silently dropped.
  _flush (done) {
    if (this.buffer.length > 0) {
      this.push(this.buffer.join(";"));
      this.buffer = [];
    }
    done();
  }
}

class BatchWrite extends Writable {
  /**
   * Executes each incoming chunk (a semicolon-joined string of UPDATE
   * statements) against the shared pool. Relies on the pool being created
   * with `multipleStatements: true`.
   */
  constructor (opts) {
    super(opts);
  }
  _write (data, encoding, done) {
    db.query(data, (err) => {
      if (err) {
        // Bug fix: the original ignored `err` entirely, so query failures
        // (e.g. "Connection lost") were swallowed and the pipeline kept
        // pumping. Passing it to done() emits 'error' on the stream.
        return done(err);
      }
      console.log('Chunk saved!');
      done();
    });
  }
}

let transformStream = new BatchTransform({objectMode: true, highWaterMark: 1});
let writeStream = new BatchWrite({objectMode: true, highWaterMark: 1});

// Bug fix: the original declared `let stream = ...`, which collides with the
// `stream` module constant required at the top of the file and throws
// "Identifier 'stream' has already been declared". Use a distinct name.
let queryStream = db.query(sql).stream({highWaterMark: 1});
queryStream.on('error', (err) => console.log('err', err));
queryStream.on('end', () => console.log("END!"));
// Surface write-side failures too; without a handler, an 'error' emitted by
// BatchWrite (e.g. a failed UPDATE) would crash the process unhandled.
writeStream.on('error', (err) => console.log('write err', err));
queryStream.pipe(transformStream).pipe(writeStream);

Any ideas on what I am missing?

Thanks a lot in advance for your time.



via manelgarcia

No comments:

Post a Comment