sistemato il timer di uscita.
prima di uscire attende che tutti i processi running abbiano finito.
This commit is contained in:
parent
699a3aaae4
commit
e258d8951c
6 changed files with 42 additions and 19 deletions
12
consumer.go
12
consumer.go
|
@ -17,13 +17,16 @@ type consumed struct {
|
|||
|
||||
func consumer(consume chan produced, remove chan consumed) {
|
||||
|
||||
var conn = dbs.rdb.Get()
|
||||
defer conn.Close()
|
||||
// var conn = dbs.rdb.Get()
|
||||
// defer conn.Close()
|
||||
|
||||
for {
|
||||
for loop {
|
||||
|
||||
prod := <-consume
|
||||
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
|
||||
cons := consumed{
|
||||
user: prod.user,
|
||||
logins: make([]string, 0),
|
||||
|
@ -93,8 +96,11 @@ func consumer(consume chan produced, remove chan consumed) {
|
|||
// }
|
||||
}
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount())
|
||||
log.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount())
|
||||
}
|
||||
|
||||
remove <- cons
|
||||
}
|
||||
|
|
14
main.go
14
main.go
|
@ -6,11 +6,12 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
_VERSION = "v2.2.0"
|
||||
_VERSION = "v2.2.1"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -20,12 +21,13 @@ var (
|
|||
}
|
||||
|
||||
loop = true
|
||||
ttl = time.Second * 55
|
||||
|
||||
count = 0
|
||||
errCount = 0
|
||||
|
||||
done = make(chan bool)
|
||||
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@ -62,11 +64,11 @@ func main() {
|
|||
dbs.connectMongo()
|
||||
defer dbs.mdb.Close()
|
||||
|
||||
if opts.Timeout {
|
||||
time.AfterFunc(ttl, exit)
|
||||
if opts.Timeout > 0 {
|
||||
time.AfterFunc(opts.Timeout, exit)
|
||||
}
|
||||
|
||||
for i := 0; i < opts.BufferSize; i++ {
|
||||
for i := 0; i < opts.Concurrent; i++ {
|
||||
consume := make(chan produced)
|
||||
remove := make(chan consumed)
|
||||
// defer close(consume)
|
||||
|
@ -79,6 +81,8 @@ func main() {
|
|||
|
||||
<-done
|
||||
|
||||
wg.Wait()
|
||||
|
||||
fmt.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount())
|
||||
log.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount())
|
||||
}
|
||||
|
|
12
options.go
12
options.go
|
@ -16,14 +16,14 @@ type Options struct {
|
|||
CurrentPath string
|
||||
Exe string
|
||||
LogFile string
|
||||
Timeout bool
|
||||
Timeout time.Duration
|
||||
Debug bool
|
||||
Version bool
|
||||
BufferSize int
|
||||
Concurrent int
|
||||
}
|
||||
|
||||
func usage() {
|
||||
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <ttl> -l <logfile> -b <buffer size> -T -D -v\n")
|
||||
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <redis keys ttl> -l <logfile> -b <concurrent thread> -T <running ttl> -D -v\n")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
|
@ -41,9 +41,9 @@ func init() {
|
|||
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
|
||||
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
|
||||
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
|
||||
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
|
||||
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
|
||||
flag.BoolVar(&opts.Version, "v", false, "Version")
|
||||
flag.BoolVar(&opts.Timeout, "T", false, "Timeout")
|
||||
flag.DurationVar(&opts.Timeout, "T", 0, "Timeout")
|
||||
flag.BoolVar(&opts.Debug, "D", false, "Debug")
|
||||
flag.IntVar(&opts.BufferSize, "b", 1, "Buffer size")
|
||||
flag.IntVar(&opts.Concurrent, "c", 1, "Concurrent thread")
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ func producer(consume chan produced, done chan bool) {
|
|||
conn := dbs.rdb.Get()
|
||||
defer conn.Close()
|
||||
|
||||
for {
|
||||
for loop {
|
||||
|
||||
start := time.Now()
|
||||
// estrae un userid dalla lista degli utenti che hanno fatto login
|
||||
|
@ -32,7 +32,9 @@ func producer(consume chan produced, done chan bool) {
|
|||
log.Printf("LLINDEX empty: %v\n\r", err)
|
||||
fmt.Printf("LLINDEX empty: %v\n\r", err)
|
||||
}
|
||||
loop = false
|
||||
done <- true
|
||||
break
|
||||
}
|
||||
|
||||
// estrae tutti i login dell'utente "user"
|
||||
|
@ -46,7 +48,9 @@ func producer(consume chan produced, done chan bool) {
|
|||
log.Printf("LRANGE: %s - %d\n\r", user, len(logs))
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("PROD: user=%s in %v - conn=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount())
|
||||
}
|
||||
|
||||
consume <- produced{
|
||||
user: user,
|
||||
|
|
|
@ -11,15 +11,19 @@ func remover(remove chan consumed) {
|
|||
var conn = dbs.rdb.Get()
|
||||
defer conn.Close()
|
||||
|
||||
for {
|
||||
for loop {
|
||||
rem := <-remove
|
||||
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
|
||||
start := time.Now()
|
||||
for i := range rem.logins {
|
||||
login := rem.logins[i]
|
||||
// cancella da Redis la riga di login inserita
|
||||
conn.Send("lrem", rem.user, "0", login)
|
||||
}
|
||||
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
|
||||
conn.Flush()
|
||||
if opts.Debug {
|
||||
log.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start))
|
||||
|
|
|
@ -2,12 +2,17 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func exit() {
|
||||
log.Println("EXIT")
|
||||
fmt.Println("EXIT")
|
||||
loop = false
|
||||
done <- true
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue