aggiunta la gestione errori

dopo 20sec si killa se bloccato dopo il timeout
This commit is contained in:
Michele 2016-11-04 11:50:27 +01:00
parent d7d68f4bf4
commit b095dea871
8 changed files with 99 additions and 38 deletions

View file

@ -19,25 +19,24 @@ type consumed struct {
empty bool
}
func contains(s []Ips, e string) bool {
for _, a := range s {
if a.IP == e {
return true
}
}
return false
}
func consumer() {
for {
prod := <-consume
var bulk = make(map[string]*mgo.Bulk)
status = _Consumer
var bulk = make(map[string]*mgo.Bulk)
var col = make(map[string]*mgo.Collection)
var slogin = make(map[string][]string)
if opts.Bulk {
bulk[opts.Month] = dbs.ll.Bulk()
bulk[opts.Month].Unordered()
} else {
col[opts.Month] = dbs.ll
}
cons := consumed{
user: prod.user,
@ -81,40 +80,80 @@ func consumer() {
if opts.Month != ml.Date.Format("0601") {
dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))
if opts.Bulk {
if _, ok := bulk[dt]; !ok {
bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk()
bulk[dt].Unordered()
}
bulk[dt].Insert(ml)
slogin[dt] = append(slogin[dt], login)
} else {
// inserisce il login su Mongodb
bulk[opts.Month].Insert(ml)
// inserisce last timestamp su redis per consolidamento
if _, ok := col[dt]; !ok {
col[dt] = dbs.mdb.DB("lastlogin").C(dt)
}
err = col[dt].Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
}
}
cons.logins = append(cons.logins, login)
}
} else {
// inserisce il login su Mongodb
if opts.Bulk {
bulk[opts.Month].Insert(ml)
slogin[opts.Month] = append(slogin[opts.Month], login)
} else {
err = col[opts.Month].Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
}
}
cons.logins = append(cons.logins, login)
}
}
}
if opts.Bulk {
for key, _ := range bulk {
_, err := bulk[key].Run()
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
count.AddErr(len(slogin[key]))
continue
} else {
count.AddDuplicate(strings.Count(err.Error(), "E11000"))
}
}
cons.logins = append(cons.logins, slogin[key]...)
}
}
count.AddLog(len(prod.logins))
if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins {
cons.empty = true
cons.empty = false
}
if opts.Debug {
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
}
wg.Done()
// remove <- cons
// wg.Done()
remove <- cons
}
}

View file

@ -39,10 +39,10 @@ func (c *Counter) AddUser() {
}
// AddDuplicate increment number of duplicates log
func (c *Counter) AddDuplicate() {
func (c *Counter) AddDuplicate(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.dup++
c.dup += add
}
// AddLog increment number of log's rows managed
@ -67,10 +67,10 @@ func (c *Counter) AddWG() {
}
// AddErr ...
func (c *Counter) AddErr() {
func (c *Counter) AddErr(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.err++
c.err += add
}
// DelWG ...

View file

@ -12,6 +12,9 @@ import (
const (
_Version = "v3.0.0"
_Producer = 0
_Consumer = 1
_Remover = 2
)
var (
@ -23,6 +26,8 @@ var (
wg sync.WaitGroup
status int
count *Counter
)

View file

@ -24,8 +24,8 @@ type Options struct {
MaxLogins int
Debug bool
Version bool
Bulk bool
MaxError int
Xymon string
Influxdb string
Hostname string
Month string
@ -57,7 +57,6 @@ func init() {
opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&opts.Xymon, "x", "", "xymon server")
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
flag.StringVar(&opts.Hostname, "H", "", "hostname")
flag.StringVar(&dbs.MongoURI, "m", dbs.MongoURI, "Mongodb")
@ -69,6 +68,7 @@ func init() {
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.BoolVar(&opts.Bulk, "B", false, "Bulk")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file")
}

View file

@ -21,6 +21,9 @@ func producer() {
for loop {
wg.Wait()
status = _Producer
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex"))

View file

@ -15,6 +15,8 @@ func remover() {
for {
rem := <-remove
status = _Remover
// wg.Add(1)
start := time.Now()

View file

@ -7,12 +7,22 @@ import (
"os"
"os/signal"
"syscall"
"time"
)
func kill() {
log.Printf("KILL %d\n", status)
fmt.Printf("KILL %d\n", status)
wg.Done()
count.DelWG()
done <- true
}
func exit() {
log.Println("EXIT ")
fmt.Println("EXIT ")
log.Printf("EXIT %d\n", status)
fmt.Printf("EXIT %d\n", status)
loop = false
time.AfterFunc(time.Second*20, kill)
}
func setTerm() {

View file

@ -1,6 +1,7 @@
// xymon.go
package main
/*
import (
"fmt"
"log"
@ -59,3 +60,4 @@ func xymonSend(msg []byte) error {
return nil
}
*/