// consumer package main import ( "fmt" // "github.com/garyburd/redigo/redis" "log" "strconv" "strings" "time" ) type consumed struct { user string error bool logins []string } func consumer(id int) { // var conn = dbs.rdb.Get() // defer conn.Close() for { prod := <-consume[id] // wg.Add(1) // defer wg.Done() cons := consumed{ user: prod.user, logins: make([]string, 0), error: false, } start := time.Now() for i := range prod.logins { login := prod.logins[i] // se la riga di login e' vuota if login == "" { log.Println("Login empty: ", prod.user) cons.logins = append(cons.logins, login) // retval, _ := conn.Do("lrem", user, "0", login) // log.Println("LREM retval: ", user, login, retval) // return continue } sval := strings.Split(login, ":") // se il formato della riga di login non e' corretto if sval[1] == "" { log.Println("Login format error: ", login, prod.user) cons.logins = append(cons.logins, login) // retval, _ := conn.Do("lrem", user, "0", login) // log.Println("LREM retval: ", user, login, retval) // return continue } // se il timestamp della riga di login non e' corretto date, err := strconv.ParseInt(sval[1], 10, 64) if err != nil { log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) cons.logins = append(cons.logins, login) // retval, _ := conn.Do("lrem", user, "0", login) // log.Println("LREM retval: ", user, login, retval) // return continue } ml := MongoLogin{ User: prod.user, Protocol: sval[0], Ip: sval[2], Date: time.Unix(date, 0), } ind := Index{ User: prod.user, Date: time.Unix(date, 0), } // inserisce il login su Mongodb retval, err := dbs.ll.Upsert(ind, ml) if err != nil { log.Printf("Insert error: %+v - %s\n", err, cons.user) // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente count.AddErr() cons.error = true continue } if retval.Updated == 1 { count.AddDuplicate() } if opts.Debug { log.Printf("%+v - %+v\n", ml, retval) } if i < (len(prod.logins) - 1) { cons.logins = append(cons.logins, login) // cancella da Redis la riga di login inserita // retval, _ := conn.Do("lrem", user, "0", login) // if opts.Debug { // log.Println("LREM retval: ", retval, user, login) // fmt.Println("LREM retval: ", retval, user, login) // } } } count.AddLog(len(prod.logins)) if opts.Debug { fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n\r", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) // log.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n\r", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } remove[id] <- cons } }