implementata la scrittura Bulk e lo split dell'aggregato per username

This commit is contained in:
Miki 2016-04-21 12:36:06 +02:00
parent 0eca19256c
commit e3d7b1dae5
2 changed files with 82 additions and 91 deletions

View file

@ -11,119 +11,110 @@ import (
) )
func bulkWrite() { func bulkWrite() {
res, err := dbs.bulk.Run() _, err := dbs.bulk.Run()
if err != nil { if err != nil {
log.Println("Insert error: ", err) log.Println("Insert error: ", err)
} }
fmt.Printf("Bulk res: %+v\n", res) //fmt.Printf("Bulk res: %+v\n", res)
} }
func consolidate(users []Users, ys time.Time, ye time.Time) { func consolidate(user Users, ys time.Time, ye time.Time) {
if opts.Bulk { if opts.Bulk {
dbs.bulk = dbs.lc.Bulk() dbs.bulk = dbs.lc.Bulk()
dbs.bulk.Unordered() dbs.bulk.Unordered()
} }
for _, user := range users { idb.CountTOT += 1
idb.CountTOT += 1 ll := LastLoginDay{}
ll := LastLoginDay{} ll.User = user.User
ll.User = user.User ll.Date = ys
ll.Date = ys
// DEBUG // DEBUG
logins := user.Logins logins := user.Logins
ips := []IPs{} ips := []IPs{}
lastip := IPs{} lastip := IPs{}
for l := range logins { for l := range logins {
if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval {
/* /*
if opts.Debug { if opts.Debug {
fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date))
} }
*/ */
} else {
ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol})
lastip.Date = logins[l].Date
lastip.IP = logins[l].IP
}
switch logins[l].Protocol {
case "pop3", "pop":
ll.Protocols.Pop += 1
case "imap":
ll.Protocols.Imap += 1
case "web":
ll.Protocols.Web += 1
}
}
ll.IPs = ips
iStart := time.Now()
if opts.Bulk {
//dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
dbs.bulk.Insert(ll)
} else { } else {
info, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol})
if err != nil { lastip.Date = logins[l].Date
log.Println("Insert error: ", err) lastip.IP = logins[l].IP
continue }
} switch logins[l].Protocol {
fmt.Printf("Change: %+v\n", info) case "pop3", "pop":
ll.Protocols.Pop += 1
case "imap":
ll.Protocols.Imap += 1
case "web":
ll.Protocols.Web += 1
} }
idb.Insert += time.Since(iStart)
idb.CountOK += 1
} }
ll.IPs = ips
iStart := time.Now()
if opts.Bulk {
//dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
dbs.bulk.Insert(ll)
} else {
_, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
if err != nil {
log.Println("Insert error: ", err)
}
// fmt.Printf("Change: %+v\n", info)
}
idb.Insert += time.Since(iStart)
idb.CountOK += 1
if opts.Bulk { if opts.Bulk {
bulkWrite() bulkWrite()
} }
if opts.Debug { // if opts.Debug {
fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert) // fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert)
} // }
// pre.DropCollection()
} }
func aggregate(ys time.Time, ye time.Time) { func aggregate(ys time.Time, ye time.Time) {
skip := 0 groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"}
limit := 100000
for { for g := range groups {
qStart := time.Now() qStart := time.Now()
p := dbs.ll.Pipe([]bson.M{ p := dbs.ll.Pipe([]bson.M{
{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye},
"user": bson.RegEx{"^" + groups[g], ""}}},
{"$sort": bson.M{"user": -1, "date": 1}}, {"$sort": bson.M{"user": -1, "date": 1}},
{"$group": bson.M{"_id": "$user", {"$group": bson.M{"_id": "$user",
"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}, "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}).AllowDiskUse()
{"$sort": bson.M{"_id": -1}},
{"$skip": skip},
{"$limit": limit},
{"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}}).AllowDiskUse()
result := make([]Users, limit) iter := p.Iter()
p.All(&result) defer iter.Close()
if len(result) <= 0 {
break var result Users
for iter.Next(&result) {
consolidate(result, ys, ye)
} }
if opts.Debug { if opts.Debug {
last := len(result) - 1 fmt.Printf("Group %v: %+v\n", groups[g], time.Since(qStart))
fmt.Printf("Res %d: %d - first: %+v - last: %+v - time: %+v\n", skip, len(result), result[0].User, result[last].User, time.Since(qStart))
} }
// p.All(&result)
idb.Pipe = idb.Pipe + time.Since(qStart) idb.Pipe = idb.Pipe + time.Since(qStart)
consolidate(result, ys, ye)
skip += limit
} }
fmt.Printf("Date: %s - %s\n", ys, ye) fmt.Printf("Date: %s - %s\n", ys, ye)

38
dbs.go
View file

@ -18,40 +18,40 @@ type Dbs struct {
} }
type LastLogin struct { type LastLogin struct {
User string `json: "user"` User string `json: "user" bson:"user"`
Protocol string `json: "protocol"` Protocol string `json: "protocol" bson:"protocol"`
IP string `json: "ip"` IP string `json: "ip" bson:"ip"`
Date time.Time `json: "date"` Date time.Time `json: "date" bson:"date"`
ID string `json: "_id"` ID string `json: "_id" bson:"_id"`
} }
type LastLoginDay struct { type LastLoginDay struct {
User string `json:"user"` User string `json:"user" bson:"user"`
Date time.Time `json:"date"` Date time.Time `json:"date" bson:"date"`
Protocols Protocols `json:"protocols"` Protocols Protocols `json:"protocols" bson:"protocols"`
IPs []IPs `json:"ips"` IPs []IPs `json:"ips" bson:"ips"`
} }
type IPs struct { type IPs struct {
IP string `json:"ip"` IP string `json:"ip" bson:"ip"`
Date time.Time `json:"date"` Date time.Time `json:"date" bson:"date"`
Protocol string `json:"protocol"` Protocol string `json:"protocol" bson:"protocol"`
} }
type Protocols struct { type Protocols struct {
Pop int `json:"pop"` Pop int `json:"pop" bson:"pop"`
Imap int `json:"imap"` Imap int `json:"imap" bson:"imap"`
Web int `json:"web"` Web int `json:"web" bson:"web"`
} }
type Index struct { type Index struct {
User string `json:"user"` User string `json:"user" bson:"user"`
Date time.Time `json:"date"` Date time.Time `json:"date" bson:"date"`
} }
type Users struct { type Users struct {
User string `json:"user"` User string `json:"_id" bson:"_id"`
Logins []IPs `json:"ips"` Logins []IPs `json:"logins" bson:"logins"`
} }
/* /*