// aggregate package main import ( "fmt" "gopkg.in/mgo.v2/bson" "log" "time" ) func aggregate(ys time.Time, ye time.Time) { qStart := time.Now() p := dbs.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) if opts.Debug { fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) } fmt.Printf("Date: %s - %s\n", ys, ye) log.Printf("Date: %s - %s\n", ys, ye) ar := Users{} it := p.Iter() for it.Next(&ar) { countTOT += 1 ll := LastLoginDay{} ll.User = ar.User ll.Date = ys // DEBUG if opts.Debug { fmt.Printf("User: %s\n\r", ar.User) } qStart = time.Now() nq := dbs.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) iter := nq.Iter() result := LastLogin{} ips := []IPs{} lastip := IPs{} for iter.Next(&result) { if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) } else { ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) lastip.Date = result.Date lastip.IP = result.IP } switch result.Protocol { case "pop3", "pop": ll.Protocols.Pop += 1 case "imap": ll.Protocols.Imap += 1 case "web": ll.Protocols.Web += 1 } } if err := iter.Close(); err != nil { log.Println("Iter: ", err) } ll.IPs = ips //fmt.Printf("Upsert %+v\n\r", ll) _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) } countOK += 1 } }