diff --git a/consumer.go b/consumer.go index 954f345..a65a05d 100644 --- a/consumer.go +++ b/consumer.go @@ -4,11 +4,12 @@ package main import ( "fmt" // "github.com/garyburd/redigo/redis" - // "gopkg.in/mgo.v2/bson" "log" "strconv" "strings" "time" + + "gopkg.in/mgo.v2" ) type consumed struct { @@ -27,11 +28,16 @@ func contains(s []Ips, e string) bool { return false } -func consumer(id int) { +func consumer() { for { - prod := <-consume[id] + prod := <-consume + + var bulk = make(map[string]*mgo.Bulk) + + bulk[opts.Month] = dbs.ll.Bulk() + bulk[opts.Month].Unordered() cons := consumed{ user: prod.user, @@ -64,6 +70,8 @@ func consumer(id int) { continue } ml := MongoLogin{ + // genera l' _ID con user e timestamp + ID: fmt.Sprintf("%s_%s", prod.user, time.Unix(date, 0).Format("20060102T150405")), User: prod.user, Protocol: sval[0], IP: sval[2], @@ -72,38 +80,30 @@ func consumer(id int) { } if opts.Month != ml.Date.Format("0601") { - lt := dbs.mdb.DB("lastlogin").C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))) - err = lt.Insert(ml) - if err != nil { - log.Printf("Insert error: %+v - %s - %s\n", err, cons.user, lt.FullName) - count.AddErr() - cons.error = true - continue - } - if opts.Debug { - log.Printf("%s - %+v\n", lt.FullName, ml) + dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")) + if _, ok := bulk[dt]; !ok { + bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk() + bulk[dt].Unordered() } + bulk[dt].Insert(ml) } else { // inserisce il login su Mongodb - err = dbs.ll.Insert(ml) - if err != nil { - log.Printf("Insert error: %+v - %s\n", err, cons.user) - count.AddErr() - cons.error = true - continue - } - if opts.Debug { - log.Printf("%+v\n", ml) - } - // inserisce last timestamp su redis per consolidamento - /* - */ + bulk[opts.Month].Insert(ml) + // inserisce last timestamp su redis per consolidamento } cons.logins = append(cons.logins, login) } + for key, _ := range bulk { + _, err := bulk[key].Run() + if err != nil { + + fmt.Printf("Err: %+v\n", err) + } + } + count.AddLog(len(prod.logins)) if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { @@ -111,9 +111,10 @@ func consumer(id int) { } if opts.Debug { - fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) + fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } - remove[id] <- cons + wg.Done() + // remove <- cons } } diff --git a/counter.go b/counter.go index 952c04f..5132ff4 100644 --- a/counter.go +++ b/counter.go @@ -15,7 +15,7 @@ type Counter struct { err int dup int time time.Duration - wg []int + wg int } // NewCounter iniitialized Counter structure @@ -27,7 +27,7 @@ func NewCounter() *Counter { rem: 0, dup: 0, time: 0, - wg: make([]int, opts.Concurrent), + wg: 0, } } @@ -60,10 +60,10 @@ func (c *Counter) AddRem(add int) { } // AddWG ... -func (c *Counter) AddWG(id int) { +func (c *Counter) AddWG() { c.mu.Lock() defer c.mu.Unlock() - c.wg[id]++ + c.wg++ } // AddErr ... @@ -74,10 +74,10 @@ func (c *Counter) AddErr() { } // DelWG ... -func (c *Counter) DelWG(id int) { +func (c *Counter) DelWG() { c.mu.Lock() defer c.mu.Unlock() - c.wg[id]-- + c.wg-- } // GetUser return total users @@ -121,10 +121,10 @@ func (c *Counter) GetRem() (ret int) { } // GetWG ... -func (c *Counter) GetWG(id int) (ret int) { +func (c *Counter) GetWG() (ret int) { c.mu.Lock() defer c.mu.Unlock() - ret = c.wg[id] + ret = c.wg return } diff --git a/dbs.go b/dbs.go index 16c1924..79ac023 100644 --- a/dbs.go +++ b/dbs.go @@ -33,6 +33,7 @@ type Dbs struct { // MongoLogin structure type MongoLogin struct { + ID string `json:"_id" bson:"_id"` User string `json:"user"` Protocol string `json:"protocol"` IP string `json:"ip"` diff --git a/main.go b/main.go index 498c209..a0480e0 100644 --- a/main.go +++ b/main.go @@ -11,15 +11,15 @@ import ( ) const ( - _Version = "v2.8.0.1" + _Version = "v3.0.0" ) var ( - loop []bool + loop bool - done []chan bool - consume []chan produced - remove []chan consumed + done chan bool + consume chan produced + remove chan consumed wg sync.WaitGroup @@ -76,27 +76,20 @@ func main() { count = NewCounter() - for i := 0; i < opts.Concurrent; i++ { - consume = append(consume, make(chan produced)) - remove = append(remove, make(chan consumed)) - loop = append(loop, true) - done = append(done, make(chan bool)) + consume = make(chan produced) + remove = make(chan consumed) + loop = true + done = make(chan bool) - go producer(i) - go consumer(i) - go remover(i) - } + go producer() + go consumer() + go remover() - for i := 0; i < opts.Concurrent; i++ { - <-done[i] - fmt.Printf("Done %d\n", i) - close(done[i]) - } + <-done + fmt.Printf("Done\n") + close(done) fmt.Println("Waiting WG") - for i := 0; i < opts.Concurrent; i++ { - fmt.Printf("ID (%d): %d\n", i, count.GetWG(i)) - } wg.Wait() count.SetTime(time.Since(start)) @@ -107,8 +100,4 @@ func main() { if opts.Influxdb != "" { writeStats(start) } - - if opts.Xymon != "" { - sendStatus() - } } diff --git a/options.go b/options.go index 43b6026..3d9a640 100644 --- a/options.go +++ b/options.go @@ -24,7 +24,6 @@ type Options struct { MaxLogins int Debug bool Version bool - Concurrent int MaxError int Xymon string Influxdb string @@ -42,7 +41,7 @@ var ( ) func usage() { - fmt.Println("Usage: llmongo -m -r -t -l -b -T -x -H -i -v") + fmt.Println("Usage: llmongo -m -r -t -l -T -x -H -i -v") fmt.Println() os.Exit(0) } @@ -70,7 +69,6 @@ func init() { flag.BoolVar(&opts.Version, "v", false, "Version") flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") flag.BoolVar(&opts.Debug, "D", false, "Debug") - flag.IntVar(&opts.Concurrent, "c", 1, "Concurrent thread") flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error") flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file") } diff --git a/producer.go b/producer.go index b913325..29c17d1 100644 --- a/producer.go +++ b/producer.go @@ -15,11 +15,11 @@ type produced struct { logins []string } -func producer(id int) { +func producer() { conn := dbs.rdb.Get() defer conn.Close() - for loop[id] { + for loop { start := time.Now() // estrae un userid dalla lista degli utenti che hanno fatto login @@ -53,19 +53,19 @@ func producer(id int) { // } if opts.Debug { - fmt.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) + fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) // log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) } count.AddUser() wg.Add(1) - count.AddWG(id) + count.AddWG() - consume[id] <- produced{ + consume <- produced{ user: user, logins: logs, } } - done[id] <- true + done <- true } diff --git a/remover.go b/remover.go index dfc9e4e..664af70 100644 --- a/remover.go +++ b/remover.go @@ -7,26 +7,26 @@ import ( "time" ) -func remover(id int) { +func remover() { + var conn = dbs.rdb.Get() defer conn.Close() for { - rem := <-remove[id] + rem := <-remove // wg.Add(1) start := time.Now() for i := range rem.logins { - login := rem.logins[i] // cancella da Redis la riga di login inserita partendo da 1 - conn.Send("lrem", rem.user, "1", login) + conn.Send("lrem", rem.user, "1", rem.logins[i]) } // se ci sono errori o non e' vuota la lista di logins reinserisce lo user - if rem.error || !rem.empty { + if !rem.empty || rem.error { if opts.Debug { - fmt.Printf("SADD (%d): %s\n", id, rem.user) + fmt.Printf("SADD: %s\n", rem.user) } conn.Send("sadd", "llindex", rem.user) if count.GetErr() >= opts.MaxError { @@ -37,9 +37,9 @@ func remover(id int) { conn.Flush() count.AddRem(len(rem.logins)) if opts.Debug { - fmt.Printf("LREM (%d): %s - %d - %+v\n", id, rem.user, len(rem.logins), time.Since(start)) + fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start)) } wg.Done() - count.DelWG(id) + count.DelWG() } } diff --git a/sigterm.go b/sigterm.go index 254fece..3f3e4e2 100644 --- a/sigterm.go +++ b/sigterm.go @@ -10,12 +10,9 @@ import ( ) func exit() { - for i := 0; i < opts.Concurrent; i++ { - log.Println("EXIT ", i) - fmt.Println("EXIT ", i) - loop[i] = false - // done[i] <- true - } + log.Println("EXIT ") + fmt.Println("EXIT ") + loop = false } func setTerm() {