llconsolidate/aggregate.go

90 lines
2 KiB
Go

// aggregate
package main
import (
"fmt"
"log"
"time"
"gopkg.in/mgo.v2/bson"
)
func aggregate(ys time.Time, ye time.Time) {
qStart := time.Now()
p := dbs.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}})
idb.Pipe = time.Since(qStart)
if opts.Debug {
fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart))
}
fmt.Printf("Date: %s - %s\n", ys, ye)
log.Printf("Date: %s - %s\n", ys, ye)
ar := Users{}
it := p.Iter()
for it.Next(&ar) {
idb.CountTOT += 1
ll := LastLoginDay{}
ll.User = ar.User
ll.Date = ys
// DEBUG
if opts.Debug {
fmt.Printf("User: %s\n\r", ar.User)
}
qStart = time.Now()
nq := dbs.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date")
//fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) )
iter := nq.Iter()
if idb.Find == 0 {
idb.Find = time.Since(qStart)
} else {
idb.Find = (idb.Find + time.Since(qStart)) / 2
}
result := LastLogin{}
ips := []IPs{}
lastip := IPs{}
for iter.Next(&result) {
if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval {
//fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date))
} else {
ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol})
lastip.Date = result.Date
lastip.IP = result.IP
}
switch result.Protocol {
case "pop3", "pop":
ll.Protocols.Pop += 1
case "imap":
ll.Protocols.Imap += 1
case "web":
ll.Protocols.Web += 1
}
}
if err := iter.Close(); err != nil {
log.Println("Iter: ", err)
}
ll.IPs = ips
//fmt.Printf("Upsert %+v\n\r", ll)
iStart := time.Now()
_, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
if err != nil {
log.Println("Insert error: ", err)
}
if idb.Insert == 0 {
idb.Insert = time.Since(iStart)
} else {
idb.Insert = (idb.Insert + time.Since(iStart)) / 2
}
idb.CountOK += 1
}
}