// consumer package main import ( "fmt" // "github.com/garyburd/redigo/redis" // "gopkg.in/mgo.v2/bson" "log" "strconv" "strings" "time" ) type consumed struct { user string error bool logins []string } func contains(s []Ips, e string) bool { for _, a := range s { if a.Ip == e { return true } } return false } func consumer(id int) { // var conn = dbs.rdb.Get() // defer conn.Close() for { prod := <-consume[id] // wg.Add(1) // defer wg.Done() cons := consumed{ user: prod.user, logins: make([]string, 0), error: false, } start := time.Now() for i := range prod.logins { login := prod.logins[i] // se la riga di login e' vuota if login == "" { log.Println("Login empty: ", prod.user) cons.logins = append(cons.logins, login) // retval, _ := conn.Do("lrem", user, "0", login) // log.Println("LREM retval: ", user, login, retval) // return continue } sval := strings.Split(login, ":") // se il formato della riga di login non e' corretto if sval[1] == "" { log.Println("Login format error: ", login, prod.user) cons.logins = append(cons.logins, login) // retval, _ := conn.Do("lrem", user, "0", login) // log.Println("LREM retval: ", user, login, retval) // return continue } // se il timestamp della riga di login non e' corretto date, err := strconv.ParseInt(sval[1], 10, 64) if err != nil { log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) cons.logins = append(cons.logins, login) // retval, _ := conn.Do("lrem", user, "0", login) // log.Println("LREM retval: ", user, login, retval) // return continue } ml := MongoLogin{ User: prod.user, Protocol: sval[0], Ip: sval[2], Date: time.Unix(date, 0), } // cerca se esiste gia' un documento con gli stessi User & Date /* docfind := []Ips{} iter := dbs.ll.Find(bson.M{"user": prod.user, "date": time.Unix(date, 0)}).Select(bson.M{"ip": 1, "_id": 0}).Iter() iter.All(&docfind) if len(docfind) > 0 { count.AddDuplicate() if !contains(docfind, ml.Ip) { fmt.Printf("Insert != IP for same date: user=%s - date=%s\n - newip=%s - oldip=%s\n", ml.User, ml.Date, docfind, ml.Ip) // inserisce il login su Mongodb se gli IP sono != err := dbs.ll.Insert(ml) if err != nil { log.Printf("Insert error: %+v - %s\n", err, cons.user) count.AddErr() cons.error = true continue } } } else { */ // inserisce il login su Mongodb err = dbs.ll.Insert(ml) if err != nil { log.Printf("Insert error: %+v - %s\n", err, cons.user) count.AddErr() cons.error = true continue } // } // iter.Close() if opts.Debug { log.Printf("%+v - %+v\n", ml) } if i < (len(prod.logins) - 1) { cons.logins = append(cons.logins, login) // cancella da Redis la riga di login inserita // retval, _ := conn.Do("lrem", user, "0", login) // if opts.Debug { // log.Println("LREM retval: ", retval, user, login) // fmt.Println("LREM retval: ", retval, user, login) // } } } count.AddLog(len(prod.logins)) if opts.Debug { fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) // log.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } remove[id] <- cons } }