From 467b9d93ac08bc208a7e08c012f57ef3a80f73f4 Mon Sep 17 00:00:00 2001 From: Miki Date: Wed, 1 Jun 2016 16:44:09 +0200 Subject: [PATCH 1/3] manda a influxdb i tempi medi,max e min di ogni worker ( producer, consumer, remover) --- consumer.go | 6 ++- counter.go | 123 +++++++++++++++++++++++++++++++++++++++++++++------- influxdb.go | 19 +++++--- main.go | 2 +- producer.go | 2 + remover.go | 3 ++ 6 files changed, 131 insertions(+), 24 deletions(-) diff --git a/consumer.go b/consumer.go index ebe2c92..ce173eb 100644 --- a/consumer.go +++ b/consumer.go @@ -72,7 +72,7 @@ func consumer(id int) { } if opts.Month != ml.Date.Format("0601") { - lt := dbs.mdb.DB("lastlogin").C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))) + lt := dbs.mdb.DB(dbs.Database).C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))) err = lt.Insert(ml) if err != nil { log.Printf("Insert error: %+v - %s - %s\n", err, cons.user, lt.FullName) @@ -103,7 +103,7 @@ func consumer(id int) { count.AddLog(len(prod.logins)) if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { - cons.empty = true + cons.empty = false } if opts.Debug { @@ -111,5 +111,7 @@ func consumer(id int) { } remove[id] <- cons + + count.AddConsumerTime(time.Since(start)) } } diff --git a/counter.go b/counter.go index 92f4ebc..9c4db6f 100644 --- a/counter.go +++ b/counter.go @@ -7,25 +7,39 @@ import ( ) type Counter struct { - mu sync.Mutex - user int - log int - rem int - err int - dup int - time time.Duration - wg []int + mu sync.Mutex + user int + log int + rem int + err int + dup int + time time.Duration + wg []int + tconsumer float64 + tremover float64 + tproducer float64 + maxconsumer float64 + maxremover float64 + minconsumer float64 + minremover float64 } func NewCounter() *Counter { return &Counter{ - user: 0, - log: 0, - err: 0, - rem: 0, - dup: 0, - time: 0, - wg: make([]int, opts.Concurrent), + user: 0, + log: 0, + err: 0, + rem: 0, + dup: 0, + time: 0, + maxconsumer: 0, + maxremover: 0, + minconsumer: 9999999999, + minremover: 9999999999, + tconsumer: 0, + tremover: 0, + tproducer: 0, + wg: make([]int, opts.Concurrent), } } @@ -65,6 +79,36 @@ func (c *Counter) AddErr() { c.err++ } +func (c *Counter) AddProducerTime(time time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.tproducer = (c.tproducer + time.Seconds()) / 2.0 +} + +func (c *Counter) AddConsumerTime(time time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.tconsumer = (c.tconsumer + time.Seconds()) / 2.0 + if c.maxconsumer < time.Seconds() { + c.maxconsumer = time.Seconds() + } + if c.minconsumer > time.Seconds() { + c.minconsumer = time.Seconds() + } +} + +func (c *Counter) AddRemoverTime(time time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.tremover = (c.tremover + time.Seconds()) / 2.0 + if c.maxremover < time.Seconds() { + c.maxremover = time.Seconds() + } + if c.minremover > time.Seconds() { + c.minremover = time.Seconds() + } +} + func (c *Counter) DelWG(id int) { c.mu.Lock() defer c.mu.Unlock() @@ -120,6 +164,55 @@ func (c *Counter) GetTime() (ret float64) { return } +func (c *Counter) GetConsumerTime() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.tconsumer + return +} + +func (c *Counter) GetRemoverTime() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.tremover + return +} + +func (c *Counter) GetProducerTime() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.tproducer + return +} + +func (c *Counter) GetMaxConsumer() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.maxconsumer + return +} + +func (c *Counter) GetMinConsumer() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.minconsumer + return +} + +func (c *Counter) GetMaxRemover() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.maxremover + return +} + +func (c *Counter) GetMinRemover() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.minremover + return +} + func (c *Counter) SetTime(t time.Duration) { c.mu.Lock() defer c.mu.Unlock() diff --git a/influxdb.go b/influxdb.go index a93b7ea..4229a1d 100644 --- a/influxdb.go +++ b/influxdb.go @@ -34,12 +34,19 @@ func writeStats(start time.Time) { tags := map[string]string{"server": opts.Hostname, "domain": dbs.Database} fields := map[string]interface{}{ - "user": count.GetUser(), - "log": count.GetLog(), - "err": count.GetErr(), - "rem": count.GetRem(), - "dup": count.GetDup(), - "stop": count.GetTime(), + "user": count.GetUser(), + "log": count.GetLog(), + "err": count.GetErr(), + "rem": count.GetRem(), + "dup": count.GetDup(), + "stop": count.GetTime(), + "consumer": count.GetConsumerTime(), + "producer": count.GetProducerTime(), + "remover": count.GetRemoverTime(), + "max_consumer": count.GetMaxConsumer(), + "min_consumer": count.GetMinConsumer(), + "max_remover": count.GetMaxRemover(), + "min_remover": count.GetMinRemover(), } pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start) if err != nil { diff --git a/main.go b/main.go index be29142..8f90f9e 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( ) const ( - _VERSION = "v2.8.0" + _VERSION = "v2.8.1" ) var ( diff --git a/producer.go b/producer.go index 4d173e1..5fac3b4 100644 --- a/producer.go +++ b/producer.go @@ -65,6 +65,8 @@ func producer(id int) { user: user, logins: logs, } + + count.AddProducerTime(time.Since(start)) } done[id] <- true diff --git a/remover.go b/remover.go index dfc9e4e..44e927b 100644 --- a/remover.go +++ b/remover.go @@ -17,6 +17,7 @@ func remover(id int) { // wg.Add(1) start := time.Now() + for i := range rem.logins { login := rem.logins[i] // cancella da Redis la riga di login inserita partendo da 1 @@ -41,5 +42,7 @@ func remover(id int) { } wg.Done() count.DelWG(id) + + count.AddRemoverTime(time.Since(start)) } } From 86c835f02bec3d65dcb9f945f37bb94be7ae7729 Mon Sep 17 00:00:00 2001 From: Miki Date: Wed, 1 Jun 2016 16:59:43 +0200 Subject: [PATCH 2/3] add debug output --- main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/main.go b/main.go index 8f90f9e..244e308 100644 --- a/main.go +++ b/main.go @@ -104,6 +104,10 @@ func main() { fmt.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup()) log.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup()) + if opts.Debug { + fmt.Printf("Time: \n\tConsumer: max: %.5f - min: %.5f - med: %.5f\n\tRemover: max: %.5f - min: %.5f - med: %.5f\n\tProducer: %.5f\n", count.GetMaxConsumer(), count.GetMinConsumer(), count.GetConsumerTime(), count.GetMaxRemover(), count.GetMinRemover(), count.GetRemoverTime(), count.GetProducerTime()) + } + if opts.Influxdb != "" { writeStats(start) } From 947b4dec64bc9c27921eb0a7afad49cd0f508476 Mon Sep 17 00:00:00 2001 From: Miki Date: Mon, 6 Jun 2016 10:21:47 +0200 Subject: [PATCH 3/3] . --- consumer.go | 6 +++--- main.go | 2 +- producer.go | 4 ---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/consumer.go b/consumer.go index ce173eb..8369227 100644 --- a/consumer.go +++ b/consumer.go @@ -37,7 +37,7 @@ func consumer(id int) { user: prod.user, logins: make([]string, 0), error: false, - empty: true, + empty: false, } start := time.Now() @@ -102,8 +102,8 @@ func consumer(id int) { count.AddLog(len(prod.logins)) - if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { - cons.empty = false + if opts.MaxLogins == -1 || len(prod.logins) < opts.MaxLogins { + cons.empty = true } if opts.Debug { diff --git a/main.go b/main.go index 244e308..ea1c871 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( ) const ( - _VERSION = "v2.8.1" + _VERSION = "v2.8.2" ) var ( diff --git a/producer.go b/producer.go index 5fac3b4..f738018 100644 --- a/producer.go +++ b/producer.go @@ -47,10 +47,6 @@ func producer(id int) { } log.Printf("LRANGE: %+v - %+v\n", err, logs) } - // if opts.Debug { - // fmt.Printf("LRANGE: %s - %d\n", user, len(logs)) - // log.Printf("LRANGE: %s - %d\n", user, len(logs)) - // } if opts.Debug { fmt.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())