llmongodb/producer.go

92 lines
1.5 KiB
Go

// iterator
package main
import (
"fmt"
"log"
// "strconv"
"time"
"github.com/garyburd/redigo/redis"
)
type produced struct {
user string
logins []string
}
type loginsList struct {
user string
logins []string
err bool
}
func producer() {
conn := dbs.rdb.Get()
defer conn.Close()
for loop {
wg.Wait()
status = _Producer
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex"))
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("LLINDEX empty: %v\n", err)
}
log.Printf("LLINDEX empty: %v\n", err)
break
}
// estrae tutti i logins dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
if err != nil {
if opts.Debug {
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
}
log.Printf("LRANGE: %+v - %+v\n", err, logs)
}
if opts.Debug {
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
}
if len(logs) > 0 {
counter <- Counterchan{
tipo: "user",
val: 1,
}
if opts.Debug {
fmt.Printf("PROD: %+v\n", time.Since(start))
}
if opts.Test {
log.Printf("PROD: %s - %d\n", user, len(logs))
}
wg.Add(1)
counter <- Counterchan{
tipo: "wg",
val: 1,
}
counter <- Counterchan{
tipo: "log",
val: len(logs),
}
consume <- loginsList{
user: user,
logins: logs,
err: false,
}
}
}
done <- true
}