manda a influxdb i tempi medi,max e min di ogni worker ( producer, consumer, remover)

This commit is contained in:
Miki 2016-06-01 16:44:09 +02:00
parent 0d018caa98
commit 467b9d93ac
6 changed files with 131 additions and 24 deletions

View file

@ -72,7 +72,7 @@ func consumer(id int) {
}
if opts.Month != ml.Date.Format("0601") {
lt := dbs.mdb.DB("lastlogin").C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")))
lt := dbs.mdb.DB(dbs.Database).C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")))
err = lt.Insert(ml)
if err != nil {
log.Printf("Insert error: %+v - %s - %s\n", err, cons.user, lt.FullName)
@ -103,7 +103,7 @@ func consumer(id int) {
count.AddLog(len(prod.logins))
if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins {
cons.empty = true
cons.empty = false
}
if opts.Debug {
@ -111,5 +111,7 @@ func consumer(id int) {
}
remove[id] <- cons
count.AddConsumerTime(time.Since(start))
}
}

View file

@ -7,25 +7,39 @@ import (
)
type Counter struct {
mu sync.Mutex
user int
log int
rem int
err int
dup int
time time.Duration
wg []int
mu sync.Mutex
user int
log int
rem int
err int
dup int
time time.Duration
wg []int
tconsumer float64
tremover float64
tproducer float64
maxconsumer float64
maxremover float64
minconsumer float64
minremover float64
}
func NewCounter() *Counter {
return &Counter{
user: 0,
log: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: make([]int, opts.Concurrent),
user: 0,
log: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
maxconsumer: 0,
maxremover: 0,
minconsumer: 9999999999,
minremover: 9999999999,
tconsumer: 0,
tremover: 0,
tproducer: 0,
wg: make([]int, opts.Concurrent),
}
}
@ -65,6 +79,36 @@ func (c *Counter) AddErr() {
c.err++
}
func (c *Counter) AddProducerTime(time time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.tproducer = (c.tproducer + time.Seconds()) / 2.0
}
func (c *Counter) AddConsumerTime(time time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.tconsumer = (c.tconsumer + time.Seconds()) / 2.0
if c.maxconsumer < time.Seconds() {
c.maxconsumer = time.Seconds()
}
if c.minconsumer > time.Seconds() {
c.minconsumer = time.Seconds()
}
}
func (c *Counter) AddRemoverTime(time time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.tremover = (c.tremover + time.Seconds()) / 2.0
if c.maxremover < time.Seconds() {
c.maxremover = time.Seconds()
}
if c.minremover > time.Seconds() {
c.minremover = time.Seconds()
}
}
func (c *Counter) DelWG(id int) {
c.mu.Lock()
defer c.mu.Unlock()
@ -120,6 +164,55 @@ func (c *Counter) GetTime() (ret float64) {
return
}
func (c *Counter) GetConsumerTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.tconsumer
return
}
func (c *Counter) GetRemoverTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.tremover
return
}
func (c *Counter) GetProducerTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.tproducer
return
}
func (c *Counter) GetMaxConsumer() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.maxconsumer
return
}
func (c *Counter) GetMinConsumer() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.minconsumer
return
}
func (c *Counter) GetMaxRemover() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.maxremover
return
}
func (c *Counter) GetMinRemover() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.minremover
return
}
func (c *Counter) SetTime(t time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

View file

@ -34,12 +34,19 @@ func writeStats(start time.Time) {
tags := map[string]string{"server": opts.Hostname, "domain": dbs.Database}
fields := map[string]interface{}{
"user": count.GetUser(),
"log": count.GetLog(),
"err": count.GetErr(),
"rem": count.GetRem(),
"dup": count.GetDup(),
"stop": count.GetTime(),
"user": count.GetUser(),
"log": count.GetLog(),
"err": count.GetErr(),
"rem": count.GetRem(),
"dup": count.GetDup(),
"stop": count.GetTime(),
"consumer": count.GetConsumerTime(),
"producer": count.GetProducerTime(),
"remover": count.GetRemoverTime(),
"max_consumer": count.GetMaxConsumer(),
"min_consumer": count.GetMinConsumer(),
"max_remover": count.GetMaxRemover(),
"min_remover": count.GetMinRemover(),
}
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
if err != nil {

View file

@ -11,7 +11,7 @@ import (
)
const (
_VERSION = "v2.8.0"
_VERSION = "v2.8.1"
)
var (

View file

@ -65,6 +65,8 @@ func producer(id int) {
user: user,
logins: logs,
}
count.AddProducerTime(time.Since(start))
}
done[id] <- true

View file

@ -17,6 +17,7 @@ func remover(id int) {
// wg.Add(1)
start := time.Now()
for i := range rem.logins {
login := rem.logins[i]
// cancella da Redis la riga di login inserita partendo da 1
@ -41,5 +42,7 @@ func remover(id int) {
}
wg.Done()
count.DelWG(id)
count.AddRemoverTime(time.Since(start))
}
}