in caso di errore di inserimento su mongo, reinserisce lo user su redis (LLINDEX) e continua.

This commit is contained in:
Miki 2015-11-24 12:08:43 +01:00
parent 7ed0686953
commit 98b1858329
3 changed files with 10 additions and 2 deletions

View file

@ -12,6 +12,7 @@ import (
type consumed struct { type consumed struct {
user string user string
error bool
logins []string logins []string
} }
@ -30,6 +31,7 @@ func consumer(id int) {
cons := consumed{ cons := consumed{
user: prod.user, user: prod.user,
logins: make([]string, 0), logins: make([]string, 0),
error: false,
} }
start := time.Now() start := time.Now()
@ -81,6 +83,7 @@ func consumer(id int) {
log.Printf("Insert error: %+v\n", err) log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
count.AddErr() count.AddErr()
cons.error = true
continue continue
} }
if retval.Updated == 1 { if retval.Updated == 1 {

View file

@ -11,7 +11,7 @@ import (
) )
const ( const (
_VERSION = "v2.2.3" _VERSION = "v2.2.4"
) )
var ( var (

View file

@ -22,11 +22,16 @@ func remover(id int) {
// cancella da Redis la riga di login inserita // cancella da Redis la riga di login inserita
conn.Send("lrem", rem.user, "0", login) conn.Send("lrem", rem.user, "0", login)
} }
if rem.error {
if opts.Debug {
fmt.Printf("SADD (%d): %s\n", id, rem.user)
}
conn.Send("sadd", "llindex", rem.user)
}
conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush() conn.Flush()
count.AddRem(len(rem.logins)) count.AddRem(len(rem.logins))
if opts.Debug { if opts.Debug {
// log.Printf("LREM (%d): %s - %d - %+v\n\r", id, rem.user, len(rem.logins), time.Since(start))
fmt.Printf("LREM (%d): %s - %d - %+v\n\r", id, rem.user, len(rem.logins), time.Since(start)) fmt.Printf("LREM (%d): %s - %d - %+v\n\r", id, rem.user, len(rem.logins), time.Since(start))
} }
wg.Done() wg.Done()