usa channels diversi per ogni thread

This commit is contained in:
Miki 2015-11-23 17:39:17 +01:00
parent 19d6b22de6
commit a84acd14ca
5 changed files with 74 additions and 53 deletions

View file

@ -15,17 +15,17 @@ type consumed struct {
logins []string logins []string
} }
func consumer(consume chan produced, remove chan consumed) { func consumer(id int) {
// var conn = dbs.rdb.Get() // var conn = dbs.rdb.Get()
// defer conn.Close() // defer conn.Close()
for loop { for {
prod := <-consume prod := <-consume[id]
wg.Add(1) // wg.Add(1)
defer wg.Done() // defer wg.Done()
cons := consumed{ cons := consumed{
user: prod.user, user: prod.user,
@ -75,13 +75,13 @@ func consumer(consume chan produced, remove chan consumed) {
Date: time.Unix(date, 0), Date: time.Unix(date, 0),
} }
// inserisce il login su Mongodb // inserisce il login su Mongodb
count++
_, err = dbs.ll.Upsert(ind, ml) _, err = dbs.ll.Upsert(ind, ml)
if err != nil { if err != nil {
log.Printf("Insert error: %+v\n", err) log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
if !strings.Contains(err.Error(), "E11000") { if !strings.Contains(err.Error(), "E11000") {
errCount += 1 count.AddErr()
//return //return
continue continue
} }
@ -97,11 +97,13 @@ func consumer(consume chan produced, remove chan consumed) {
} }
} }
count.AddLog(len(prod.logins))
if opts.Debug { if opts.Debug {
fmt.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount()) fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n\r", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
log.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount()) log.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n\r", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
} }
remove <- cons remove[id] <- cons
} }
} }

46
main.go
View file

@ -11,7 +11,7 @@ import (
) )
const ( const (
_VERSION = "v2.2.1" _VERSION = "v2.2.2"
) )
var ( var (
@ -20,14 +20,15 @@ var (
LogFile: "log/llmongo.log", LogFile: "log/llmongo.log",
} }
loop = true loop []bool
count = 0 done []chan bool
errCount = 0 consume []chan produced
remove []chan consumed
done = make(chan bool)
wg sync.WaitGroup wg sync.WaitGroup
count *Counter
) )
func main() { func main() {
@ -38,7 +39,6 @@ func main() {
fmt.Println(os.Args[0], _VERSION) fmt.Println(os.Args[0], _VERSION)
os.Exit(0) os.Exit(0)
} }
defer close(done)
setTerm() setTerm()
@ -68,21 +68,31 @@ func main() {
time.AfterFunc(opts.Timeout, exit) time.AfterFunc(opts.Timeout, exit)
} }
for i := 0; i < opts.Concurrent; i++ { count = NewCounter()
consume := make(chan produced)
remove := make(chan consumed)
// defer close(consume)
// defer close(remove)
go producer(consume, done) for i := 0; i < opts.Concurrent; i++ {
go consumer(consume, remove) consume = append(consume, make(chan produced))
go remover(remove) remove = append(remove, make(chan consumed))
loop = append(loop, true)
done = append(done, make(chan bool))
go producer(i)
go consumer(i)
go remover(i)
} }
<-done for i := 0; i < opts.Concurrent; i++ {
<-done[i]
fmt.Printf("Done %d\n", i)
close(done[i])
}
fmt.Println("Waiting WG")
for i := 0; i < opts.Concurrent; i++ {
fmt.Printf("ID (%d): %d\n", i, count.ValWG(i))
}
wg.Wait() wg.Wait()
fmt.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount()) fmt.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - conn: %d\n\r", time.Since(start), count.ValUser(), count.ValLog(), count.ValErr(), count.ValRem(), dbs.rdb.ActiveCount())
log.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount()) log.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - conn: %d\n\r", time.Since(start), count.ValUser(), count.ValLog(), count.ValErr(), count.ValRem(), dbs.rdb.ActiveCount())
} }

View file

@ -13,27 +13,27 @@ type produced struct {
logins []string logins []string
} }
func producer(consume chan produced, done chan bool) { func producer(id int) {
conn := dbs.rdb.Get() conn := dbs.rdb.Get()
defer conn.Close() defer conn.Close()
for loop { for loop[id] {
start := time.Now() start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login // estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex")) user, err := redis.String(conn.Do("spop", "llindex"))
if opts.Debug { // if opts.Debug {
log.Printf("SPOP: %+v - %+v\n\r", user, err) // log.Printf("SPOP: %+v - %+v\n\r", user, err)
fmt.Printf("SPOP: %+v - %+v\n\r", user, err) // fmt.Printf("SPOP: %+v - %+v\n\r", user, err)
} // }
// se non ci sono piu' userid esce // se non ci sono piu' userid esce
if err != nil { if err != nil {
if opts.Debug { if opts.Debug {
log.Printf("LLINDEX empty: %v\n\r", err) log.Printf("LLINDEX empty: %v\n\r", err)
fmt.Printf("LLINDEX empty: %v\n\r", err) fmt.Printf("LLINDEX empty: %v\n\r", err)
} }
loop = false //loop[id] = false
done <- true done[id] <- true
break break
} }
@ -43,16 +43,21 @@ func producer(consume chan produced, done chan bool) {
fmt.Printf("LRANGE: %+v - %+v\n\r", err, logs) fmt.Printf("LRANGE: %+v - %+v\n\r", err, logs)
log.Printf("LRANGE: %+v - %+v\n\r", err, logs) log.Printf("LRANGE: %+v - %+v\n\r", err, logs)
} }
if opts.Debug { // if opts.Debug {
fmt.Printf("LRANGE: %s - %d\n\r", user, len(logs)) // fmt.Printf("LRANGE: %s - %d\n\r", user, len(logs))
log.Printf("LRANGE: %s - %d\n\r", user, len(logs)) // log.Printf("LRANGE: %s - %d\n\r", user, len(logs))
} // }
if opts.Debug { if opts.Debug {
fmt.Printf("PROD: user=%s in %v - conn=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount()) fmt.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n\r", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n\r", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
} }
consume <- produced{ count.AddUser()
wg.Add(1)
count.AddWG(id)
consume[id] <- produced{
user: user, user: user,
logins: logs, logins: logs,
} }

View file

@ -7,15 +7,14 @@ import (
"time" "time"
) )
func remover(remove chan consumed) { func remover(id int) {
var conn = dbs.rdb.Get() var conn = dbs.rdb.Get()
defer conn.Close() defer conn.Close()
for loop { for {
rem := <-remove rem := <-remove[id]
wg.Add(1) // wg.Add(1)
defer wg.Done()
start := time.Now() start := time.Now()
for i := range rem.logins { for i := range rem.logins {
@ -25,9 +24,12 @@ func remover(remove chan consumed) {
} }
conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush() conn.Flush()
count.AddRem(len(rem.logins))
if opts.Debug { if opts.Debug {
log.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start)) log.Printf("LREM (%d): %s - %d - %+v\n\r", id, rem.user, len(rem.logins), time.Since(start))
fmt.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start)) fmt.Printf("LREM (%d): %s - %d - %+v\n\r", id, rem.user, len(rem.logins), time.Since(start))
} }
wg.Done()
count.RemWG(id)
} }
} }

View file

@ -10,10 +10,12 @@ import (
) )
func exit() { func exit() {
log.Println("EXIT") for i := 0; i < opts.Concurrent; i++ {
fmt.Println("EXIT") log.Println("EXIT ", i)
loop = false fmt.Println("EXIT ", i)
done <- true loop[i] = false
done[i] <- true
}
} }
func setTerm() { func setTerm() {