Thursday, 18 May 2017

I want to run a cron in node js to add youtube video data in Neo4j DB but getting error when the cron runs

app.js

var express           =     require('express')
  , neo4j_match               =     require('neo4j')
   , session           =     require('express-session')
  , bodyParser        =     require('body-parser')
  , moment            =     require('moment')
  , request           =     require('request')
  , app               =     express();
  var every = require('every-moment');
  var neo4j = require('neo4j-driver').v1;
  app.use(bodyParser.json());
  app.use(bodyParser.urlencoded({extended: true}));
  var db = new neo4j_match.GraphDatabase('http://Novasys:1234567@localhost:7474');
var driver = neo4j.driver("bolt://localhost:7687", neo4j.auth.basic("Novasys", "1234567"));
var session = driver.session();
  every(40, 'seconds', function() {

session
  .run('match (c:channel) where c.from="yt" return c')
  .then(function(result){
    return result.records.map(function(record){
      return {
        title: record._fields[0].properties.channelid
      };
    })
    session.close();
  })
  .then(function(cron_channel){
    for (var k in cron_channel){
       (function(k){
          console.log(cron_channel[k].title);
          request('https://www.googleapis.com/youtube/v3/search?part=snippet&maxResults=50&channelId='+cron_channel[k].title+'&order=date&key=AIzaSyBj7mztzfuZq_LA_5TYHLAw-Qaa2Fof3JU', function (error, response, body) {
  if (!error && response.statusCode == 200) {
  body=JSON.parse(body);

  for(var i in body.items){
    (function(i){
    //selecting video id and getting their meta data
    request('https://www.googleapis.com/youtube/v3/videos?part=statistics,snippet,contentDetails&id='+body.items[i].id.videoId+'&key=AIzaSyBj7mztzfuZq_LA_5TYHLAw-Qaa2Fof3JU',function(err, res, vid){
    if (!error && res.statusCode == 200){
    vid=JSON.parse(vid);
    for(var j in vid.items){
        (function(j){
      //node creation 
      if(vid.items[j].statistics.likeCount == null){
        db.cypher({
    query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, categoryid:{categoryidParam}, title:{titleParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title, 
        categoryidParam:vid.items[j].snippet.categoryId,
        descriptionParam:vid.items[j].snippet.description,
        created_timeParam:vid.items[j].snippet.publishedAt, 
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:vid.items[j].statistics.viewCount,
        likeCountParam:0, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
        console.log("success");
    }

});
      }
      else if(vid.items[j].statistics.viewCount == null){
           db.cypher({
    query: 'match (c:channel), (v:video), (u:user) where (v.source=c.channelid OR v.source=c.title) AND c.channelid={channel_id} AND u.title={userid} optional match (u:user)-[r:like]-(v) optional match (u:user)-[re:repick]-(v) RETURN v.title,v.id, v.created_time,v.likes,v.views,v.android_likes, v.android_views,v.from,c.title, c.channelid,c.channelthumbnail,c.followers, c.video_count,r.title ,re.title ORDER BY v.created_time LIMIT 200',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title,
        descriptionParam:vid.items[j].snippet.description, 
        categoryidParam:vid.items[j].snippet.categoryId,
        created_timeParam:vid.items[j].snippet.publishedAt,
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:0,
        likeCountParam:vid.items[j].statistics.likeCount, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
        console.log("success");
    }
});
      }
      else if(vid.items[j].statistics.viewCount == null && vid.items[j].statistics.likeCount == null){

db.cypher({
    query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, categoryid:{categoryidParam}, title:{titleParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title,
        descriptionParam:vid.items[j].snippet.description, 
        categoryidParam:vid.items[j].snippet.categoryId,
        created_timeParam:vid.items[j].snippet.publishedAt, 
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:0, 
        likeCountParam:0, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
         console.log("success");
    }
});

      }
      else{
      db.cypher({
    query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, title:{titleParam}, categoryid:{categoryidParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title,
        descriptionParam:vid.items[j].snippet.description, 
        categoryidParam:vid.items[j].snippet.categoryId,
        created_timeParam:vid.items[j].snippet.publishedAt,
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:vid.items[j].statistics.viewCount, 
        likeCountParam:vid.items[j].statistics.likeCount, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
        console.log("success");
    }
});
}
  session
  .run('match (a:video), (b:language) where a.language=b.title merge (b)-[r:video_lang{title:"video_lang"}]-(a) return a,b,r ')
  .then(function(result){
    console.log("success1");
    session.close();
  })
  .catch(function(err){
    console.log(err);
  });

  //relation creation
  session
  .run('match (a:channel), (b:video) where a.channelid={channelParam} AND b.source={channelParam} merge (a)-[r:rel_video{title:"yt"}]->(b) return a,b,r',{channelParam:cron_channel[k].title})
  .then(function(result){
    console.log("success2");
    session.close();
  })
  .catch(function(err){
    console.log(err);
  });
        })(j);
        }
                }
    })  
})(i);
}//first for
  }
})

       })(k);


}



  }); 
});

the server side error

{ Error: Connection was closed by server
    at Neo4jError.Error (native)
    at new Neo4jError (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\error.js:76:132)
    at newError (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\error.js:66:10)
    at NodeChannel._handleConnectionTerminated (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\internal\ch-node.js:336:41)
    at emitNone (events.js:91:20)
    at TLSSocket.emit (events.js:185:7)
    at endReadableNT (_stream_readable.js:974:12)
    at _combinedTickCallback (internal/process/next_tick.js:74:11)
    at process._tickCallback (internal/process/next_tick.js:98:9) code: 'ServiceUnavailable' }

Database log debug.log

2017-05-18 16:09:16.849+0000 ERROR [o.n.b.t.SocketTransportHandler] Fatal error occurred when handling a client connection: failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216) failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216)
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:624)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:578)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:709)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:698)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:262)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:170)
    at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
    at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1472)
    at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1482)
    at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:519)
    at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:490)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:787)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:779)
    at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:39)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1089)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:454)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at java.lang.Thread.run(Unknown Source)
2017-05-18 16:09:16.937+0000 WARN  [io.netty.channel.AbstractChannelHandlerContext] Failed to mark a promise as failure because it has succeeded already: DefaultChannelPromise@3c0b7552(success)

I'm absolutely stumped, any help is much appreciated. I'm going mad. I've tried a million variations of starting and closing sessions and drivers to no avail. I was trying to solve this issue from 3 days but not getting any solution plz help.



via Mishu

No comments:

Post a Comment