// consumer package main import ( "crypto/md5" "crypto/sha1" "crypto/sha256" "encoding/hex" "fmt" "log" "strconv" "strings" "time" "gopkg.in/mgo.v2" ) type consumed struct { user string error bool logins []string } func hash256(val []byte) string { h := sha256.New() h.Write(val) return hex.EncodeToString(h.Sum(nil)) } func hash160(val []byte) string { h := sha1.New() h.Write(val) return hex.EncodeToString(h.Sum(nil)) } func hash128(val []byte) string { h := md5.New() h.Write(val) return hex.EncodeToString(h.Sum(nil)) } // protocol:timestamp:ip:country func consumer() { for { prod := <-consume start := time.Now() status = _Consumer var bulk = make(map[string][]*mgo.Bulk) var allLogins = make(map[string]MongoLogin) for i := range prod.logins { login := prod.logins[i] // se la riga di login e' vuota if login == "" { log.Println("Login empty: ", prod.user) continue } sval := strings.Split(login, ":") // se il formato della riga di login non e' corretto if sval[1] == "" { log.Println("Login format error: ", login, prod.user) continue } // se il timestamp della riga di login non e' corretto date, err := strconv.ParseInt(sval[1], 10, 64) if err != nil { log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) continue } // se la data e' piu vecchia di RETENTION (15552000 sec) la scarta if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 { log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login) if opts.Debug { fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login) } if !opts.Test { continue } } // verifica se esiste la country if len(sval) <= 3 { sval = append(sval, "NONE") } // genera l' _ID con user + timestamp + ip mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405") ml := MongoLogin{ ID: mlID, User: prod.user, Protocol: sval[0], IP: sval[2], Date: time.Unix(date, 0), Insert: time.Now(), Country: sval[3], } allLogins[mlID] = ml } for _, val := range allLogins { dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay if _, ok := bulk[dt]; !ok { for j := range dbs.mdb { b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk() b.Unordered() bulk[dt] = append(bulk[dt], b) } } for _, bl := range bulk[dt] { bl.Insert(val) } } for _, val := range bulk { for j, bl := range val { result, err := bl.Run() if j == 0 { if err != nil { if !strings.Contains(err.Error(), "E11000") { fmt.Printf("Err: %+v\n", err) prod.err = true counter <- Counterchan{ tipo: "err", val: len(prod.logins), } if opts.Test || opts.Debug { log.Printf("ERR: %s - %+v\n", prod.user, prod.logins) } continue } else { counter <- Counterchan{ tipo: "dup", val: strings.Count(err.Error(), "E11000"), } if opts.Debug { log.Printf("DUP: %s - %+v\n", prod.user, prod.logins) } } } else { if opts.Test { log.Printf("OK: %s - %+v\n", prod.user, prod.logins) log.Printf("BulkResult: %s - %+v\n", prod.user, result) } counter <- Counterchan{ tipo: "ins", val: len(allLogins), } } } } } if opts.Debug { fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } if opts.Debug && len(prod.logins) > 10 { fmt.Printf("LOGS: %+v\n", prod.logins) } if !prod.err { counter <- Counterchan{ tipo: "rem", val: len(prod.logins), } } // wg.Done() remove <- prod } }