I'm using the event-stream library to split and parse huge files (70 MB+ when gzipped). For some reason the parsing stops at around line 250,000.
First, I thought the problem was the file size (the bigger the file, the higher the chance of getting stuck), so I started testing, and found that Node.js gets stuck on files of roughly 120 MB and up.
But after a while, Node.js also got stuck on 70 MB files.
Second, I monitored the CPU while the run was stuck, and the CPU usage was fine, so the problem should be in the code...
I tried using pause and resume, but that didn't solve the problem.
code with pause and resume:
// Streams the gunzipped log through es.split() and handles each line in turn.
//
// Lines matching `directivePattern` ("#Name: value") update the shared parser
// state (`directives`, `fields`). Every other line is split on whitespace,
// URI-decoded, zipped with `fields`, and - when it has a client IP and a
// numeric byte count - looked up in the Redis sorted set 'iplookup' before
// being handed to setUsageData().
//
// Backpressure: the stream is paused at the start of every line and resumed
// only once that line is completely handled. For rows that need the Redis
// lookup this means resuming inside the Redis callback; resuming right after
// issuing the command (as before) released the pause immediately and let
// lookups queue up without limit.
// NOTE(review): if the Redis client never invokes the callback (e.g. the
// connection is lost and commands are dropped), the stream stays paused -
// confirm the client's offline/retry behaviour.
var s = fileStraem
  .pipe(gUzip)
  .pipe(es.split())
  .pipe(es.mapSync(function (line) {
    s.pause();

    var matches = line.match(directivePattern);
    if (matches) {
      if (matches[1] === 'Version') {
        // A new "#Version" header resets the parser state.
        fields = [];
        directives = {};
      } else if (matches[1] === 'Fields') {
        // "-" placeholders in the "#Fields" list become null column names.
        fields = matches[2].split(/\s+/).map(function (name) {
          return name !== '-' ? name : null;
        });
      }
      directives[matches[1]] = matches[2];
      s.resume();
      return;
    }

    lineParsed += 1;

    // Malformed percent-escapes make decodeURIComponent throw; such values
    // are replaced by an empty string instead of aborting the whole line.
    var values = _.map(line.split(/\s+/), function (value) {
      try {
        return decodeURIComponent(value);
      } catch (ex) {
        return "";
      }
    });
    var data = _.assign(_.zipObject(fields, values), {raw: line, directives: directives});

    if (data['cs-uri-query']) {
      // Turn "a=1&b=2" into {a: "1", b: "2"}.
      data['cs-uri-query'] = _.zipObject(_.map(data['cs-uri-query'].split('&'), function (part) {
        return _.slice(part.match(/(.*?)=(.*)/), 1);
      }));
    }

    var remote_addr = data["c-ip"];

    // Progress indicator.
    if (lineParsed % 100000 === 0) {
      console.log(lineParsed);
    }

    if (!remote_addr) {
      console.log("remoteADDR is undefined!");
      s.resume();
      return;
    }

    var ipNumber = convertIpToNumber(remote_addr);
    var time_local = data["date"] + " " + data["time"];
    var public_uri = data["cs-uri-stem"];
    var port = data["s-port"];
    var status = data["sc-status"];
    var body_bytes_sent = data["sc-bytes"];
    var region = "initial";

    // A row whose byte count is not numeric is treated as malformed and skipped.
    if (isNaN(parseInt(body_bytes_sent, 10))) {
      console.log("Row from not provider well we skip on this row");
      console.log("The file name is : " + fileName);
      s.resume();
      return;
    }

    dbclient1.zrangebyscore('iplookup', ipNumber, 4294967295, 'limit', 0, 1, function (err, reply) {
      try {
        if (err) {
          // Surface Redis failures instead of reporting them as an empty reply.
          console.log("iplookup failed: " + err);
        } else if (reply == undefined || reply[0] == undefined || reply[0] == "") {
          console.log("reply is undefined!");
        } else {
          region = reply[0];
          setUsageData(public_uri, body_bytes_sent, region, time_local, port, status, protocol, data);
        }
      } finally {
        // Release the stream only now, once the lookup for this line is done,
        // and do so even if setUsageData throws so the pipeline cannot hang.
        s.resume();
      }
    });
  })
  .on('end', function () {
    console.log("do something...");
  })
);
code without pause and resume :
var readline = require('readline'),
stream = require('stream'),
_ = require('lodash');
// Parser state shared by all lines of the stream:
//   directivePattern - recognises a "#Name: value" directive line
//   directives       - directive name -> value, as seen so far
//   fields           - column names taken from the latest "#Fields" directive
var directivePattern = /^#(.*?):\s*([^\s].*)/;
var directives = {};
var fields = [];

// Streams the gunzipped log through es.split() and handles each line without
// any pause/resume flow control. Directive lines update the parser state; all
// other lines are decoded into a record and, when they carry a client IP and
// a numeric byte count, sent through a Redis 'iplookup' query whose callback
// forwards the row to setUsageData().
var s = fileStraem
  .pipe(gUzip)
  .pipe(es.split())
  .pipe(
    es.mapSync(function (line) {
      var directiveMatch = line.match(directivePattern);
      if (directiveMatch) {
        var directiveName = directiveMatch[1];
        var directiveValue = directiveMatch[2];
        switch (directiveName) {
          case 'Version':
            // A new "#Version" header resets the parser state.
            fields = [];
            directives = {};
            break;
          case 'Fields':
            // "-" placeholders become null column names.
            fields = directiveValue.split(/\s+/).map(function (name) {
              return name === '-' ? null : name;
            });
            break;
        }
        directives[directiveName] = directiveValue;
        return;
      }

      lineParsed += 1;

      // Values that cannot be URI-decoded are replaced by an empty string.
      var decoded = _.map(line.split(/\s+/), function (token) {
        try {
          return decodeURIComponent(token);
        } catch (ex) {
          return "";
        }
      });
      var data = _.assign(_.zipObject(fields, decoded), {raw: line, directives: directives});

      var rawQuery = data['cs-uri-query'];
      if (rawQuery) {
        // Split "a=1&b=2" into [key, value] pairs and zip them into an object.
        var pairs = _.map(rawQuery.split('&'), function (part) {
          return _.slice(part.match(/(.*?)=(.*)/), 1);
        });
        data['cs-uri-query'] = _.zipObject(pairs);
      }

      var remote_addr = data["c-ip"];

      // Progress indicator.
      if (lineParsed % 100000 == 0) {
        console.log(lineParsed);
      }

      if (!remote_addr) {
        console.log("remoteADDR is undefined!");
        return;
      }

      var ipNumber = convertIpToNumber(remote_addr);
      var time_local = data["date"] + " " + data["time"];
      var public_uri = data["cs-uri-stem"];
      var port = data["s-port"];
      var status = data["sc-status"];
      var body_bytes_sent = data["sc-bytes"];
      var region = "initial";

      // Rows whose byte count does not parse as a number are skipped.
      if (isNaN(parseInt(body_bytes_sent))) {
        console.log("Row not provider well we skip on this row");
        return;
      }

      dbclient1.zrangebyscore('iplookup', ipNumber, 4294967295, 'limit', 0, 1, function (err, reply) {
        // Keep the "initial" region unless the lookup returned a non-empty first entry.
        var hasRegion = reply != undefined && reply[0] != undefined && reply[0] != "";
        if (hasRegion) {
          region = reply[0];
        }
        setUsageData(public_uri, body_bytes_sent, region, time_local, port, status, protocol, data);
      });
    }).on('end', function () {
      console.log("do something...");
    })
  );
via Nir Konky
No comments:
Post a Comment