modificato il conteggio dei logins gestiti
This commit is contained in:
parent
8228f63fff
commit
58518c4b6a
4 changed files with 37 additions and 40 deletions
30
consumer.go
30
consumer.go
|
@ -17,7 +17,6 @@ type consumed struct {
|
|||
user string
|
||||
error bool
|
||||
logins []string
|
||||
empty bool
|
||||
}
|
||||
|
||||
func hash(val []byte) string {
|
||||
|
@ -41,32 +40,23 @@ func consumer() {
|
|||
var bulk = make(map[string][]*mgo.Bulk)
|
||||
var allLogins = make(map[string]MongoLogin)
|
||||
|
||||
cons := consumed{
|
||||
user: prod.user,
|
||||
logins: make([]string, 0),
|
||||
error: false,
|
||||
}
|
||||
|
||||
for i := range prod.logins {
|
||||
login := prod.logins[i]
|
||||
// se la riga di login e' vuota
|
||||
if login == "" {
|
||||
log.Println("Login empty: ", prod.user)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
sval := strings.Split(login, ":")
|
||||
// se il formato della riga di login non e' corretto
|
||||
if sval[1] == "" {
|
||||
log.Println("Login format error: ", login, prod.user)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
// se il timestamp della riga di login non e' corretto
|
||||
date, err := strconv.ParseInt(sval[1], 10, 64)
|
||||
if err != nil {
|
||||
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
mlID := hash([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
|
||||
|
@ -105,7 +95,7 @@ func consumer() {
|
|||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "E11000") {
|
||||
fmt.Printf("Err: %+v\n", err)
|
||||
cons.error = true
|
||||
prod.err = true
|
||||
counter <- Counterchan{
|
||||
tipo: "err",
|
||||
val: len(prod.logins),
|
||||
|
@ -128,13 +118,6 @@ func consumer() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
cons.logins = append(cons.logins, prod.logins...)
|
||||
}
|
||||
|
||||
counter <- Counterchan{
|
||||
tipo: "log",
|
||||
val: len(prod.logins),
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
|
@ -142,10 +125,17 @@ func consumer() {
|
|||
}
|
||||
|
||||
if opts.Debug && len(prod.logins) > 10 {
|
||||
fmt.Printf("LOGS: %+v\n", cons.logins)
|
||||
fmt.Printf("LOGS: %+v\n", prod.logins)
|
||||
}
|
||||
|
||||
if !prod.err {
|
||||
counter <- Counterchan{
|
||||
tipo: "rem",
|
||||
val: len(prod.logins),
|
||||
}
|
||||
}
|
||||
|
||||
// wg.Done()
|
||||
remove <- cons
|
||||
remove <- prod
|
||||
}
|
||||
}
|
||||
|
|
8
main.go
8
main.go
|
@ -21,8 +21,8 @@ var (
|
|||
loop bool
|
||||
|
||||
done chan bool
|
||||
consume chan produced
|
||||
remove chan consumed
|
||||
consume chan loginsList
|
||||
remove chan loginsList
|
||||
|
||||
counter chan Counterchan
|
||||
|
||||
|
@ -85,8 +85,8 @@ func main() {
|
|||
|
||||
count = NewCounter()
|
||||
|
||||
consume = make(chan produced, opts.Queue)
|
||||
remove = make(chan consumed, opts.Queue)
|
||||
consume = make(chan loginsList, opts.Queue)
|
||||
remove = make(chan loginsList, opts.Queue)
|
||||
loop = true
|
||||
done = make(chan bool)
|
||||
counter = make(chan Counterchan)
|
||||
|
|
25
producer.go
25
producer.go
|
@ -15,6 +15,12 @@ type produced struct {
|
|||
logins []string
|
||||
}
|
||||
|
||||
type loginsList struct {
|
||||
user string
|
||||
logins []string
|
||||
err bool
|
||||
}
|
||||
|
||||
func producer() {
|
||||
conn := dbs.rdb.Get()
|
||||
defer conn.Close()
|
||||
|
@ -54,11 +60,6 @@ func producer() {
|
|||
tipo: "user",
|
||||
val: 1,
|
||||
}
|
||||
wg.Add(1)
|
||||
counter <- Counterchan{
|
||||
tipo: "wg",
|
||||
val: 1,
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("PROD: %+v\n", time.Since(start))
|
||||
|
@ -68,9 +69,21 @@ func producer() {
|
|||
log.Printf("PROD: %s - %d\n", user, len(logs))
|
||||
}
|
||||
|
||||
consume <- produced{
|
||||
wg.Add(1)
|
||||
counter <- Counterchan{
|
||||
tipo: "wg",
|
||||
val: 1,
|
||||
}
|
||||
|
||||
counter <- Counterchan{
|
||||
tipo: "log",
|
||||
val: len(logs),
|
||||
}
|
||||
|
||||
consume <- loginsList{
|
||||
user: user,
|
||||
logins: logs,
|
||||
err: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
14
remover.go
14
remover.go
|
@ -18,7 +18,7 @@ func remover() {
|
|||
status = _Remover
|
||||
|
||||
start := time.Now()
|
||||
if !rem.error {
|
||||
if !rem.err {
|
||||
for i := range rem.logins {
|
||||
// cancella da Redis la riga di login inserita partendo da 1
|
||||
conn.Send("lrem", rem.user, "1", rem.logins[i])
|
||||
|
@ -26,7 +26,7 @@ func remover() {
|
|||
}
|
||||
|
||||
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
|
||||
if rem.error {
|
||||
if rem.err {
|
||||
if opts.Debug {
|
||||
fmt.Printf("SADD: %s\n", rem.user)
|
||||
}
|
||||
|
@ -38,20 +38,14 @@ func remover() {
|
|||
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
|
||||
conn.Flush()
|
||||
|
||||
if !rem.error {
|
||||
counter <- Counterchan{
|
||||
tipo: "rem",
|
||||
val: len(rem.logins),
|
||||
}
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
|
||||
}
|
||||
wg.Done()
|
||||
|
||||
counter <- Counterchan{
|
||||
tipo: "wg",
|
||||
val: -1,
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue