diff --git a/consumer.go b/consumer.go index 3eea7b5..d56619c 100644 --- a/consumer.go +++ b/consumer.go @@ -3,104 +3,99 @@ package main import ( "fmt" - "github.com/garyburd/redigo/redis" + // "github.com/garyburd/redigo/redis" "log" "strconv" "strings" "time" ) -func consumer(user string) { - var date int64 - var lastval string +type consumed struct { + user string + logins []string +} + +func consumer(consume chan produced, remove chan consumed) { + var conn = dbs.rdb.Get() defer conn.Close() - start := time.Now() for { - // Estrae l'ultimo login dell'utente 'user' - val, err := redis.String(conn.Do("LINDEX", user, "-1")) - if err != nil { - if opts.Debug { - log.Printf("LINDEX error: %+v - %s\n\r", err, val) - fmt.Printf("LINDEX error: %+v - %s\n\r", err, val) - } - if strings.Contains(err.Error(), "nil returned") { - // se ha trovato user e righe di login - if lastval != "" { - // reinserisce l'ultimo login e imposta il ttl su Redis - retval, errr := conn.Do("lpush", user, lastval) - ttl, errt := conn.Do("expire", user, int(opts.RedisTTL.Seconds())) - if opts.Debug { - log.Printf("LPUSH retval: %+v %+v %+v %+v %s %s %d\n\r", retval, errr, ttl, errt, user, lastval, int(opts.RedisTTL.Seconds())) - fmt.Printf("LPUSH retval: %+v %+v %+v %+v %s %s %d\n\r", retval, errr, ttl, errt, user, lastval, int(opts.RedisTTL.Seconds())) - } - } - } else { - if opts.Debug { - fmt.Printf("SADD error: %+v - %s\n\r", err, val) - } - val, err := conn.Do("sadd", "llindex", user) - if err != nil { - log.Printf("SADD error: %+v - %s\n\r", err, val) - } - } - break + + prod := <-consume + + cons := consumed{ + user: prod.user, + logins: make([]string, 0), } - // se la riga di login e' vuota - if val == "" { - log.Println("Login empty: ", user) - retval, _ := conn.Do("lrem", user, "-1", val) - log.Println("LREM retval: ", user, val, retval) - //return - continue - } - sval := strings.Split(val, ":") - // se il formato della riga di login non e' corretto - if sval[1] == "" { - log.Println("Login format error: ", val, user) - retval, _ := conn.Do("lrem", user, "-1", val) - log.Println("LREM retval: ", user, val, retval) - //return - continue - } - // se il timestamp della riga di login non e' corretto - date, err = strconv.ParseInt(sval[1], 10, 64) - if err != nil { - log.Printf("Date Error: %+v - %s\n", err, user) - //return - continue - } - ml := MongoLogin{ - User: user, - Protocol: sval[0], - Ip: sval[2], - Date: time.Unix(date, 0), - } - ind := Index{ - User: user, - Date: time.Unix(date, 0), - } - // inserisce il login su Mongodb - count++ - _, err = dbs.ll.Upsert(ind, ml) - if err != nil { - log.Printf("Insert error: %+v\n", err) - // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente - if !strings.Contains(err.Error(), "E11000") { - errCount += 1 - //return + + start := time.Now() + for i := range prod.logins { + login := prod.logins[i] + // se la riga di login e' vuota + if login == "" { + log.Println("Login empty: ", prod.user) + cons.logins = append(cons.logins, login) + // retval, _ := conn.Do("lrem", user, "0", login) + // log.Println("LREM retval: ", user, login, retval) + // return continue } + sval := strings.Split(login, ":") + // se il formato della riga di login non e' corretto + if sval[1] == "" { + log.Println("Login format error: ", login, prod.user) + cons.logins = append(cons.logins, login) + // retval, _ := conn.Do("lrem", user, "0", login) + // log.Println("LREM retval: ", user, login, retval) + // return + continue + } + // se il timestamp della riga di login non e' corretto + date, err := strconv.ParseInt(sval[1], 10, 64) + if err != nil { + log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) + cons.logins = append(cons.logins, login) + // retval, _ := conn.Do("lrem", user, "0", login) + // log.Println("LREM retval: ", user, login, retval) + // return + continue + } + ml := MongoLogin{ + User: prod.user, + Protocol: sval[0], + Ip: sval[2], + Date: time.Unix(date, 0), + } + ind := Index{ + User: prod.user, + Date: time.Unix(date, 0), + } + // inserisce il login su Mongodb + count++ + _, err = dbs.ll.Upsert(ind, ml) + if err != nil { + log.Printf("Insert error: %+v\n", err) + // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente + if !strings.Contains(err.Error(), "E11000") { + errCount += 1 + //return + continue + } + } + if i < (len(prod.logins) - 1) { + cons.logins = append(cons.logins, login) + // cancella da Redis la riga di login inserita + // retval, _ := conn.Do("lrem", user, "0", login) + // if opts.Debug { + // log.Println("LREM retval: ", retval, user, login) + // fmt.Println("LREM retval: ", retval, user, login) + // } + } } - // cancella da Redis la riga di login inserita - retval, err := conn.Do("lrem", user, "-1", val) - if opts.Debug { - log.Println("LREM retval: ", retval, user, val) - fmt.Println("LREM retval: ", retval, user, val) - } - lastval = val + fmt.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount()) + log.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount()) + + remove <- cons } - fmt.Printf("CONS: user=%s in %v - active=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount()) - log.Printf("CONS: user=%s in %v - active=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount()) } diff --git a/main.go b/main.go index bbf0a7a..17ad44f 100644 --- a/main.go +++ b/main.go @@ -6,26 +6,11 @@ import ( "fmt" "log" "os" - "os/signal" - "path" - "path/filepath" - "syscall" "time" ) -type Options struct { - RedisTTL time.Duration - CurrentPath string - Exe string - LogFile string - Timeout bool - Debug bool - Version bool - BufferSize int -} - const ( - _VERSION = "v2.1.0" + _VERSION = "v2.2.0" ) var ( @@ -36,43 +21,13 @@ var ( loop = true ttl = time.Second * 55 - done = make(chan bool) - msgs chan string count = 0 errCount = 0 + + done = make(chan bool) ) -func usage() { - fmt.Println("Usage: llmongo -m -r -t -l -b -T -D -v\n") - os.Exit(0) -} - -func init() { - var err error - opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) - if err != nil { - log.Fatal(err) - } - - opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) - pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") - opts.Exe = path.Base(os.Args[0]) - - flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb") - flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis") - flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") - flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL") - flag.BoolVar(&opts.Version, "v", false, "Version") - flag.BoolVar(&opts.Timeout, "T", false, "Timeout") - flag.BoolVar(&opts.Debug, "D", false, "Debug") - flag.IntVar(&opts.BufferSize, "b", 1, "Buffer size") -} - -func stopLoop() { - loop = false -} - func main() { flag.Usage = usage flag.Parse() @@ -81,14 +36,9 @@ func main() { fmt.Println(os.Args[0], _VERSION) os.Exit(0) } + defer close(done) - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - signal.Notify(c, syscall.SIGTERM) - go func() { - <-c - stopLoop() - }() + setTerm() fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { @@ -113,22 +63,22 @@ func main() { defer dbs.mdb.Close() if opts.Timeout { - time.AfterFunc(ttl, stopLoop) + time.AfterFunc(ttl, exit) } - msgs = make(chan string, opts.BufferSize) + for i := 0; i < opts.BufferSize; i++ { + consume := make(chan produced) + remove := make(chan consumed) + // defer close(consume) + // defer close(remove) - go producer() - - for loop { - start := time.Now() - user := <-msgs - fmt.Printf("Wait: %v\n\r", time.Since(start)) - if user != "" { - go consumer(user) - } + go producer(consume, done) + go consumer(consume, remove) + go remover(remove) } + <-done + fmt.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount()) log.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount()) } diff --git a/options.go b/options.go new file mode 100644 index 0000000..d2e677f --- /dev/null +++ b/options.go @@ -0,0 +1,49 @@ +// options +package main + +import ( + "flag" + "fmt" + "log" + "os" + "path" + "path/filepath" + "time" +) + +type Options struct { + RedisTTL time.Duration + CurrentPath string + Exe string + LogFile string + Timeout bool + Debug bool + Version bool + BufferSize int +} + +func usage() { + fmt.Println("Usage: llmongo -m -r -t -l -b -T -D -v\n") + os.Exit(0) +} + +func init() { + var err error + opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) + pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") + opts.Exe = path.Base(os.Args[0]) + + flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb") + flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.BoolVar(&opts.Timeout, "T", false, "Timeout") + flag.BoolVar(&opts.Debug, "D", false, "Debug") + flag.IntVar(&opts.BufferSize, "b", 1, "Buffer size") +} diff --git a/producer.go b/producer.go index e64bee8..dba37d6 100644 --- a/producer.go +++ b/producer.go @@ -8,11 +8,17 @@ import ( "time" ) -func producer() { +type produced struct { + user string + logins []string +} + +func producer(consume chan produced, done chan bool) { conn := dbs.rdb.Get() defer conn.Close() - for loop { + for { + start := time.Now() // estrae un userid dalla lista degli utenti che hanno fatto login user, err := redis.String(conn.Do("spop", "llindex")) @@ -26,9 +32,25 @@ func producer() { log.Printf("LLINDEX empty: %v\n\r", err) fmt.Printf("LLINDEX empty: %v\n\r", err) } - break + done <- true } - msgs <- user + + // estrae tutti i login dell'utente "user" + logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1")) + if err != nil { + fmt.Printf("LRANGE: %+v - %+v\n\r", err, logs) + log.Printf("LRANGE: %+v - %+v\n\r", err, logs) + } + if opts.Debug { + fmt.Printf("LRANGE: %s - %d\n\r", user, len(logs)) + log.Printf("LRANGE: %s - %d\n\r", user, len(logs)) + } + fmt.Printf("PROD: user=%s in %v - conn=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount()) + + consume <- produced{ + user: user, + logins: logs, + } } } diff --git a/remover.go b/remover.go new file mode 100644 index 0000000..dfd103c --- /dev/null +++ b/remover.go @@ -0,0 +1,29 @@ +// finalizer +package main + +import ( + "fmt" + "log" + "time" +) + +func remover(remove chan consumed) { + var conn = dbs.rdb.Get() + defer conn.Close() + + for { + rem := <-remove + + start := time.Now() + for i := range rem.logins { + login := rem.logins[i] + // cancella da Redis la riga di login inserita + conn.Send("lrem", rem.user, "0", login) + } + conn.Flush() + if opts.Debug { + log.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start)) + fmt.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start)) + } + } +} diff --git a/sigterm.go b/sigterm.go new file mode 100644 index 0000000..992e28a --- /dev/null +++ b/sigterm.go @@ -0,0 +1,22 @@ +// sigterm +package main + +import ( + "os" + "os/signal" + "syscall" +) + +func exit() { + done <- true +} + +func setTerm() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGTERM) + go func() { + <-c + exit() + }() +}