// aggregate package main import ( "fmt" "log" "time" "gopkg.in/mgo.v2/bson" ) type Aggregate struct { start time.Time stop time.Time users Users cUsers int cLogins int } func Consolidate(ys time.Time, ye time.Time) *Aggregate { a := Aggregate{ start: ys, stop: ye, cLogins: 0, cUsers: 0, } return &a } func (a Aggregate) Verify() int { tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count() if err != nil { fmt.Printf("Verify error: %+v\n", err) } return tot } func (a Aggregate) bulkWrite() { _, err := dbs.bulk.Run() if err != nil { log.Println("Insert error: ", err) } } func (a Aggregate) consolidate() { if opts.Bulk { dbs.bulk = dbs.lc.Bulk() dbs.bulk.Unordered() } idb.TotUsers += 1 ll := LastLoginDay{} ll.User = a.users.User ll.Date = a.start logins := a.users.Logins ips := []IPs{} lastip := IPs{} for l := range logins { if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { // a.discard += 1 // if opts.Debug { // fmt.Printf("\rDiscarded: %06d", a.discard) // } } else { ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) lastip.Date = logins[l].Date lastip.IP = logins[l].IP } switch logins[l].Protocol { case "pop3", "pop": ll.Protocols.Pop += 1 case "imap": ll.Protocols.Imap += 1 case "web": ll.Protocols.Web += 1 } } ll.IPs = ips iStart := time.Now() if opts.Bulk { dbs.bulk.Insert(ll) } else { _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) } } idb.Insert += time.Since(iStart) if opts.Bulk { a.bulkWrite() } } func (a Aggregate) Start() { groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"} for g := range groups { qStart := time.Now() p := dbs.ll.Pipe([]bson.M{ {"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop}, "user": bson.RegEx{"^" + groups[g], ""}}}, {"$sort": bson.M{"user": -1, "date": 1}}, {"$group": bson.M{"_id": "$user", "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}).AllowDiskUse() iter := p.Iter() defer iter.Close() a.cUsers = 0 a.cLogins = 0 a.users = *new(Users) for iter.Next(&a.users) { a.cUsers += 1 a.cLogins += len(a.users.Logins) a.consolidate() } idb.TotLogins += a.cLogins if opts.Debug { fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart)) } idb.Pipe = idb.Pipe + time.Since(qStart) } fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) }