// consumer package m2r import ( "fmt" // "github.com/garyburd/redigo/redis" "log" "strconv" "strings" "time" "gopkg.in/mgo.v2" ) type consumed struct { user string error bool logins []string empty bool } func consumer() { for { prod := <-consume status = _Consumer var bulk = make(map[string]*mgo.Bulk) var rtbulk []MongoLogin var col = make(map[string]*mgo.Collection) var slogin = make(map[string][]string) if opts.Bulk { bulk[opts.Month] = dbs.ll.Bulk() bulk[opts.Month].Unordered() } else { col[opts.Month] = dbs.ll } cons := consumed{ user: prod.user, logins: make([]string, 0), error: false, empty: true, } start := time.Now() for i := range prod.logins { login := prod.logins[i] // se la riga di login e' vuota if login == "" { log.Println("Login empty: ", prod.user) cons.logins = append(cons.logins, login) continue } sval := strings.Split(login, ":") // se il formato della riga di login non e' corretto if sval[1] == "" { log.Println("Login format error: ", login, prod.user) cons.logins = append(cons.logins, login) continue } // se il timestamp della riga di login non e' corretto date, err := strconv.ParseInt(sval[1], 10, 64) if err != nil { log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) cons.logins = append(cons.logins, login) continue } ml := MongoLogin{ // genera l' _ID con user e timestamp ID: fmt.Sprintf("%s_%s", prod.user, time.Unix(date, 0).Format("20060102T150405")), User: prod.user, Protocol: sval[0], IP: sval[2], Date: time.Unix(date, 0), Insert: time.Now(), } // inserisce il login su Mongodb if opts.Bulk { rtbulk = append(rtbulk, ml) slogin["rt"] = append(slogin["rt"], login) } else { resp, err := dbs.rtdb.Insert(ml) count.AddInsert(resp.Inserted) if err != nil { if !strings.Contains(err.Error(), "Duplicate primary key") { fmt.Printf("RT Insert Err: %+v\n", err) cons.error = true count.AddErr(1) continue } else { count.AddDuplicate(1) } } cons.logins = append(cons.logins, login) } } if opts.Bulk { resp, err := dbs.rtdb.MultiInsert(rtbulk) count.AddInsert(resp.Inserted) if err != nil { if !strings.Contains(err.Error(), "Duplicate primary key") { cons.error = true count.AddErr(resp.Errors) continue } else { count.AddDuplicate(resp.Errors) } } cons.logins = append(cons.logins, slogin["rt"]...) } count.AddLog(len(prod.logins)) if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { cons.empty = false } if opts.Debug { fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start)) } // wg.Done() remove <- cons } }