diff --git a/consumer.go b/consumer.go index ba37db8..9f92ab7 100644 --- a/consumer.go +++ b/consumer.go @@ -15,17 +15,17 @@ type consumed struct { logins []string } -func consumer(consume chan produced, remove chan consumed) { +func consumer(id int) { // var conn = dbs.rdb.Get() // defer conn.Close() - for loop { + for { - prod := <-consume + prod := <-consume[id] - wg.Add(1) - defer wg.Done() + // wg.Add(1) + // defer wg.Done() cons := consumed{ user: prod.user, @@ -75,13 +75,13 @@ func consumer(consume chan produced, remove chan consumed) { Date: time.Unix(date, 0), } // inserisce il login su Mongodb - count++ + _, err = dbs.ll.Upsert(ind, ml) if err != nil { log.Printf("Insert error: %+v\n", err) // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente if !strings.Contains(err.Error(), "E11000") { - errCount += 1 + count.AddErr() //return continue } @@ -97,11 +97,13 @@ func consumer(consume chan produced, remove chan consumed) { } } + count.AddLog(len(prod.logins)) + if opts.Debug { - fmt.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount()) - log.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount()) + fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n\r", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) + log.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n\r", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } - remove <- cons + remove[id] <- cons } } diff --git a/main.go b/main.go index 2cdff59..b8a247e 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( ) const ( - _VERSION = "v2.2.1" + _VERSION = "v2.2.2" ) var ( @@ -20,14 +20,15 @@ var ( LogFile: "log/llmongo.log", } - loop = true + loop []bool - count = 0 - errCount = 0 - - done = make(chan bool) + done []chan bool + consume []chan produced + remove []chan consumed wg sync.WaitGroup + + count *Counter ) func main() { @@ -38,7 +39,6 @@ func main() { fmt.Println(os.Args[0], _VERSION) os.Exit(0) } - defer close(done) setTerm() @@ -68,21 +68,31 @@ func main() { time.AfterFunc(opts.Timeout, exit) } - for i := 0; i < opts.Concurrent; i++ { - consume := make(chan produced) - remove := make(chan consumed) - // defer close(consume) - // defer close(remove) + count = NewCounter() - go producer(consume, done) - go consumer(consume, remove) - go remover(remove) + for i := 0; i < opts.Concurrent; i++ { + consume = append(consume, make(chan produced)) + remove = append(remove, make(chan consumed)) + loop = append(loop, true) + done = append(done, make(chan bool)) + + go producer(i) + go consumer(i) + go remover(i) } - <-done + for i := 0; i < opts.Concurrent; i++ { + <-done[i] + fmt.Printf("Done %d\n", i) + close(done[i]) + } + fmt.Println("Waiting WG") + for i := 0; i < opts.Concurrent; i++ { + fmt.Printf("ID (%d): %d\n", i, count.ValWG(i)) + } wg.Wait() - fmt.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount()) - log.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount()) + fmt.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - conn: %d\n\r", time.Since(start), count.ValUser(), count.ValLog(), count.ValErr(), count.ValRem(), dbs.rdb.ActiveCount()) + log.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - conn: %d\n\r", time.Since(start), count.ValUser(), count.ValLog(), count.ValErr(), count.ValRem(), dbs.rdb.ActiveCount()) } diff --git a/producer.go b/producer.go index 7b64c6b..f50aebe 100644 --- a/producer.go +++ b/producer.go @@ -13,27 +13,27 @@ type produced struct { logins []string } -func producer(consume chan produced, done chan bool) { +func producer(id int) { conn := dbs.rdb.Get() defer conn.Close() - for loop { + for loop[id] { start := time.Now() // estrae un userid dalla lista degli utenti che hanno fatto login user, err := redis.String(conn.Do("spop", "llindex")) - if opts.Debug { - log.Printf("SPOP: %+v - %+v\n\r", user, err) - fmt.Printf("SPOP: %+v - %+v\n\r", user, err) - } + // if opts.Debug { + // log.Printf("SPOP: %+v - %+v\n\r", user, err) + // fmt.Printf("SPOP: %+v - %+v\n\r", user, err) + // } // se non ci sono piu' userid esce if err != nil { if opts.Debug { log.Printf("LLINDEX empty: %v\n\r", err) fmt.Printf("LLINDEX empty: %v\n\r", err) } - loop = false - done <- true + //loop[id] = false + done[id] <- true break } @@ -43,16 +43,21 @@ func producer(consume chan produced, done chan bool) { fmt.Printf("LRANGE: %+v - %+v\n\r", err, logs) log.Printf("LRANGE: %+v - %+v\n\r", err, logs) } - if opts.Debug { - fmt.Printf("LRANGE: %s - %d\n\r", user, len(logs)) - log.Printf("LRANGE: %s - %d\n\r", user, len(logs)) - } + // if opts.Debug { + // fmt.Printf("LRANGE: %s - %d\n\r", user, len(logs)) + // log.Printf("LRANGE: %s - %d\n\r", user, len(logs)) + // } if opts.Debug { - fmt.Printf("PROD: user=%s in %v - conn=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount()) + fmt.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n\r", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) + log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n\r", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) } - consume <- produced{ + count.AddUser() + wg.Add(1) + count.AddWG(id) + + consume[id] <- produced{ user: user, logins: logs, } diff --git a/remover.go b/remover.go index 47bf476..43f732c 100644 --- a/remover.go +++ b/remover.go @@ -7,15 +7,14 @@ import ( "time" ) -func remover(remove chan consumed) { +func remover(id int) { var conn = dbs.rdb.Get() defer conn.Close() - for loop { - rem := <-remove + for { + rem := <-remove[id] - wg.Add(1) - defer wg.Done() + // wg.Add(1) start := time.Now() for i := range rem.logins { @@ -25,9 +24,12 @@ func remover(remove chan consumed) { } conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) conn.Flush() + count.AddRem(len(rem.logins)) if opts.Debug { - log.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start)) - fmt.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start)) + log.Printf("LREM (%d): %s - %d - %+v\n\r", id, rem.user, len(rem.logins), time.Since(start)) + fmt.Printf("LREM (%d): %s - %d - %+v\n\r", id, rem.user, len(rem.logins), time.Since(start)) } + wg.Done() + count.RemWG(id) } } diff --git a/sigterm.go b/sigterm.go index ea5b8c1..d5bd082 100644 --- a/sigterm.go +++ b/sigterm.go @@ -10,10 +10,12 @@ import ( ) func exit() { - log.Println("EXIT") - fmt.Println("EXIT") - loop = false - done <- true + for i := 0; i < opts.Concurrent; i++ { + log.Println("EXIT ", i) + fmt.Println("EXIT ", i) + loop[i] = false + done[i] <- true + } } func setTerm() {