diff --git a/consumer.go b/consumer.go index f030e4a..d12bce5 100644 --- a/consumer.go +++ b/consumer.go @@ -17,7 +17,6 @@ type consumed struct { user string error bool logins []string - empty bool } func hash(val []byte) string { @@ -41,32 +40,23 @@ func consumer() { var bulk = make(map[string][]*mgo.Bulk) var allLogins = make(map[string]MongoLogin) - cons := consumed{ - user: prod.user, - logins: make([]string, 0), - error: false, - } - for i := range prod.logins { login := prod.logins[i] // se la riga di login e' vuota if login == "" { log.Println("Login empty: ", prod.user) - cons.logins = append(cons.logins, login) continue } sval := strings.Split(login, ":") // se il formato della riga di login non e' corretto if sval[1] == "" { log.Println("Login format error: ", login, prod.user) - cons.logins = append(cons.logins, login) continue } // se il timestamp della riga di login non e' corretto date, err := strconv.ParseInt(sval[1], 10, 64) if err != nil { log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) - cons.logins = append(cons.logins, login) continue } mlID := hash([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405") @@ -105,7 +95,7 @@ func consumer() { if err != nil { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Err: %+v\n", err) - cons.error = true + prod.err = true counter <- Counterchan{ tipo: "err", val: len(prod.logins), @@ -128,13 +118,6 @@ func consumer() { } } } - - cons.logins = append(cons.logins, prod.logins...) - } - - counter <- Counterchan{ - tipo: "log", - val: len(prod.logins), } if opts.Debug { @@ -142,10 +125,17 @@ func consumer() { } if opts.Debug && len(prod.logins) > 10 { - fmt.Printf("LOGS: %+v\n", cons.logins) + fmt.Printf("LOGS: %+v\n", prod.logins) + } + + if !prod.err { + counter <- Counterchan{ + tipo: "rem", + val: len(prod.logins), + } } // wg.Done() - remove <- cons + remove <- prod } } diff --git a/main.go b/main.go index 68f54be..847ce99 100644 --- a/main.go +++ b/main.go @@ -21,8 +21,8 @@ var ( loop bool done chan bool - consume chan produced - remove chan consumed + consume chan loginsList + remove chan loginsList counter chan Counterchan @@ -85,8 +85,8 @@ func main() { count = NewCounter() - consume = make(chan produced, opts.Queue) - remove = make(chan consumed, opts.Queue) + consume = make(chan loginsList, opts.Queue) + remove = make(chan loginsList, opts.Queue) loop = true done = make(chan bool) counter = make(chan Counterchan) diff --git a/producer.go b/producer.go index 686c72a..cb5911d 100644 --- a/producer.go +++ b/producer.go @@ -15,6 +15,12 @@ type produced struct { logins []string } +type loginsList struct { + user string + logins []string + err bool +} + func producer() { conn := dbs.rdb.Get() defer conn.Close() @@ -54,11 +60,6 @@ func producer() { tipo: "user", val: 1, } - wg.Add(1) - counter <- Counterchan{ - tipo: "wg", - val: 1, - } if opts.Debug { fmt.Printf("PROD: %+v\n", time.Since(start)) @@ -68,9 +69,21 @@ func producer() { log.Printf("PROD: %s - %d\n", user, len(logs)) } - consume <- produced{ + wg.Add(1) + counter <- Counterchan{ + tipo: "wg", + val: 1, + } + + counter <- Counterchan{ + tipo: "log", + val: len(logs), + } + + consume <- loginsList{ user: user, logins: logs, + err: false, } } } diff --git a/remover.go b/remover.go index 4539662..9dc2679 100644 --- a/remover.go +++ b/remover.go @@ -18,7 +18,7 @@ func remover() { status = _Remover start := time.Now() - if !rem.error { + if !rem.err { for i := range rem.logins { // cancella da Redis la riga di login inserita partendo da 1 conn.Send("lrem", rem.user, "1", rem.logins[i]) @@ -26,7 +26,7 @@ func remover() { } // se ci sono errori o non e' vuota la lista di logins reinserisce lo user - if rem.error { + if rem.err { if opts.Debug { fmt.Printf("SADD: %s\n", rem.user) } @@ -38,20 +38,14 @@ func remover() { conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) conn.Flush() - if !rem.error { - counter <- Counterchan{ - tipo: "rem", - val: len(rem.logins), - } - } - if opts.Debug { fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start)) } - wg.Done() + counter <- Counterchan{ tipo: "wg", val: -1, } + wg.Done() } }