From b095dea871fe780c9fea636af360f6eed552541b Mon Sep 17 00:00:00 2001 From: Michele Date: Fri, 4 Nov 2016 11:50:27 +0100 Subject: [PATCH] aggiunta la gestione errori dopo 20sec si killa se bloccato dopo il timeout --- consumer.go | 97 +++++++++++++++++++++++++++++++++++++---------------- counter.go | 8 ++--- main.go | 7 +++- options.go | 4 +-- producer.go | 3 ++ remover.go | 2 ++ sigterm.go | 14 ++++++-- xymon.go | 2 ++ 8 files changed, 99 insertions(+), 38 deletions(-) diff --git a/consumer.go b/consumer.go index a65a05d..2b901ea 100644 --- a/consumer.go +++ b/consumer.go @@ -19,25 +19,24 @@ type consumed struct { empty bool } -func contains(s []Ips, e string) bool { - for _, a := range s { - if a.IP == e { - return true - } - } - return false -} - func consumer() { for { prod := <-consume - var bulk = make(map[string]*mgo.Bulk) + status = _Consumer - bulk[opts.Month] = dbs.ll.Bulk() - bulk[opts.Month].Unordered() + var bulk = make(map[string]*mgo.Bulk) + var col = make(map[string]*mgo.Collection) + var slogin = make(map[string][]string) + + if opts.Bulk { + bulk[opts.Month] = dbs.ll.Bulk() + bulk[opts.Month].Unordered() + } else { + col[opts.Month] = dbs.ll + } cons := consumed{ user: prod.user, @@ -81,40 +80,80 @@ func consumer() { if opts.Month != ml.Date.Format("0601") { dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")) - if _, ok := bulk[dt]; !ok { - bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk() - bulk[dt].Unordered() + if opts.Bulk { + if _, ok := bulk[dt]; !ok { + bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk() + bulk[dt].Unordered() + } + bulk[dt].Insert(ml) + slogin[dt] = append(slogin[dt], login) + } else { + if _, ok := col[dt]; !ok { + col[dt] = dbs.mdb.DB("lastlogin").C(dt) + } + err = col[dt].Insert(ml) + if err != nil { + if !strings.Contains(err.Error(), "E11000") { + fmt.Printf("Err: %+v\n", err) + cons.error = true + count.AddErr(1) + continue + } else { + count.AddDuplicate(1) + } + } + cons.logins = append(cons.logins, login) } - bulk[dt].Insert(ml) } else { // inserisce il login su Mongodb - - bulk[opts.Month].Insert(ml) - // inserisce last timestamp su redis per consolidamento + if opts.Bulk { + bulk[opts.Month].Insert(ml) + slogin[opts.Month] = append(slogin[opts.Month], login) + } else { + err = col[opts.Month].Insert(ml) + if err != nil { + if !strings.Contains(err.Error(), "E11000") { + fmt.Printf("Err: %+v\n", err) + cons.error = true + count.AddErr(1) + continue + } else { + count.AddDuplicate(1) + } + } + cons.logins = append(cons.logins, login) + } } - - cons.logins = append(cons.logins, login) } - for key, _ := range bulk { - _, err := bulk[key].Run() - if err != nil { - - fmt.Printf("Err: %+v\n", err) + if opts.Bulk { + for key, _ := range bulk { + _, err := bulk[key].Run() + if err != nil { + if !strings.Contains(err.Error(), "E11000") { + fmt.Printf("Err: %+v\n", err) + cons.error = true + count.AddErr(len(slogin[key])) + continue + } else { + count.AddDuplicate(strings.Count(err.Error(), "E11000")) + } + } + cons.logins = append(cons.logins, slogin[key]...) } } count.AddLog(len(prod.logins)) if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { - cons.empty = true + cons.empty = false } if opts.Debug { fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } - wg.Done() - // remove <- cons + // wg.Done() + remove <- cons } } diff --git a/counter.go b/counter.go index 5132ff4..5370cb5 100644 --- a/counter.go +++ b/counter.go @@ -39,10 +39,10 @@ func (c *Counter) AddUser() { } // AddDuplicate increment number of duplicates log -func (c *Counter) AddDuplicate() { +func (c *Counter) AddDuplicate(add int) { c.mu.Lock() defer c.mu.Unlock() - c.dup++ + c.dup += add } // AddLog increment number of log's rows managed @@ -67,10 +67,10 @@ func (c *Counter) AddWG() { } // AddErr ... -func (c *Counter) AddErr() { +func (c *Counter) AddErr(add int) { c.mu.Lock() defer c.mu.Unlock() - c.err++ + c.err += add } // DelWG ... diff --git a/main.go b/main.go index a0480e0..aa02fd4 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,10 @@ import ( ) const ( - _Version = "v3.0.0" + _Version = "v3.0.0" + _Producer = 0 + _Consumer = 1 + _Remover = 2 ) var ( @@ -23,6 +26,8 @@ var ( wg sync.WaitGroup + status int + count *Counter ) diff --git a/options.go b/options.go index 3d9a640..2fb240b 100644 --- a/options.go +++ b/options.go @@ -24,8 +24,8 @@ type Options struct { MaxLogins int Debug bool Version bool + Bulk bool MaxError int - Xymon string Influxdb string Hostname string Month string @@ -57,7 +57,6 @@ func init() { opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") opts.Exe = path.Base(os.Args[0]) - flag.StringVar(&opts.Xymon, "x", "", "xymon server") flag.StringVar(&opts.Influxdb, "i", "", "influxdb server") flag.StringVar(&opts.Hostname, "H", "", "hostname") flag.StringVar(&dbs.MongoURI, "m", dbs.MongoURI, "Mongodb") @@ -69,6 +68,7 @@ func init() { flag.BoolVar(&opts.Version, "v", false, "Version") flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") flag.BoolVar(&opts.Debug, "D", false, "Debug") + flag.BoolVar(&opts.Bulk, "B", false, "Bulk") flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error") flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file") } diff --git a/producer.go b/producer.go index 29c17d1..a7c3e6a 100644 --- a/producer.go +++ b/producer.go @@ -21,6 +21,9 @@ func producer() { for loop { + wg.Wait() + status = _Producer + start := time.Now() // estrae un userid dalla lista degli utenti che hanno fatto login user, err := redis.String(conn.Do("spop", "llindex")) diff --git a/remover.go b/remover.go index 664af70..53de5e9 100644 --- a/remover.go +++ b/remover.go @@ -15,6 +15,8 @@ func remover() { for { rem := <-remove + status = _Remover + // wg.Add(1) start := time.Now() diff --git a/sigterm.go b/sigterm.go index 3f3e4e2..c55b35b 100644 --- a/sigterm.go +++ b/sigterm.go @@ -7,12 +7,22 @@ import ( "os" "os/signal" "syscall" + "time" ) +func kill() { + log.Printf("KILL %d\n", status) + fmt.Printf("KILL %d\n", status) + wg.Done() + count.DelWG() + done <- true +} + func exit() { - log.Println("EXIT ") - fmt.Println("EXIT ") + log.Printf("EXIT %d\n", status) + fmt.Printf("EXIT %d\n", status) loop = false + time.AfterFunc(time.Second*20, kill) } func setTerm() { diff --git a/xymon.go b/xymon.go index 1cced4a..24c450f 100644 --- a/xymon.go +++ b/xymon.go @@ -1,6 +1,7 @@ // xymon.go package main +/* import ( "fmt" "log" @@ -59,3 +60,4 @@ func xymonSend(msg []byte) error { return nil } +*/