From 0eca19256c2f1e6cafc6fee09b5f69171e2c5732 Mon Sep 17 00:00:00 2001
From: Miki <mikif70@gmail.com>
Date: Fri, 15 Apr 2016 17:31:00 +0200
Subject: [PATCH] skip+limit test

---
 aggregate.go             | 94 +++++++++++++++++-----------------------
 lastlogin_consolidate.go |  1 +
 options.go               |  1 +
 3 files changed, 42 insertions(+), 54 deletions(-)

diff --git a/aggregate.go b/aggregate.go
index 033fa39..e5ce833 100644
--- a/aggregate.go
+++ b/aggregate.go
@@ -10,38 +10,30 @@ import (
 	"gopkg.in/mgo.v2/bson"
 )
 
-func consolidate(ys time.Time, ye time.Time) {
-
-	limit := 10000
-
-	pre := dbs.mdbDst.DB("dovecot").C("pre_lastlogin_day")
-
-	query := pre.Find(bson.M{})
-
-	if opts.Debug {
-		tot, _ := query.Count()
-		fmt.Printf("Users: %d\n", tot)
+func bulkWrite() {
+	res, err := dbs.bulk.Run()
+	if err != nil {
+		log.Println("Insert error: ", err)
 	}
+	fmt.Printf("Bulk res: %+v\n", res)
+}
 
-	query.Batch(limit)
-	iter := query.Iter()
+func consolidate(users []Users, ys time.Time, ye time.Time) {
 
 	if opts.Bulk {
 		dbs.bulk = dbs.lc.Bulk()
 		dbs.bulk.Unordered()
 	}
 
-	result := Users{}
-
-	for iter.Next(&result) {
+	for _, user := range users {
 		idb.CountTOT += 1
 		ll := LastLoginDay{}
-		ll.User = result.User
+		ll.User = user.User
 		ll.Date = ys
 
 		// DEBUG
 
-		logins := result.Logins
+		logins := user.Logins
 
 		ips := []IPs{}
 		lastip := IPs{}
@@ -68,12 +60,6 @@ func consolidate(ys time.Time, ye time.Time) {
 		}
 		ll.IPs = ips
 
-		/*
-			if opts.Debug {
-				fmt.Printf("Insert %+v\n\r", ll)
-			}
-		*/
-
 		iStart := time.Now()
 
 		if opts.Bulk {
@@ -93,53 +79,53 @@ func consolidate(ys time.Time, ye time.Time) {
 	}
 
 	if opts.Bulk {
-		res, err := dbs.bulk.Run()
-		if err != nil {
-			log.Println("Insert error: ", err)
-		}
-		fmt.Printf("Bulk res: %+v\n", res)
+		bulkWrite()
 	}
 
 	if opts.Debug {
-		fmt.Printf("Insert: %d\n", idb.CountOK)
+		fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert)
 	}
 
-	pre.DropCollection()
+	//	pre.DropCollection()
 }
 
 func aggregate(ys time.Time, ye time.Time) {
 
-	qStart := time.Now()
+	skip := 0
+	limit := 100000
 
-	p := dbs.ll.Pipe([]bson.M{
-		{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}},
-		{"$group": bson.M{"_id": "$user"}},
-		{"$project": bson.M{"user": "$_id"}}})
+	for {
+
+		qStart := time.Now()
 
-	/*
 		p := dbs.ll.Pipe([]bson.M{
 			{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}},
-			//		{"$sort": bson.M{"user": -1, "date": 1}},
+			{"$sort": bson.M{"user": -1, "date": 1}},
 			{"$group": bson.M{"_id": "$user",
-				"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}})
-		//		{"$sort": bson.M{"_id": -1}},
-		//		{"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}},
-		//		{"$out": "pre_lastlogin_day"}})
-	*/
+				"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}},
+			{"$sort": bson.M{"_id": -1}},
+			{"$skip": skip},
+			{"$limit": limit},
+			{"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}}).AllowDiskUse()
 
-	idb.Pipe = idb.Pipe + time.Since(qStart)
-	if opts.Debug {
-		res := new(interface{})
-		err := p.Explain(res)
-		fmt.Printf("Pipe: %+v\nErr: %+v\n", *res, err)
-		res = new(interface{})
-		err = p.One(res)
-		fmt.Printf("user: %+v\nErr: %+v\n", *res, err)
-		fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart))
+		result := make([]Users, limit)
+		p.All(&result)
+		if len(result) <= 0 {
+			break
+		}
+
+		if opts.Debug {
+			last := len(result) - 1
+			fmt.Printf("Res %d: %d - first: %+v - last: %+v - time: %+v\n", skip, len(result), result[0].User, result[last].User, time.Since(qStart))
+		}
+
+		idb.Pipe = idb.Pipe + time.Since(qStart)
+
+		consolidate(result, ys, ye)
+
+		skip += limit
 	}
 
-	consolidate(ys, ye)
-
 	fmt.Printf("Date: %s - %s\n", ys, ye)
 	log.Printf("Date: %s - %s\n", ys, ye)
 }
diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go
index cb3b481..379992c 100644
--- a/lastlogin_consolidate.go
+++ b/lastlogin_consolidate.go
@@ -26,6 +26,7 @@ var (
 		StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat),
 		Duration:  _24h,
 		Interval:  _15m,
+		Batch:     10000,
 	}
 
 	dbs = Dbs{}
diff --git a/options.go b/options.go
index 0dddbaf..e696e42 100644
--- a/options.go
+++ b/options.go
@@ -24,6 +24,7 @@ type Options struct {
 	Version   bool
 	Debug     bool
 	Bulk      bool
+	Batch     int
 }
 
 func usage() {