diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..8dca1ac --- /dev/null +++ b/consumer.go @@ -0,0 +1,91 @@ +// consumer +package main + +import ( + "fmt" + "github.com/garyburd/redigo/redis" + "log" + "strconv" + "strings" + "time" +) + +func consumer() { + var date int64 + var lastval string + var conn = dbs.rdb.Get() + defer conn.Close() + + for { + user := <-msgs + // Estrae l'ultimo login dell'utente 'user' + val, err := redis.String(conn.Do("LINDEX", user, "-1")) + if err != nil { + if opts.Debug { + log.Printf("LINDEX error: %+v - %s\n\r", err, val) + fmt.Printf("LINDEX error: %+v - %s\n\r", err, val) + } + // se ha trovato user e righe di login + if lastval != "" { + // reinserisce l'ultimo login e imposta il ttl su Redis + retval, _ := conn.Do("lpush", user, lastval) + ttl, _ := conn.Do("expire", user, opts.RedisTTL.Seconds()) + if opts.Debug { + log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds()) + fmt.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds()) + } + } + // break + continue + } + // se la riga di login e' vuota + if val == "" { + log.Println("Login empty: ", user) + retval, _ := conn.Do("lrem", user, "-1", val) + log.Println("LREM retval: ", user, val, retval) + continue + } + sval := strings.Split(val, ":") + // se il formato della riga di login non e' corretto + if sval[1] == "" { + log.Println("Login format error: ", val, user) + retval, _ := conn.Do("lrem", user, "-1", val) + log.Println("LREM retval: ", user, val, retval) + continue + } + // se il timestamp della riga di login non e' corretto + date, err = strconv.ParseInt(sval[1], 10, 64) + if err != nil { + log.Printf("Date Error: %+v - %s\n", err, user) + continue + } + ml := MongoLogin{ + User: user, + Protocol: sval[0], + Ip: sval[2], + Date: time.Unix(date, 0), + } + ind := Index{ + User: user, + Date: time.Unix(date, 0), + } + // inserisce il login su Mongodb + count++ + _, err = dbs.ll.Upsert(ind, ml) + if err != nil { + log.Printf("Insert error: %+v\n", err) + // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente + if !strings.Contains(err.Error(), "E11000") { + errCount += 1 + continue + } + } + // cancella da Redis la riga di login inserita + retval, err := conn.Do("lrem", user, "-1", val) + if opts.Debug { + log.Println("LREM retval: ", retval, user, val) + fmt.Println("LREM retval: ", retval, user, val) + } + lastval = val + } +} diff --git a/dbs.go b/dbs.go index 448416c..235de8d 100644 --- a/dbs.go +++ b/dbs.go @@ -2,7 +2,8 @@ package main import ( - "github.com/fzzy/radix/redis" + "github.com/garyburd/redigo/redis" + // "github.com/fzzy/radix/redis" "gopkg.in/mgo.v2" "log" "os" @@ -19,7 +20,7 @@ var ( type Dbs struct { MongoUri string RedisUri string - rdb *redis.Client + rdb *redis.Pool //*redis.Client mdb *mgo.Session ll *mgo.Collection // us *mgo.Collection @@ -43,6 +44,27 @@ type Index struct { Date time.Time `json:"date"` } +func (db *Dbs) poolRedis() { + + dbs.rdb = &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", db.RedisUri) + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } + +} + +/* func (db *Dbs) connectRedis() { var err error db.rdb, err = redis.Dial("tcp", db.RedisUri) @@ -51,6 +73,7 @@ func (db *Dbs) connectRedis() { os.Exit(-1) } } +*/ func (db *Dbs) connectMongo() { var err error diff --git a/main.go b/main.go index d33bfbc..3346835 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,6 @@ import ( "os" "path" "path/filepath" - "strconv" - "strings" "time" ) @@ -24,7 +22,7 @@ type Options struct { } const ( - _VERSION = "v1.2.2" + _VERSION = "v2.0.0" ) var ( @@ -35,6 +33,11 @@ var ( loop = true ttl = time.Second * 55 + done = make(chan bool) + msgs = make(chan string) + + count = 0 + errCount = 0 ) func usage() { @@ -91,7 +94,7 @@ func main() { fmt.Printf("Start: %+v\n", opts) log.Printf("Start: %+v\n", opts) - dbs.connectRedis() + dbs.poolRedis() defer dbs.rdb.Close() dbs.connectMongo() @@ -101,94 +104,9 @@ func main() { time.AfterFunc(ttl, stopLoop) } - count := 0 - errCount := 0 - for loop { - // estrae un userid dalla lista degli utenti che hanno fatto login - spop := dbs.rdb.Cmd("spop", "llindex") - user, err := spop.Str() - if opts.Debug { - log.Printf("SPOP: %+v %+v\n", spop, user) - } - // se non ci sono piu' userid esce - if err != nil { - if opts.Debug { - log.Printf("LLINDEX empty: %v\n", err) - } - break - } - var date int64 - var lastval, val string - for { - // Estrae l'ultimo login dell'utente 'user' - val, err = dbs.rdb.Cmd("lindex", user, "-1").Str() - if err != nil { - if opts.Debug { - log.Printf("LINDEX error: %+v\n", err) - } - // se ha trovato user e righe di login - if lastval != "" { - // reinserisce l'ultimo login e imposta il ttl su Redis - retval := dbs.rdb.Cmd("lpush", user, lastval) - ttl := dbs.rdb.Cmd("expire", user, opts.RedisTTL.Seconds()) - if opts.Debug { - log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds()) - } - } - break - } - // se la riga di login e' vuota - if val == "" { - log.Println("Login empty: ", user) - retval := dbs.rdb.Cmd("lrem", user, "-1", val) - log.Println("LREM retval: ", user, val, retval) - continue - } - sval := strings.Split(val, ":") - // se il formato della riga di login non e' corretto - if sval[1] == "" { - log.Println("Login format error: ", val, user) - retval := dbs.rdb.Cmd("lrem", user, "-1", val) - log.Println("LREM retval: ", user, val, retval) - continue - } - // se il timestamp della riga di login non e' corretto - date, err = strconv.ParseInt(sval[1], 10, 64) - if err != nil { - log.Printf("Date Error: %+v - %s\n", err, user) - continue - } - ml := MongoLogin{ - User: user, - Protocol: sval[0], - Ip: sval[2], - Date: time.Unix(date, 0), - } - ind := Index{ - User: user, - Date: time.Unix(date, 0), - } - // inserisce il login su Mongodb - count++ - _, err := dbs.ll.Upsert(ind, ml) - if err != nil { - log.Printf("Insert error: %+v\n", err) - // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente - if !strings.Contains(err.Error(), "E11000") { - errCount += 1 - continue - } - } - // inserisce lo user nella collectione che gestira' il lock nella procedura di consolidamento - // _ err := dbs.us.Upsert(ind, ul) - // cancella da Redis la riga di login inserita - retval := dbs.rdb.Cmd("lrem", user, "-1", val) - if opts.Debug { - log.Println("LREM retval: ", retval, user, val) - } - lastval = val - } - } + go producer() + go consumer() + <-done fmt.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount) log.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount) diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..4185a85 --- /dev/null +++ b/producer.go @@ -0,0 +1,32 @@ +// iterator +package main + +import ( + "fmt" + "github.com/garyburd/redigo/redis" + "log" +) + +func producer() { + conn := dbs.rdb.Get() + defer conn.Close() + + for loop { + // estrae un userid dalla lista degli utenti che hanno fatto login + user, err := redis.String(conn.Do("spop", "llindex")) + if opts.Debug { + log.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err) + fmt.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err) + } + // se non ci sono piu' userid esce + if err != nil { + if opts.Debug { + log.Printf("LLINDEX empty: %v\n\r", err) + fmt.Printf("LLINDEX empty: %v\n\r", err) + } + break + } + msgs <- user + } + done <- true +}