// consumer package main import ( "fmt" // "github.com/garyburd/redigo/redis" // "gopkg.in/mgo.v2/bson" "log" "strconv" "strings" "time" ) type consumed struct { user string error bool logins []string empty bool } func contains(s []Ips, e string) bool { for _, a := range s { if a.Ip == e { return true } } return false } func consumer(id int) { for { prod := <-consume[id] cons := consumed{ user: prod.user, logins: make([]string, 0), error: false, empty: true, } start := time.Now() for i := range prod.logins { login := prod.logins[i] // se la riga di login e' vuota if login == "" { log.Println("Login empty: ", prod.user) cons.logins = append(cons.logins, login) continue } sval := strings.Split(login, ":") // se il formato della riga di login non e' corretto if sval[1] == "" { log.Println("Login format error: ", login, prod.user) cons.logins = append(cons.logins, login) continue } // se il timestamp della riga di login non e' corretto date, err := strconv.ParseInt(sval[1], 10, 64) if err != nil { log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) cons.logins = append(cons.logins, login) continue } ml := MongoLogin{ User: prod.user, Protocol: sval[0], Ip: sval[2], Date: time.Unix(date, 0), Insert: time.Now(), } if opts.Month != ml.Date.Format("0601") { lt := dbs.mdb.DB("lastlogin").C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))) err = lt.Insert(ml) if err != nil { log.Printf("Insert error: %+v - %s - %s\n", err, cons.user, lt.FullName) count.AddErr() cons.error = true continue } if opts.Debug { log.Printf("%s - %+v\n", lt.FullName, ml) } } else { // inserisce il login su Mongodb err = dbs.ll.Insert(ml) if err != nil { log.Printf("Insert error: %+v - %s\n", err, cons.user) count.AddErr() cons.error = true continue } if opts.Debug { log.Printf("%+v\n", ml) } } cons.logins = append(cons.logins, login) } count.AddLog(len(prod.logins)) if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { cons.empty = true } if opts.Debug { fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } remove[id] <- cons } }