This commit is contained in:
Michele Fadda 2015-07-30 10:21:01 +02:00
parent c084ad175f
commit 7c48a60d3b

View file

@ -19,11 +19,6 @@ import (
)
type Options struct {
MongoUri string
RedisUri string
rdb *redis.Client
mdb *mgo.Session
ll *mgo.Collection
RedisTTL time.Duration
CurrentPath string
Exe string
@ -32,6 +27,14 @@ type Options struct {
Version bool
}
type Dbs struct {
MongoUri string
RedisUri string
rdb *redis.Client
mdb *mgo.Session
ll *mgo.Collection
}
type MongoLogin struct {
User string `json:"user"`
Protocol string `json:"protocol"`
@ -45,18 +48,19 @@ type Index struct {
}
const (
_VERSION = "v1.1.0"
_VERSION = "v1.1.1"
)
var (
opts = Options{
MongoUri: "mongodb://127.0.0.1:27018",
RedisUri: "redis-ll.mail.tiscali.sys:6379",
//MongoUri: "mongodb://10.39.81.85:27018",
//RedisUri: "127.0.0.1:6379",
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
dbs = Dbs{
MongoUri: "mongodb://127.0.0.1:27018",
RedisUri: "127.0.0.1:6379",
}
)
func usage() {
@ -75,32 +79,30 @@ func init() {
opts.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&opts.MongoUri, "m", opts.MongoUri, "Mongodb")
flag.StringVar(&opts.RedisUri, "r", opts.RedisUri, "Redis")
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
}
func connectRedis(uri string) *redis.Client {
rdb, err := redis.Dial("tcp", opts.RedisUri)
func (db *Dbs) connectRedis() {
var err error
db.rdb, err = redis.Dial("tcp", db.RedisUri)
if err != nil {
log.Println("Redis connect Error: ", err.Error())
os.Exit(-1)
}
return rdb
}
func connectMongo(uri string) (*mgo.Session, *mgo.Collection) {
mdb, err := mgo.Dial(uri)
func (db *Dbs) connectMongo() {
var err error
db.mdb, err = mgo.Dial(db.MongoUri)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
ll := opts.mdb.DB("dovecot").C("lastlogin")
return mdb, ll
db.ll = db.mdb.DB("dovecot").C("lastlogin")
}
func checkPID(bpid []byte) bool {
@ -112,9 +114,12 @@ func checkPID(bpid []byte) bool {
return false
}
cmd := bytes.Trim(bcmd, "\x00")
fmt.Println(string(cmd), opts.Exe)
// fmt.Println(string(cmd), opts.Exe)
if strings.Contains(string(cmd), opts.Exe) {
return true
} else {
fmt.Printf("PID %s used by %s\n", pid, cmd)
return true
}
return true
}
@ -146,13 +151,14 @@ func main() {
}
defer os.Remove(opts.PIDFile)
fmt.Println(os.Stat(opts.PIDFile))
// fmt.Println(os.Stat(opts.PIDFile))
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Log file error: ", err.Error())
os.Exit(-4)
}
defer fs.Close()
log.SetOutput(fs)
// log.SetPrefix("[llmongo] ")
@ -161,11 +167,11 @@ func main() {
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
opts.rdb = connectRedis(opts.RedisUri)
defer opts.rdb.Close()
dbs.connectRedis()
defer dbs.rdb.Close()
opts.mdb, opts.ll = connectMongo(opts.MongoUri)
defer opts.mdb.Close()
dbs.connectMongo()
defer dbs.mdb.Close()
// // Estrae la lista degli utenti che hanno fatto login negli ultimi X min.
// llindex := opts.rdb.Cmd("smembers", "llindex")
@ -176,9 +182,11 @@ func main() {
// // for _, user := range lista {
// // cicla fino a che esistono righe di login
// var wg sync.WaitGroup
count := 0
for {
// estrae un userid dalla lista degli utenti che hanno fatto login
spop := opts.rdb.Cmd("spop", "llindex")
spop := dbs.rdb.Cmd("spop", "llindex")
user, err := spop.Str()
log.Printf("SPOP: %+v %+v\n", spop, user)
// se non ci sono piu' userid esce
@ -186,19 +194,20 @@ func main() {
log.Printf("LLINDEX empty: %v\n", err)
break
}
count++
// user := spop.String()
var date int64
var lastval, val string
for {
// Estrae l'ultimo login dell'utente 'user'
val, err = opts.rdb.Cmd("lindex", user, "-1").Str()
val, err = dbs.rdb.Cmd("lindex", user, "-1").Str()
if err != nil {
log.Printf("LINDEX error: %+v\n", err)
// se ha trovato user e righe di login
if lastval != "" {
// reinserisce l'ultimo login e imposta il ttl su Redis
retval := opts.rdb.Cmd("lpush", user, lastval)
ttl := opts.rdb.Cmd("expire", user, opts.RedisTTL.Seconds())
retval := dbs.rdb.Cmd("lpush", user, lastval)
ttl := dbs.rdb.Cmd("expire", user, opts.RedisTTL.Seconds())
log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
}
break
@ -206,7 +215,7 @@ func main() {
// se la riga di login e' vuota
if val == "" {
log.Println("Login empty: ", user)
retval := opts.rdb.Cmd("lrem", user, "-1", val)
retval := dbs.rdb.Cmd("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue
}
@ -214,7 +223,7 @@ func main() {
// se il formato della riga di login non e' corretto
if sval[1] == "" {
log.Println("Login format error: ", val, user)
retval := opts.rdb.Cmd("lrem", user, "-1", val)
retval := dbs.rdb.Cmd("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue
}
@ -235,7 +244,8 @@ func main() {
Date: time.Unix(date, 0),
}
// inserisce il login su Mongodb
_, err = opts.ll.Upsert(ind, ml)
_, err := dbs.ll.Upsert(ind, ml)
if err != nil {
log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
@ -243,8 +253,9 @@ func main() {
continue
}
}
// fmt.Println("Info: ", info)
// cancella da Redis la riga di login inserita
retval := opts.rdb.Cmd("lrem", user, "-1", val)
retval := dbs.rdb.Cmd("lrem", user, "-1", val)
log.Println("LREM retval: ", retval, user, val)
lastval = val
}
@ -259,6 +270,6 @@ func main() {
*/
}
fmt.Printf("Stop %v\n", time.Since(start))
log.Printf("Stop %v\n", time.Since(start))
fmt.Printf("Stop %v - %d\n", time.Since(start), count)
log.Printf("Stop %v - %d\n", time.Since(start), count)
}