reimplementato usando i channels

This commit is contained in:
Miki 2015-11-20 15:23:12 +01:00
parent ba85c9ecc9
commit 699a3aaae4
6 changed files with 222 additions and 155 deletions

View file

@ -3,104 +3,99 @@ package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
// "github.com/garyburd/redigo/redis"
"log"
"strconv"
"strings"
"time"
)
func consumer(user string) {
var date int64
var lastval string
type consumed struct {
user string
logins []string
}
func consumer(consume chan produced, remove chan consumed) {
var conn = dbs.rdb.Get()
defer conn.Close()
start := time.Now()
for {
// Estrae l'ultimo login dell'utente 'user'
val, err := redis.String(conn.Do("LINDEX", user, "-1"))
if err != nil {
if opts.Debug {
log.Printf("LINDEX error: %+v - %s\n\r", err, val)
fmt.Printf("LINDEX error: %+v - %s\n\r", err, val)
}
if strings.Contains(err.Error(), "nil returned") {
// se ha trovato user e righe di login
if lastval != "" {
// reinserisce l'ultimo login e imposta il ttl su Redis
retval, errr := conn.Do("lpush", user, lastval)
ttl, errt := conn.Do("expire", user, int(opts.RedisTTL.Seconds()))
if opts.Debug {
log.Printf("LPUSH retval: %+v %+v %+v %+v %s %s %d\n\r", retval, errr, ttl, errt, user, lastval, int(opts.RedisTTL.Seconds()))
fmt.Printf("LPUSH retval: %+v %+v %+v %+v %s %s %d\n\r", retval, errr, ttl, errt, user, lastval, int(opts.RedisTTL.Seconds()))
}
}
} else {
if opts.Debug {
fmt.Printf("SADD error: %+v - %s\n\r", err, val)
}
val, err := conn.Do("sadd", "llindex", user)
if err != nil {
log.Printf("SADD error: %+v - %s\n\r", err, val)
}
}
break
prod := <-consume
cons := consumed{
user: prod.user,
logins: make([]string, 0),
}
// se la riga di login e' vuota
if val == "" {
log.Println("Login empty: ", user)
retval, _ := conn.Do("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
//return
continue
}
sval := strings.Split(val, ":")
// se il formato della riga di login non e' corretto
if sval[1] == "" {
log.Println("Login format error: ", val, user)
retval, _ := conn.Do("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
//return
continue
}
// se il timestamp della riga di login non e' corretto
date, err = strconv.ParseInt(sval[1], 10, 64)
if err != nil {
log.Printf("Date Error: %+v - %s\n", err, user)
//return
continue
}
ml := MongoLogin{
User: user,
Protocol: sval[0],
Ip: sval[2],
Date: time.Unix(date, 0),
}
ind := Index{
User: user,
Date: time.Unix(date, 0),
}
// inserisce il login su Mongodb
count++
_, err = dbs.ll.Upsert(ind, ml)
if err != nil {
log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
if !strings.Contains(err.Error(), "E11000") {
errCount += 1
//return
start := time.Now()
for i := range prod.logins {
login := prod.logins[i]
// se la riga di login e' vuota
if login == "" {
log.Println("Login empty: ", prod.user)
cons.logins = append(cons.logins, login)
// retval, _ := conn.Do("lrem", user, "0", login)
// log.Println("LREM retval: ", user, login, retval)
// return
continue
}
sval := strings.Split(login, ":")
// se il formato della riga di login non e' corretto
if sval[1] == "" {
log.Println("Login format error: ", login, prod.user)
cons.logins = append(cons.logins, login)
// retval, _ := conn.Do("lrem", user, "0", login)
// log.Println("LREM retval: ", user, login, retval)
// return
continue
}
// se il timestamp della riga di login non e' corretto
date, err := strconv.ParseInt(sval[1], 10, 64)
if err != nil {
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
cons.logins = append(cons.logins, login)
// retval, _ := conn.Do("lrem", user, "0", login)
// log.Println("LREM retval: ", user, login, retval)
// return
continue
}
ml := MongoLogin{
User: prod.user,
Protocol: sval[0],
Ip: sval[2],
Date: time.Unix(date, 0),
}
ind := Index{
User: prod.user,
Date: time.Unix(date, 0),
}
// inserisce il login su Mongodb
count++
_, err = dbs.ll.Upsert(ind, ml)
if err != nil {
log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
if !strings.Contains(err.Error(), "E11000") {
errCount += 1
//return
continue
}
}
if i < (len(prod.logins) - 1) {
cons.logins = append(cons.logins, login)
// cancella da Redis la riga di login inserita
// retval, _ := conn.Do("lrem", user, "0", login)
// if opts.Debug {
// log.Println("LREM retval: ", retval, user, login)
// fmt.Println("LREM retval: ", retval, user, login)
// }
}
}
// cancella da Redis la riga di login inserita
retval, err := conn.Do("lrem", user, "-1", val)
if opts.Debug {
log.Println("LREM retval: ", retval, user, val)
fmt.Println("LREM retval: ", retval, user, val)
}
lastval = val
fmt.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount())
log.Printf("CONS: user=%s in %v - active=%d\n\r", prod.user, time.Since(start), dbs.rdb.ActiveCount())
remove <- cons
}
fmt.Printf("CONS: user=%s in %v - active=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount())
log.Printf("CONS: user=%s in %v - active=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount())
}

82
main.go
View file

@ -6,26 +6,11 @@ import (
"fmt"
"log"
"os"
"os/signal"
"path"
"path/filepath"
"syscall"
"time"
)
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
Timeout bool
Debug bool
Version bool
BufferSize int
}
const (
_VERSION = "v2.1.0"
_VERSION = "v2.2.0"
)
var (
@ -36,43 +21,13 @@ var (
loop = true
ttl = time.Second * 55
done = make(chan bool)
msgs chan string
count = 0
errCount = 0
done = make(chan bool)
)
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <ttl> -l <logfile> -b <buffer size> -T -D -v\n")
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.BoolVar(&opts.Timeout, "T", false, "Timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.IntVar(&opts.BufferSize, "b", 1, "Buffer size")
}
func stopLoop() {
loop = false
}
func main() {
flag.Usage = usage
flag.Parse()
@ -81,14 +36,9 @@ func main() {
fmt.Println(os.Args[0], _VERSION)
os.Exit(0)
}
defer close(done)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
stopLoop()
}()
setTerm()
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
@ -113,22 +63,22 @@ func main() {
defer dbs.mdb.Close()
if opts.Timeout {
time.AfterFunc(ttl, stopLoop)
time.AfterFunc(ttl, exit)
}
msgs = make(chan string, opts.BufferSize)
for i := 0; i < opts.BufferSize; i++ {
consume := make(chan produced)
remove := make(chan consumed)
// defer close(consume)
// defer close(remove)
go producer()
for loop {
start := time.Now()
user := <-msgs
fmt.Printf("Wait: %v\n\r", time.Since(start))
if user != "" {
go consumer(user)
}
go producer(consume, done)
go consumer(consume, remove)
go remover(remove)
}
<-done
fmt.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount())
log.Printf("Stop %v - login: %d - errors: %d - conn: %d\n\r", time.Since(start), count, errCount, dbs.rdb.ActiveCount())
}

49
options.go Normal file
View file

@ -0,0 +1,49 @@
// options
package main
import (
"flag"
"fmt"
"log"
"os"
"path"
"path/filepath"
"time"
)
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
Timeout bool
Debug bool
Version bool
BufferSize int
}
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <ttl> -l <logfile> -b <buffer size> -T -D -v\n")
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.BoolVar(&opts.Timeout, "T", false, "Timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.IntVar(&opts.BufferSize, "b", 1, "Buffer size")
}

View file

@ -8,11 +8,17 @@ import (
"time"
)
func producer() {
type produced struct {
user string
logins []string
}
func producer(consume chan produced, done chan bool) {
conn := dbs.rdb.Get()
defer conn.Close()
for loop {
for {
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex"))
@ -26,9 +32,25 @@ func producer() {
log.Printf("LLINDEX empty: %v\n\r", err)
fmt.Printf("LLINDEX empty: %v\n\r", err)
}
break
done <- true
}
msgs <- user
// estrae tutti i login dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
if err != nil {
fmt.Printf("LRANGE: %+v - %+v\n\r", err, logs)
log.Printf("LRANGE: %+v - %+v\n\r", err, logs)
}
if opts.Debug {
fmt.Printf("LRANGE: %s - %d\n\r", user, len(logs))
log.Printf("LRANGE: %s - %d\n\r", user, len(logs))
}
fmt.Printf("PROD: user=%s in %v - conn=%d\n\r", user, time.Since(start), dbs.rdb.ActiveCount())
consume <- produced{
user: user,
logins: logs,
}
}
}

29
remover.go Normal file
View file

@ -0,0 +1,29 @@
// finalizer
package main
import (
"fmt"
"log"
"time"
)
func remover(remove chan consumed) {
var conn = dbs.rdb.Get()
defer conn.Close()
for {
rem := <-remove
start := time.Now()
for i := range rem.logins {
login := rem.logins[i]
// cancella da Redis la riga di login inserita
conn.Send("lrem", rem.user, "0", login)
}
conn.Flush()
if opts.Debug {
log.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start))
fmt.Printf("LREM: %s - %d - %+v\n\r", rem.user, len(rem.logins), time.Since(start))
}
}
}

22
sigterm.go Normal file
View file

@ -0,0 +1,22 @@
// sigterm
package main
import (
"os"
"os/signal"
"syscall"
)
func exit() {
done <- true
}
func setTerm() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
exit()
}()
}