I set up a Kinesis stream that triggers a Lambda function (batch size 100) written in Node.js to process data. I receive records from Kinesis and can successfully insert them into the database with the following code.
Kinesis Setup
Shards : 1
Data retention period: 24 hr
Lambda Trigger for kinesis
Batch Size: 100
Starting Position: Latest
Lambda Code
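var db = require("../lib/db");

module.exports.handler = (event, context, callback) => {
  console.log(event);
  if (event.Records.length === 0) {
    // Nothing to process; report success right away.
    return callback(null, "no records");
  }
  var dbObj = new db();
  dbObj.connect("mongodb://127.0.0.1:27017/testdb")
    .then(function (conn) {
      const collection = conn.collection('testcollection');
      // Decode each base64 payload and parse it as JSON.
      const docs = event.Records.map(function (record) {
        console.log(record);
        var data = Buffer.from(record.kinesis.data, 'base64').toString();
        console.log(data);
        return { "data": JSON.parse(data) };
      });
      // Insert the whole batch, then close the connection exactly once.
      collection.insert(docs, function (err) {
        conn.close();
        // Signal completion; an error makes Lambda retry the batch.
        callback(err, "processed " + docs.length + " records");
      });
    })
    .catch(callback);
};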
However, I am noticing that each invocation receives all the data, including previously processed records, and there are no filters available to fetch only unprocessed data (for example, the last hour of data, or data starting from a given sequence number). For example, the first invocation receives 50 records, the second 52, the third 55, and so on.
For the first option, a timestamp can be set under Lambda -> trigger settings -> Starting Position, but it is static, not dynamic as I require (the last hour).
The same goes for the second option: a sequence number is not available as a filter.
Is my approach incorrect?
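To make the two filters I am looking for concrete, here is a minimal sketch of how they might be approximated inside the handler itself, using the `sequenceNumber` and `approximateArrivalTimestamp` fields that each record carries under `kinesis`. The helper name `filterUnprocessed`, the idea of a stored checkpoint, and the sample records are all assumptions for illustration; where the last processed sequence number would be kept (for example, in the same database) is left open.

```javascript
// Hypothetical helper: keep only records that are newer than a stored
// checkpoint and that arrived within the last `maxAgeMs` milliseconds.
function filterUnprocessed(records, lastSequenceNumber, maxAgeMs, nowMs) {
  const cutoffSeconds = (nowMs - maxAgeMs) / 1000;
  const checkpoint = lastSequenceNumber ? BigInt(lastSequenceNumber) : null;
  return records.filter(function (record) {
    // Sequence numbers are large decimal strings, so compare them as BigInt.
    const isNew = checkpoint === null ||
      BigInt(record.kinesis.sequenceNumber) > checkpoint;
    // approximateArrivalTimestamp is expressed in seconds since the epoch.
    const isRecent = record.kinesis.approximateArrivalTimestamp >= cutoffSeconds;
    return isNew && isRecent;
  });
}

// Made-up records; real sequence numbers are much longer strings.
const sampleRecords = [
  { kinesis: { sequenceNumber: "99", approximateArrivalTimestamp: 9000 } },
  { kinesis: { sequenceNumber: "101", approximateArrivalTimestamp: 9500 } },
  { kinesis: { sequenceNumber: "102", approximateArrivalTimestamp: 1000 } }
];

// Checkpoint "100", one-hour window, "now" fixed at 10000 seconds.
const fresh = filterUnprocessed(sampleRecords, "100", 3600 * 1000, 10000 * 1000);
console.log(fresh.map(function (r) { return r.kinesis.sequenceNumber; }));
```

With these sample values, only the record with sequence number "101" passes both checks: "99" is at or below the checkpoint, and "102" arrived before the one-hour cutoff. This only skips duplicates after delivery; it does not change which records Kinesis hands to the function.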
via kuldeep.kamboj