// aggregate package main import ( "fmt" "log" // "sort" "time" "gopkg.in/mgo.v2/bson" ) func consolidate(ys time.Time, ye time.Time) { limit := 10000 pre := dbs.mdbDst.DB("dovecot").C("pre_lastlogin_day") query := pre.Find(bson.M{}) if opts.Debug { tot, _ := query.Count() fmt.Printf("Users: %d\n", tot) } query.Batch(limit) iter := query.Iter() if opts.Bulk { dbs.bulk = dbs.lc.Bulk() dbs.bulk.Unordered() } result := Users{} for iter.Next(&result) { idb.CountTOT += 1 ll := LastLoginDay{} ll.User = result.User ll.Date = ys // DEBUG logins := result.Logins ips := []IPs{} lastip := IPs{} for l := range logins { if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { /* if opts.Debug { fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) } */ } else { ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) lastip.Date = logins[l].Date lastip.IP = logins[l].IP } switch logins[l].Protocol { case "pop3", "pop": ll.Protocols.Pop += 1 case "imap": ll.Protocols.Imap += 1 case "web": ll.Protocols.Web += 1 } } ll.IPs = ips /* if opts.Debug { fmt.Printf("Insert %+v\n\r", ll) } */ iStart := time.Now() if opts.Bulk { //dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll) dbs.bulk.Insert(ll) } else { info, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) continue } fmt.Printf("Change: %+v\n", info) } idb.Insert += time.Since(iStart) idb.CountOK += 1 } if opts.Bulk { res, err := dbs.bulk.Run() if err != nil { log.Println("Insert error: ", err) } fmt.Printf("Bulk res: %+v\n", res) } if opts.Debug { fmt.Printf("Insert: %d\n", idb.CountOK) } pre.DropCollection() } func aggregate(ys time.Time, ye time.Time) { qStart := time.Now() p := dbs.ll.Pipe([]bson.M{ {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) /* p := dbs.ll.Pipe([]bson.M{ {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, // {"$sort": bson.M{"user": -1, "date": 1}}, {"$group": bson.M{"_id": "$user", "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}) // {"$sort": bson.M{"_id": -1}}, // {"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}, // {"$out": "pre_lastlogin_day"}}) */ idb.Pipe = idb.Pipe + time.Since(qStart) if opts.Debug { res := new(interface{}) err := p.Explain(res) fmt.Printf("Pipe: %+v\nErr: %+v\n", *res, err) res = new(interface{}) err = p.One(res) fmt.Printf("user: %+v\nErr: %+v\n", *res, err) fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) } consolidate(ys, ye) fmt.Printf("Date: %s - %s\n", ys, ye) log.Printf("Date: %s - %s\n", ys, ye) }