Sunday, 16 April 2017

Piping twitter stream API into MongoDB with NodeJS

bit of a NodeJS newbie here.

For my data visualisation assignment, I'm constructing a twitter scraper with NodeJS to do live sentiment analysis of specific topics. Got a Mongo database up and running and I have a working version of this stream however it connects/disconnects for every entry. I've created a new version (below) that keeps the connection open whilst the twitter stream is open, this reports that it's saving the entries to the db however I'm unable to read these back from the database. I'm assuming the problem is not gracefully closing the database connection.

Any guidance or suggestions would be greatly appreciated. I'm planning on running this for a month with an estimated volume of 100m tweets. With my current version, the database crashes at least once a day so I need to get the modified stream up and running.

Thanks in advance.

var Twitter = require('twitter');
var mongo = require('mongodb').MongoClient;
var streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
var fs = require('fs');

var outputConfig = { dbURL : 'mongodb://localhost:27017/tweets',
  collection : 'stream-data' };
var url = 'mongodb://localhost:27017/tweets';

var writableStream = streamToMongoDB(outputConfig);

// READ LEXICON
var lex_path = 'NRC-Emotion-Lexicon-Wordlevel-v0.92.txt';
var lex = {};

fs.readFile(lex_path, 'utf8', function(err, data) {
  if (err==null) {
      var values = [];
      lines = data.split('\r\n');
      lines.forEach(function(line) {
        values.push(line.split('\t'));
      });
      values.forEach(function(line) {
        var word = line[0],
            emotion = line[1],
            score = line[2];

        // check word exists in dict
        if (!(word in lex)) {
          lex[word]={};
        }

        // add word-emotion score
        lex[word][emotion]=score;
      });
  }
});


// ANALYSIS FUNCTION
function scoreTweet(tweet) {
  var positivity = 0;
  var emotionScore = {
    'anger' : 0,
    'anticipation' : 0,
    'disgust' : 0,
    'fear' : 0,
    'joy' : 0,
    'sadness' : 0,
    'surprise' : 0,
    'trust' : 0
  }

  if (tweet!=undefined) {
    words = tweet.split(' ');

    words.forEach(function(word) {

      if (word in lex) {
        emotions = lex[word];

        var pos = 0;
        var emo = {
          'anger' : 0,
          'anticipation' : 0,
          'disgust' : 0,
          'fear' : 0,
          'joy' : 0,
          'sadness' : 0,
          'surprise' : 0,
          'trust' : 0
        }

        Object.keys(emotions).forEach(function(key) {
            if (key=='positive') {
              pos += parseInt(emotions[key]);
            } else if (key=='negative') {
              pos -= parseInt(emotions[key]);
            } else {
              emo[key] += parseInt(emotions[key]);
            }
        });

        positivity += pos;
        Object.keys(emo).forEach(function(key) {
          emotionScore[key] += emo[key]
        });

      }
    });
  }

  return [positivity, emotionScore];
}


// TWITTER STREAMER
var client = new Twitter({
  // OAuth Details
  consumer_key: 'ghCTHKmkSMrFO9HWMEGvIg32i',
  consumer_secret: 'msSJ9WpWLmqQJYU2zqCeLfKPWQeUveeEmHOYiFEXh4mkV4aaH0',
  access_token_key: '177110494-JEYGKT3xSLFUYXuDRfammmpCYrWAPtKrz4rMHaq2',
  access_token_secret: 'l4C8yHfwITVMODboKoD2m4R0hwEK8cvMxcY4LJeKefZ2k'
});

var stream = client.stream('statuses/filter', {track: 'trump'});

var counter = 0,
  sum = 0,
  sumCounter = 0,
  sumPositivity = 0;

mongo.connect(url, function(err, db) {
  test.equal(null, err);
  test.ok(db != null);

  // KEY METRICS
  function resetCounter() {

    var newEntry = {
      time: Date.now(),
      velocity: counter,
      total: sumCounter
    }

    db.collection('key-metrics').insertOne(newEntry,
      function(err, result) {
      //console.log(newEntry);
      counter = 0;
    });

    console.log('reset');
  }

  setInterval(resetCounter, 1000);



  stream.on('data', function(event) {
    //console.log(event && event.text);
    counter += 1;
    sumCounter += 1;

    analysisResults = scoreTweet(event.text);
    sumPositivity += analysisResults[0];

    var newEntry = {
      tweetText: event.text,
      positivity: analysisResults[0],
      emotionScore: analysisResults[1],
      sumPositivity: sumPositivity
    };

    //console.log('no error');
    db.collection('stream-data').insertOne(newEntry, function(err, result) {
      console.log(newEntry);
      console.log('tweet saved');

    });
  });



  db.close();
  test.done();


});

stream.on('error', function(error) {
  throw error;
});



via Tim O'Connell

No comments:

Post a Comment