#!/usr/bin/env node var Redis = require('redis'); var MongoClient = require('mongodb').MongoClient var fs = require('fs') var Console = require('console').Console; var redis, mongodb, lastlogin, mongoUri, redisUri, logname; var pathname = process.argv[1].slice(0, process.argv[1].lastIndexOf('/')); function Logger(path) { this.writestream = fs.createWriteStream(path+'/log/llmongo.log', { flags: 'a'}); this.mylog = new Console(this.writestream, this.writestream); } Logger.prototype.log = function(){ var args = Array.prototype.slice.call(arguments); var string = args[0]; args.shift(); // console.log(string + " - " + args ); if (args.length > 0) { this.mylog.log.apply(this, ["%s "+string,(new Date()).toJSON()].concat(args)); } else { this.mylog.log("%s "+string,(new Date()).toJSON()); } } logger = new Logger(pathname); var args = process.argv.slice(2); args.forEach(function(val, index, array){ switch(val) { case '-mongo': if(args[index+1].search('mongodb://') >= 0) { mongoUri = args[index+1]; } else { mongoUri = 'mongodb://'+args[index+1]; } break; case '-redis': if(args[index+1].match(/\S+:\d+/)) { redisUri = args[index+1]; } else { logger.log("Wrong redis URI"); process.exit(0); } break; } }); mongoUri = mongoUri || 'mongodb://127.0.0.1:27018/dovecot'; redisUri = redisUri || 'redis-ll.mail.tiscali.sys:6379'; logger.log("Connecting redis: %s", redisUri); redis = Redis.createClient(redisUri.split(':')[1], redisUri.split(':')[0]) //new Redis(redisUri); redis.on("error", function (err) { logger.log("Error redis %s", err); process.exit(-99); }); logger.log("Connecting mongodb: %s", mongoUri); MongoClient.connect(mongoUri, function(err, db){ if(err){ logger.log("Mongo connect error: %s", err); if (redis ) { redis.end(); }; process.exit(-99); } lastlogin = db.collection('lastlogin'); function quit(){ redis.end(); db.close(); process.exit(0); } // Legge uno user dal SET llindex e lo elimina; viene eseguita fino a quando ci sono users. // Calcola quante righe di lastlogin ci sono e chiama la funzione ricorsiva readLogs function readUsers() { logger.log("readUsers"); redis.spop('llindex', function(err, user){ if( err ) { logger.log("Error redis spop llindex: %s", err); quit(); process.exit(-1); } if( user ) { logger.log("readLogs"); redis.llen(user, function(err, llen){ if (err) { logger.log("Error redis llen %s: ",user, err) quit(); process.exit(-2); } else { logger.log("logslen: %s", llen); // Legge una riga di log, la elimina da redis e la inserisce su mongo. // Viene eseguito n-1 volte, con n = numero di righe di log. function readLogs(lindex, user, len) { if (lindex == len ){ // raggiunto il numero di righe da leggere, riparte con un altro user ( se esiste ) logger.log("Done: %s - ", user, lindex); readUsers(); } else { logger.log("rpop log: %s", user); redis.rpop(user, function(err, log){ if ( err ) { logger.log("Error redis rpop %s: ",user,err); quit(); process.exit(-3); } else { logger.log("log: %s", log); // imap:1432630259:10.39.75.60 var slog = log.split(':'); var data = new Date(parseInt(slog[1]+"000")); // logger.log("Data: %s ", data, slog[1]); lastlogin.insert({ user: user, protocol: slog[0], date: data, ip: slog[2], }, function(err, r){ if( err ){ console.log(err); if ( err.errmsg.indexOf("duplicate key error") >= 0 ) { logger.log("Duplicate key: %s - ", log, user, lindex); readLogs(lindex+1, user, len); }else { logger.log("insert error: %s", err); redis.rpush(user, log, function(err, result){ quit(); process.exit(-4); }); } } else { // Salvato il log su mongo, legge il successivo. logger.log("Saved: %s - ", log, user, lindex); readLogs(lindex+1, user, len); } }); } }) }; }; } // Legge il primo record per lo user "user" per llen-1 volte. readLogs(0, user, llen-1); }); } else { // Se non ci sono piu' users, chiude i DB ed esce. logger.log("quit readUsers"); quit(); } }); }; readUsers(); });