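// File: llconsolidate/aggregate.go
//
// aggregate: groups raw login records per user with a MongoDB aggregation
// pipeline and writes one consolidated document per user for the requested
// date window.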
package main
import (
"fmt"
"log"
2016-04-15 15:28:52 +02:00
// "sort"
"time"
2016-04-06 12:05:40 +02:00
"gopkg.in/mgo.v2/bson"
)
2016-04-15 17:31:00 +02:00
func bulkWrite() {
_, err := dbs.bulk.Run()
2016-04-15 17:31:00 +02:00
if err != nil {
log.Println("Insert error: ", err)
}
//fmt.Printf("Bulk res: %+v\n", res)
2016-04-15 17:31:00 +02:00
}
func consolidate(user Users, ys time.Time, ye time.Time) {
2016-04-15 15:28:52 +02:00
if opts.Bulk {
dbs.bulk = dbs.lc.Bulk()
dbs.bulk.Unordered()
}
idb.CountTOT += 1
ll := LastLoginDay{}
ll.User = user.User
ll.Date = ys
// DEBUG
2016-04-15 15:28:52 +02:00
logins := user.Logins
2016-04-15 15:28:52 +02:00
ips := []IPs{}
lastip := IPs{}
for l := range logins {
if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval {
/*
if opts.Debug {
fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date))
}
*/
2016-04-06 12:05:40 +02:00
} else {
ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol})
lastip.Date = logins[l].Date
lastip.IP = logins[l].IP
}
switch logins[l].Protocol {
case "pop3", "pop":
ll.Protocols.Pop += 1
case "imap":
ll.Protocols.Imap += 1
case "web":
ll.Protocols.Web += 1
2016-04-06 12:05:40 +02:00
}
}
ll.IPs = ips
iStart := time.Now()
2016-04-15 15:28:52 +02:00
if opts.Bulk {
//dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
dbs.bulk.Insert(ll)
} else {
_, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
if err != nil {
log.Println("Insert error: ", err)
}
// fmt.Printf("Change: %+v\n", info)
2016-04-15 15:28:52 +02:00
}
idb.Insert += time.Since(iStart)
2016-04-15 15:28:52 +02:00
idb.CountOK += 1
if opts.Bulk {
bulkWrite()
2016-04-15 15:28:52 +02:00
}
// if opts.Debug {
// fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert)
// }
2016-04-15 15:28:52 +02:00
}
func aggregate(ys time.Time, ye time.Time) {
groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"}
2016-04-15 17:31:00 +02:00
for g := range groups {
2016-04-15 15:28:52 +02:00
2016-04-15 17:31:00 +02:00
qStart := time.Now()
2016-04-15 15:28:52 +02:00
p := dbs.ll.Pipe([]bson.M{
{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye},
"user": bson.RegEx{"^" + groups[g], ""}}},
2016-04-15 17:31:00 +02:00
{"$sort": bson.M{"user": -1, "date": 1}},
2016-04-15 15:28:52 +02:00
{"$group": bson.M{"_id": "$user",
"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}).AllowDiskUse()
iter := p.Iter()
defer iter.Close()
var result Users
for iter.Next(&result) {
consolidate(result, ys, ye)
2016-04-15 17:31:00 +02:00
}
2016-04-15 15:28:52 +02:00
2016-04-15 17:31:00 +02:00
if opts.Debug {
fmt.Printf("Group %v: %+v\n", groups[g], time.Since(qStart))
2016-04-15 17:31:00 +02:00
}
// p.All(&result)
2016-04-15 15:28:52 +02:00
idb.Pipe = idb.Pipe + time.Since(qStart)
2016-04-15 17:31:00 +02:00
}
2016-04-15 15:28:52 +02:00
fmt.Printf("Date: %s - %s\n", ys, ye)
log.Printf("Date: %s - %s\n", ys, ye)
}