llmongodb/llmongo.js
2015-05-27 16:37:07 +02:00

160 lines
4.8 KiB
JavaScript
Executable file

#!/usr/bin/env node
var Redis = require('redis');
var MongoClient = require('mongodb').MongoClient
var fs = require('fs')
var Console = require('console').Console;
var redis, mongodb, lastlogin, mongoUri, redisUri, logname;
var pathname = process.argv[1].slice(0, process.argv[1].lastIndexOf('/'));
function Logger(path) {
this.writestream = fs.createWriteStream(path+'/log/llmongo.log', { flags: 'a'});
this.mylog = new Console(this.writestream, this.writestream);
}
Logger.prototype.log = function(){
var args = Array.prototype.slice.call(arguments);
var string = args[0];
args.shift();
// console.log(string + " - " + args );
if (args.length > 0) {
this.mylog.log.apply(this, ["%s "+string,(new Date()).toJSON()].concat(args));
} else {
this.mylog.log("%s "+string,(new Date()).toJSON());
}
}
logger = new Logger(pathname);
var args = process.argv.slice(2);
args.forEach(function(val, index, array){
switch(val) {
case '-mongo':
if(args[index+1].search('mongodb://') >= 0) {
mongoUri = args[index+1];
} else {
mongoUri = 'mongodb://'+args[index+1];
}
break;
case '-redis':
if(args[index+1].match(/\S+:\d+/)) {
redisUri = args[index+1];
} else {
logger.log("Wrong redis URI");
process.exit(0);
}
break;
}
});
mongoUri = mongoUri || 'mongodb://127.0.0.1:27018/dovecot';
redisUri = redisUri || 'redis-ll.mail.tiscali.sys:6379';
logger.log("Connecting redis: %s", redisUri);
redis = Redis.createClient(redisUri.split(':')[1], redisUri.split(':')[0]) //new Redis(redisUri);
redis.on("error", function (err) {
logger.log("Error redis %s", err);
process.exit(-99);
});
logger.log("Connecting mongodb: %s", mongoUri);
MongoClient.connect(mongoUri, function(err, db){
if(err){
logger.log("Mongo connect error: %s", err);
if (redis ) {
redis.end();
};
process.exit(-99);
}
lastlogin = db.collection('lastlogin');
function quit(){
redis.end();
db.close();
process.exit(0);
}
// Legge uno user dal SET llindex e lo elimina; viene eseguita fino a quando ci sono users.
// Calcola quante righe di lastlogin ci sono e chiama la funzione ricorsiva readLogs
function readUsers() {
logger.log("readUsers");
redis.spop('llindex', function(err, user){
if( err ) {
logger.log("Error redis spop llindex: %s", err);
quit();
process.exit(-1);
}
if( user ) {
logger.log("readLogs");
redis.llen(user, function(err, llen){
if (err) {
logger.log("Error redis llen %s: ",user, err)
quit();
process.exit(-2);
} else {
logger.log("logslen: %s", llen);
// Legge una riga di log, la elimina da redis e la inserisce su mongo.
// Viene eseguito n-1 volte, con n = numero di righe di log.
function readLogs(lindex, user, len) {
if (lindex == len ){
// raggiunto il numero di righe da leggere, riparte con un altro user ( se esiste )
logger.log("Done: %s - ", user, lindex);
readUsers();
} else {
logger.log("rpop log: %s", user);
redis.rpop(user, function(err, log){
if ( err ) {
logger.log("Error redis rpop %s: ",user,err);
quit();
process.exit(-3);
} else {
logger.log("log: %s", log);
// imap:1432630259:10.39.75.60
var slog = log.split(':');
var data = new Date(parseInt(slog[1]+"000"));
// logger.log("Data: %s ", data, slog[1]);
lastlogin.insert({
user: user,
protocol: slog[0],
date: data,
ip: slog[2],
}, function(err, r){
if(err){
logger.log("insert error: %s", err);
redis.rpush(user, log, function(err, result){
quit();
process.exit(-4);
});
} else {
// Salvato il log su mongo, legge il successivo.
logger.log("Saved: %s - ", log, user, lindex);
readLogs(lindex+1, user, len);
}
});
}
})
};
};
}
// Legge il primo record per lo user "user" per llen-1 volte.
readLogs(0, user, llen-1);
});
} else {
// Se non ci sono piu' users, chiude i DB ed esce.
logger.log("quit readUsers");
quit();
}
});
};
readUsers();
});