From 36f6aa8a83033d70986d44a5891e3fd7bffc3a48 Mon Sep 17 00:00:00 2001 From: Michele Date: Tue, 20 Dec 2016 15:37:32 +0100 Subject: [PATCH] counter usa i 'chanel' --- consumer.go | 75 ++++++++++++++++++++++++++++++++++++++++++----------- counter.go | 63 ++++++++++++++++++++++++++++---------------- main.go | 6 ++++- producer.go | 10 +++++-- remover.go | 11 ++++++-- sigterm.go | 5 +++- 6 files changed, 127 insertions(+), 43 deletions(-) diff --git a/consumer.go b/consumer.go index 109812c..8059ba0 100644 --- a/consumer.go +++ b/consumer.go @@ -104,10 +104,16 @@ func consumer() { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Mongo Insert Err: %+v\n", err) cons.error = true - count.AddErr(1) + counter <- Counterchan{ + tipo: "err", + val: 1, + } continue } else { - count.AddDuplicate(1) + counter <- Counterchan{ + tipo: "dup", + val: 1, + } } } } @@ -117,10 +123,16 @@ func consumer() { if !strings.Contains(err.Error(), "Duplicate primary key") { fmt.Printf("RT Insert Err: %+v\n", err) cons.error = true - count.AddErr(1) + counter <- Counterchan{ + tipo: "err", + val: 1, + } continue } else { - count.AddDuplicate(1) + counter <- Counterchan{ + tipo: "dup", + val: 1, + } } } } @@ -145,24 +157,39 @@ func consumer() { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Err: %+v\n", err) cons.error = true - count.AddErr(1) + counter <- Counterchan{ + tipo: "err", + val: 1, + } continue } else { - count.AddDuplicate(1) + counter <- Counterchan{ + tipo: "dup", + val: 1, + } } } } if dbs.isRethink() { resp, err := dbs.rtdb.Insert(ml) - count.AddInsert(resp.Inserted) + counter <- Counterchan{ + tipo: "ins", + val: resp.Inserted, + } if err != nil { if !strings.Contains(err.Error(), "Duplicate primary key") { fmt.Printf("RT Insert Err: %+v\n", err) cons.error = true - count.AddErr(1) + counter <- Counterchan{ + tipo: "err", + val: 1, + } continue } else { - count.AddDuplicate(1) + counter <- Counterchan{ + tipo: "dup", + val: 1, + } } } } @@ -180,10 +207,16 @@ func consumer() { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Err: %+v\n", err) cons.error = true - count.AddErr(len(slogin[key])) + counter <- Counterchan{ + tipo: "err", + val: len(slogin[key]), + } continue } else { - count.AddDuplicate(strings.Count(err.Error(), "E11000")) + counter <- Counterchan{ + tipo: "dup", + val: strings.Count(err.Error(), "E11000"), + } } } cons.logins = append(cons.logins, slogin[key]...) @@ -191,21 +224,33 @@ func consumer() { } if dbs.isRethink() { resp, err := dbs.rtdb.MultiInsert(rtbulk) - count.AddInsert(resp.Inserted) + counter <- Counterchan{ + tipo: "ins", + val: resp.Inserted, + } if err != nil { if !strings.Contains(err.Error(), "Duplicate primary key") { cons.error = true - count.AddErr(resp.Errors) + counter <- Counterchan{ + tipo: "err", + val: 1, + } continue } else { - count.AddDuplicate(resp.Errors) + counter <- Counterchan{ + tipo: "dup", + val: 1, + } } } cons.logins = append(cons.logins, slogin["rt"]...) } } - count.AddLog(len(prod.logins)) + counter <- Counterchan{ + tipo: "log", + val: len(prod.logins), + } if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { cons.empty = false diff --git a/counter.go b/counter.go index 6f6c4b8..c7634d0 100644 --- a/counter.go +++ b/counter.go @@ -6,6 +6,11 @@ import ( "time" ) +type Counterchan struct { + tipo string + val int +} + // Counter structure type Counter struct { mu sync.Mutex @@ -33,57 +38,71 @@ func NewCounter() *Counter { } } +func (c *Counter) Run() { + for { + tocount := <-counter + + switch tocount.tipo { + case "user": + c.addUser() + case "dup": + c.addDuplicate(tocount.val) + case "ins": + c.addInsert(tocount.val) + case "log": + c.addLog(tocount.val) + case "rem": + c.addRem(tocount.val) + case "wg": + if tocount.val > 0 { + c.addWG() + } else { + c.delWG() + } + case "err": + c.addErr(tocount.val) + + } + } +} + // AddUser increment number of users managed -func (c *Counter) AddUser() { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Counter) addUser() { c.user++ } // AddDuplicate increment number of duplicates log -func (c *Counter) AddDuplicate(add int) { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Counter) addDuplicate(add int) { c.dup += add } // AddInsert increment number of inserted rows -func (c *Counter) AddInsert(add int) { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Counter) addInsert(add int) { c.insert += add } // AddLog increment number of log's rows managed -func (c *Counter) AddLog(add int) { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Counter) addLog(add int) { c.log += add } //AddRem increment removed logs row -func (c *Counter) AddRem(add int) { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Counter) addRem(add int) { c.rem += add } // AddWG ... -func (c *Counter) AddWG() { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Counter) addWG() { c.wg++ } // AddErr ... -func (c *Counter) AddErr(add int) { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Counter) addErr(add int) { c.err += add } // DelWG ... -func (c *Counter) DelWG() { +func (c *Counter) delWG() { c.mu.Lock() defer c.mu.Unlock() c.wg-- diff --git a/main.go b/main.go index 5a9c508..b24d2ee 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( ) const ( - _Version = "v3.0.0" + _Version = "v3.1.0" _Producer = 0 _Consumer = 1 _Remover = 2 @@ -24,6 +24,8 @@ var ( consume chan produced remove chan consumed + counter chan Counterchan + wg sync.WaitGroup status int @@ -92,7 +94,9 @@ func main() { remove = make(chan consumed) loop = true done = make(chan bool) + counter = make(chan Counterchan) + go count.Run() go producer() go consumer() go remover() diff --git a/producer.go b/producer.go index a7c3e6a..3425e40 100644 --- a/producer.go +++ b/producer.go @@ -60,9 +60,15 @@ func producer() { // log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) } - count.AddUser() + counter <- Counterchan{ + tipo: "user", + val: 1, + } wg.Add(1) - count.AddWG() + counter <- Counterchan{ + tipo: "wg", + val: 1, + } consume <- produced{ user: user, diff --git a/remover.go b/remover.go index 53de5e9..7eae5ed 100644 --- a/remover.go +++ b/remover.go @@ -37,11 +37,18 @@ func remover() { } conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) conn.Flush() - count.AddRem(len(rem.logins)) + counter <- Counterchan{ + tipo: "rem", + val: len(rem.logins), + } + if opts.Debug { fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start)) } wg.Done() - count.DelWG() + counter <- Counterchan{ + tipo: "wg", + val: -1, + } } } diff --git a/sigterm.go b/sigterm.go index c55b35b..79fc906 100644 --- a/sigterm.go +++ b/sigterm.go @@ -14,7 +14,10 @@ func kill() { log.Printf("KILL %d\n", status) fmt.Printf("KILL %d\n", status) wg.Done() - count.DelWG() + counter <- Counterchan{ + tipo: "wg", + val: -1, + } done <- true }