// consumer package main import ( "fmt" // "github.com/garyburd/redigo/redis" "log" "strconv" "strings" "time" "gopkg.in/mgo.v2" ) type consumed struct { user string error bool logins []string empty bool } func consumer() { for { prod := <-consume status = _Consumer var bulk = make(map[string]*mgo.Bulk) var col = make(map[string]*mgo.Collection) var slogin = make(map[string][]string) if opts.Bulk { bulk[opts.Month] = dbs.ll.Bulk() bulk[opts.Month].Unordered() } else { col[opts.Month] = dbs.ll } cons := consumed{ user: prod.user, logins: make([]string, 0), error: false, empty: true, } start := time.Now() for i := range prod.logins { login := prod.logins[i] // se la riga di login e' vuota if login == "" { log.Println("Login empty: ", prod.user) cons.logins = append(cons.logins, login) continue } sval := strings.Split(login, ":") // se il formato della riga di login non e' corretto if sval[1] == "" { log.Println("Login format error: ", login, prod.user) cons.logins = append(cons.logins, login) continue } // se il timestamp della riga di login non e' corretto date, err := strconv.ParseInt(sval[1], 10, 64) if err != nil { log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) cons.logins = append(cons.logins, login) continue } ml := MongoLogin{ // genera l' _ID con user e timestamp ID: fmt.Sprintf("%s_%s", prod.user, time.Unix(date, 0).Format("20060102T150405")), User: prod.user, Protocol: sval[0], IP: sval[2], Date: time.Unix(date, 0), Insert: time.Now(), } if opts.Month != ml.Date.Format("0601") { dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")) if opts.Bulk { if _, ok := bulk[dt]; !ok { bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk() bulk[dt].Unordered() } bulk[dt].Insert(ml) slogin[dt] = append(slogin[dt], login) } else { if _, ok := col[dt]; !ok { col[dt] = dbs.mdb.DB("lastlogin").C(dt) } err = col[dt].Insert(ml) if err != nil { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Err: %+v\n", err) cons.error = true count.AddErr(1) continue } else { count.AddDuplicate(1) } } cons.logins = append(cons.logins, login) } } else { // inserisce il login su Mongodb if opts.Bulk { bulk[opts.Month].Insert(ml) slogin[opts.Month] = append(slogin[opts.Month], login) } else { err = col[opts.Month].Insert(ml) if err != nil { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Err: %+v\n", err) cons.error = true count.AddErr(1) continue } else { count.AddDuplicate(1) } } cons.logins = append(cons.logins, login) } } } if opts.Bulk { for key, _ := range bulk { _, err := bulk[key].Run() if err != nil { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Err: %+v\n", err) cons.error = true count.AddErr(len(slogin[key])) continue } else { count.AddDuplicate(strings.Count(err.Error(), "E11000")) } } cons.logins = append(cons.logins, slogin[key]...) } } count.AddLog(len(prod.logins)) if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { cons.empty = false } if opts.Debug { fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } // wg.Done() remove <- cons } }