skip+limit test
This commit is contained in:
parent
c0d5972a44
commit
0eca19256c
3 changed files with 42 additions and 54 deletions
94
aggregate.go
94
aggregate.go
|
@ -10,38 +10,30 @@ import (
|
|||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
func consolidate(ys time.Time, ye time.Time) {
|
||||
|
||||
limit := 10000
|
||||
|
||||
pre := dbs.mdbDst.DB("dovecot").C("pre_lastlogin_day")
|
||||
|
||||
query := pre.Find(bson.M{})
|
||||
|
||||
if opts.Debug {
|
||||
tot, _ := query.Count()
|
||||
fmt.Printf("Users: %d\n", tot)
|
||||
func bulkWrite() {
|
||||
res, err := dbs.bulk.Run()
|
||||
if err != nil {
|
||||
log.Println("Insert error: ", err)
|
||||
}
|
||||
fmt.Printf("Bulk res: %+v\n", res)
|
||||
}
|
||||
|
||||
query.Batch(limit)
|
||||
iter := query.Iter()
|
||||
func consolidate(users []Users, ys time.Time, ye time.Time) {
|
||||
|
||||
if opts.Bulk {
|
||||
dbs.bulk = dbs.lc.Bulk()
|
||||
dbs.bulk.Unordered()
|
||||
}
|
||||
|
||||
result := Users{}
|
||||
|
||||
for iter.Next(&result) {
|
||||
for _, user := range users {
|
||||
idb.CountTOT += 1
|
||||
ll := LastLoginDay{}
|
||||
ll.User = result.User
|
||||
ll.User = user.User
|
||||
ll.Date = ys
|
||||
|
||||
// DEBUG
|
||||
|
||||
logins := result.Logins
|
||||
logins := user.Logins
|
||||
|
||||
ips := []IPs{}
|
||||
lastip := IPs{}
|
||||
|
@ -68,12 +60,6 @@ func consolidate(ys time.Time, ye time.Time) {
|
|||
}
|
||||
ll.IPs = ips
|
||||
|
||||
/*
|
||||
if opts.Debug {
|
||||
fmt.Printf("Insert %+v\n\r", ll)
|
||||
}
|
||||
*/
|
||||
|
||||
iStart := time.Now()
|
||||
|
||||
if opts.Bulk {
|
||||
|
@ -93,53 +79,53 @@ func consolidate(ys time.Time, ye time.Time) {
|
|||
}
|
||||
|
||||
if opts.Bulk {
|
||||
res, err := dbs.bulk.Run()
|
||||
if err != nil {
|
||||
log.Println("Insert error: ", err)
|
||||
}
|
||||
fmt.Printf("Bulk res: %+v\n", res)
|
||||
bulkWrite()
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("Insert: %d\n", idb.CountOK)
|
||||
fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert)
|
||||
}
|
||||
|
||||
pre.DropCollection()
|
||||
// pre.DropCollection()
|
||||
}
|
||||
|
||||
func aggregate(ys time.Time, ye time.Time) {
|
||||
|
||||
qStart := time.Now()
|
||||
skip := 0
|
||||
limit := 100000
|
||||
|
||||
p := dbs.ll.Pipe([]bson.M{
|
||||
{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}},
|
||||
{"$group": bson.M{"_id": "$user"}},
|
||||
{"$project": bson.M{"user": "$_id"}}})
|
||||
for {
|
||||
|
||||
qStart := time.Now()
|
||||
|
||||
/*
|
||||
p := dbs.ll.Pipe([]bson.M{
|
||||
{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}},
|
||||
// {"$sort": bson.M{"user": -1, "date": 1}},
|
||||
{"$sort": bson.M{"user": -1, "date": 1}},
|
||||
{"$group": bson.M{"_id": "$user",
|
||||
"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}})
|
||||
// {"$sort": bson.M{"_id": -1}},
|
||||
// {"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}},
|
||||
// {"$out": "pre_lastlogin_day"}})
|
||||
*/
|
||||
"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}},
|
||||
{"$sort": bson.M{"_id": -1}},
|
||||
{"$skip": skip},
|
||||
{"$limit": limit},
|
||||
{"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}}).AllowDiskUse()
|
||||
|
||||
idb.Pipe = idb.Pipe + time.Since(qStart)
|
||||
if opts.Debug {
|
||||
res := new(interface{})
|
||||
err := p.Explain(res)
|
||||
fmt.Printf("Pipe: %+v\nErr: %+v\n", *res, err)
|
||||
res = new(interface{})
|
||||
err = p.One(res)
|
||||
fmt.Printf("user: %+v\nErr: %+v\n", *res, err)
|
||||
fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart))
|
||||
result := make([]Users, limit)
|
||||
p.All(&result)
|
||||
if len(result) <= 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
last := len(result) - 1
|
||||
fmt.Printf("Res %d: %d - first: %+v - last: %+v - time: %+v\n", skip, len(result), result[0].User, result[last].User, time.Since(qStart))
|
||||
}
|
||||
|
||||
idb.Pipe = idb.Pipe + time.Since(qStart)
|
||||
|
||||
consolidate(result, ys, ye)
|
||||
|
||||
skip += limit
|
||||
}
|
||||
|
||||
consolidate(ys, ye)
|
||||
|
||||
fmt.Printf("Date: %s - %s\n", ys, ye)
|
||||
log.Printf("Date: %s - %s\n", ys, ye)
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ var (
|
|||
StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat),
|
||||
Duration: _24h,
|
||||
Interval: _15m,
|
||||
Batch: 10000,
|
||||
}
|
||||
|
||||
dbs = Dbs{}
|
||||
|
|
|
@ -24,6 +24,7 @@ type Options struct {
|
|||
Version bool
|
||||
Debug bool
|
||||
Bulk bool
|
||||
Batch int
|
||||
}
|
||||
|
||||
func usage() {
|
||||
|
|
Loading…
Add table
Reference in a new issue