Wednesday 24 May 2017

How to get only not processed records from aws kinesis using lambda

I setup a kinesis stream which trigger a lambda function (100 batch) in node js to process data. While I get records from kinesis and successfully able to insert into db by following code.

Kinesis Setup
  Shards : 1
  Data retention period: 24 hr

Lambda Trigger for kinesis
  Batch Size: 100
  Staring Position : Latest


Lambda Code

module.exports.handler = (event, context, callback) => {
    console.log(event);
    if(event.Records.length > 0) {
        var db = require("../lib/db");
        var dbObj = new db();
        dbObj.connect("mongodb://127.0.0.1:27017/testdb"). 
        then(function(conn) {
            const collection = conn.collection('testcollection');
            for(var i in event.Records) {
                console.log(event.Records[i]);
                var data = Buffer.from(event.Records[i].kinesis.data, 'base64').toString();
                console.log(data);

                collection.insert({ "data" : JSON.parse(data)}, function() {
                    if( i == event.Records.length - 1 ) {
                        conn.close();
                    }
                });
            }
        });
    }
}

But what i am noticing that I get all data including previous processed data and their are no filters available to get only unprocessed data ( some thing like last 1 hour or data starting from sequence number ). Example in first request I get 50 records in second I get 52 records and third I get 55 and so on,

For first option timestamp is an option at lambda -> trigger settings -> Staring Position but that would be static not dynamic like I require ( last one hour ).

Same for second option, where sequence number is not available as filter.

If my solution is incorrect ?



via kuldeep.kamboj

No comments:

Post a Comment