counter usa i 'chanel'

This commit is contained in:
Michele 2016-12-20 15:37:32 +01:00
parent a3a02eac55
commit 36f6aa8a83
6 changed files with 127 additions and 43 deletions

View file

@ -104,10 +104,16 @@ func consumer() {
if !strings.Contains(err.Error(), "E11000") { if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Mongo Insert Err: %+v\n", err) fmt.Printf("Mongo Insert Err: %+v\n", err)
cons.error = true cons.error = true
count.AddErr(1) counter <- Counterchan{
tipo: "err",
val: 1,
}
continue continue
} else { } else {
count.AddDuplicate(1) counter <- Counterchan{
tipo: "dup",
val: 1,
}
} }
} }
} }
@ -117,10 +123,16 @@ func consumer() {
if !strings.Contains(err.Error(), "Duplicate primary key") { if !strings.Contains(err.Error(), "Duplicate primary key") {
fmt.Printf("RT Insert Err: %+v\n", err) fmt.Printf("RT Insert Err: %+v\n", err)
cons.error = true cons.error = true
count.AddErr(1) counter <- Counterchan{
tipo: "err",
val: 1,
}
continue continue
} else { } else {
count.AddDuplicate(1) counter <- Counterchan{
tipo: "dup",
val: 1,
}
} }
} }
} }
@ -145,24 +157,39 @@ func consumer() {
if !strings.Contains(err.Error(), "E11000") { if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err) fmt.Printf("Err: %+v\n", err)
cons.error = true cons.error = true
count.AddErr(1) counter <- Counterchan{
tipo: "err",
val: 1,
}
continue continue
} else { } else {
count.AddDuplicate(1) counter <- Counterchan{
tipo: "dup",
val: 1,
}
} }
} }
} }
if dbs.isRethink() { if dbs.isRethink() {
resp, err := dbs.rtdb.Insert(ml) resp, err := dbs.rtdb.Insert(ml)
count.AddInsert(resp.Inserted) counter <- Counterchan{
tipo: "ins",
val: resp.Inserted,
}
if err != nil { if err != nil {
if !strings.Contains(err.Error(), "Duplicate primary key") { if !strings.Contains(err.Error(), "Duplicate primary key") {
fmt.Printf("RT Insert Err: %+v\n", err) fmt.Printf("RT Insert Err: %+v\n", err)
cons.error = true cons.error = true
count.AddErr(1) counter <- Counterchan{
tipo: "err",
val: 1,
}
continue continue
} else { } else {
count.AddDuplicate(1) counter <- Counterchan{
tipo: "dup",
val: 1,
}
} }
} }
} }
@ -180,10 +207,16 @@ func consumer() {
if !strings.Contains(err.Error(), "E11000") { if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err) fmt.Printf("Err: %+v\n", err)
cons.error = true cons.error = true
count.AddErr(len(slogin[key])) counter <- Counterchan{
tipo: "err",
val: len(slogin[key]),
}
continue continue
} else { } else {
count.AddDuplicate(strings.Count(err.Error(), "E11000")) counter <- Counterchan{
tipo: "dup",
val: strings.Count(err.Error(), "E11000"),
}
} }
} }
cons.logins = append(cons.logins, slogin[key]...) cons.logins = append(cons.logins, slogin[key]...)
@ -191,21 +224,33 @@ func consumer() {
} }
if dbs.isRethink() { if dbs.isRethink() {
resp, err := dbs.rtdb.MultiInsert(rtbulk) resp, err := dbs.rtdb.MultiInsert(rtbulk)
count.AddInsert(resp.Inserted) counter <- Counterchan{
tipo: "ins",
val: resp.Inserted,
}
if err != nil { if err != nil {
if !strings.Contains(err.Error(), "Duplicate primary key") { if !strings.Contains(err.Error(), "Duplicate primary key") {
cons.error = true cons.error = true
count.AddErr(resp.Errors) counter <- Counterchan{
tipo: "err",
val: 1,
}
continue continue
} else { } else {
count.AddDuplicate(resp.Errors) counter <- Counterchan{
tipo: "dup",
val: 1,
}
} }
} }
cons.logins = append(cons.logins, slogin["rt"]...) cons.logins = append(cons.logins, slogin["rt"]...)
} }
} }
count.AddLog(len(prod.logins)) counter <- Counterchan{
tipo: "log",
val: len(prod.logins),
}
if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins {
cons.empty = false cons.empty = false

View file

@ -6,6 +6,11 @@ import (
"time" "time"
) )
type Counterchan struct {
tipo string
val int
}
// Counter structure // Counter structure
type Counter struct { type Counter struct {
mu sync.Mutex mu sync.Mutex
@ -33,57 +38,71 @@ func NewCounter() *Counter {
} }
} }
func (c *Counter) Run() {
for {
tocount := <-counter
switch tocount.tipo {
case "user":
c.addUser()
case "dup":
c.addDuplicate(tocount.val)
case "ins":
c.addInsert(tocount.val)
case "log":
c.addLog(tocount.val)
case "rem":
c.addRem(tocount.val)
case "wg":
if tocount.val > 0 {
c.addWG()
} else {
c.delWG()
}
case "err":
c.addErr(tocount.val)
}
}
}
// AddUser increment number of users managed // AddUser increment number of users managed
func (c *Counter) AddUser() { func (c *Counter) addUser() {
c.mu.Lock()
defer c.mu.Unlock()
c.user++ c.user++
} }
// AddDuplicate increment number of duplicates log // AddDuplicate increment number of duplicates log
func (c *Counter) AddDuplicate(add int) { func (c *Counter) addDuplicate(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.dup += add c.dup += add
} }
// AddInsert increment number of inserted rows // AddInsert increment number of inserted rows
func (c *Counter) AddInsert(add int) { func (c *Counter) addInsert(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.insert += add c.insert += add
} }
// AddLog increment number of log's rows managed // AddLog increment number of log's rows managed
func (c *Counter) AddLog(add int) { func (c *Counter) addLog(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.log += add c.log += add
} }
//AddRem increment removed logs row //AddRem increment removed logs row
func (c *Counter) AddRem(add int) { func (c *Counter) addRem(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.rem += add c.rem += add
} }
// AddWG ... // AddWG ...
func (c *Counter) AddWG() { func (c *Counter) addWG() {
c.mu.Lock()
defer c.mu.Unlock()
c.wg++ c.wg++
} }
// AddErr ... // AddErr ...
func (c *Counter) AddErr(add int) { func (c *Counter) addErr(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.err += add c.err += add
} }
// DelWG ... // DelWG ...
func (c *Counter) DelWG() { func (c *Counter) delWG() {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.wg-- c.wg--

View file

@ -11,7 +11,7 @@ import (
) )
const ( const (
_Version = "v3.0.0" _Version = "v3.1.0"
_Producer = 0 _Producer = 0
_Consumer = 1 _Consumer = 1
_Remover = 2 _Remover = 2
@ -24,6 +24,8 @@ var (
consume chan produced consume chan produced
remove chan consumed remove chan consumed
counter chan Counterchan
wg sync.WaitGroup wg sync.WaitGroup
status int status int
@ -92,7 +94,9 @@ func main() {
remove = make(chan consumed) remove = make(chan consumed)
loop = true loop = true
done = make(chan bool) done = make(chan bool)
counter = make(chan Counterchan)
go count.Run()
go producer() go producer()
go consumer() go consumer()
go remover() go remover()

View file

@ -60,9 +60,15 @@ func producer() {
// log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) // log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
} }
count.AddUser() counter <- Counterchan{
tipo: "user",
val: 1,
}
wg.Add(1) wg.Add(1)
count.AddWG() counter <- Counterchan{
tipo: "wg",
val: 1,
}
consume <- produced{ consume <- produced{
user: user, user: user,

View file

@ -37,11 +37,18 @@ func remover() {
} }
conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush() conn.Flush()
count.AddRem(len(rem.logins)) counter <- Counterchan{
tipo: "rem",
val: len(rem.logins),
}
if opts.Debug { if opts.Debug {
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start)) fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
} }
wg.Done() wg.Done()
count.DelWG() counter <- Counterchan{
tipo: "wg",
val: -1,
}
} }
} }

View file

@ -14,7 +14,10 @@ func kill() {
log.Printf("KILL %d\n", status) log.Printf("KILL %d\n", status)
fmt.Printf("KILL %d\n", status) fmt.Printf("KILL %d\n", status)
wg.Done() wg.Done()
count.DelWG() counter <- Counterchan{
tipo: "wg",
val: -1,
}
done <- true done <- true
} }