From c0d5972a4487da3f42b4772cac535893b21b0930 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 15 Apr 2016 15:28:52 +0200 Subject: [PATCH] primo test --- aggregate.go | 141 +++++++++++++++++++++++++++------------ dbs.go | 12 +++- lastlogin_consolidate.go | 31 +++++---- options.go | 4 +- 4 files changed, 129 insertions(+), 59 deletions(-) diff --git a/aggregate.go b/aggregate.go index 64c31bb..033fa39 100644 --- a/aggregate.go +++ b/aggregate.go @@ -4,62 +4,60 @@ package main import ( "fmt" "log" + // "sort" "time" "gopkg.in/mgo.v2/bson" ) -func aggregate(ys time.Time, ye time.Time) { +func consolidate(ys time.Time, ye time.Time) { - qStart := time.Now() + limit := 10000 - p := dbs.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) + pre := dbs.mdbDst.DB("dovecot").C("pre_lastlogin_day") + + query := pre.Find(bson.M{}) - idb.Pipe = time.Since(qStart) if opts.Debug { - fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) + tot, _ := query.Count() + fmt.Printf("Users: %d\n", tot) } - fmt.Printf("Date: %s - %s\n", ys, ye) - log.Printf("Date: %s - %s\n", ys, ye) + query.Batch(limit) + iter := query.Iter() - ar := Users{} - it := p.Iter() + if opts.Bulk { + dbs.bulk = dbs.lc.Bulk() + dbs.bulk.Unordered() + } - for it.Next(&ar) { + result := Users{} + + for iter.Next(&result) { idb.CountTOT += 1 ll := LastLoginDay{} - ll.User = ar.User + ll.User = result.User ll.Date = ys // DEBUG - if opts.Debug { - fmt.Printf("User: %s\n\r", ar.User) - } - qStart = time.Now() - nq := dbs.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") - //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) - iter := nq.Iter() + logins := result.Logins - if idb.Find == 0 { - idb.Find = time.Since(qStart) - } else { - idb.Find = (idb.Find + time.Since(qStart)) / 2 - } - - result := LastLogin{} ips := []IPs{} lastip := IPs{} - for iter.Next(&result) { - if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { - //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) + for l := range logins { + if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { + /* + if opts.Debug { + fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) + } + */ } else { - ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) - lastip.Date = result.Date - lastip.IP = result.IP + ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) + lastip.Date = logins[l].Date + lastip.IP = logins[l].IP } - switch result.Protocol { + switch logins[l].Protocol { case "pop3", "pop": ll.Protocols.Pop += 1 case "imap": @@ -68,23 +66,80 @@ func aggregate(ys time.Time, ye time.Time) { ll.Protocols.Web += 1 } } - if err := iter.Close(); err != nil { - log.Println("Iter: ", err) - } ll.IPs = ips - //fmt.Printf("Upsert %+v\n\r", ll) + + /* + if opts.Debug { + fmt.Printf("Insert %+v\n\r", ll) + } + */ + iStart := time.Now() - _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) - if err != nil { - log.Println("Insert error: ", err) - } - if idb.Insert == 0 { - idb.Insert = time.Since(iStart) + + if opts.Bulk { + //dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + dbs.bulk.Insert(ll) } else { - idb.Insert = (idb.Insert + time.Since(iStart)) / 2 + info, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + if err != nil { + log.Println("Insert error: ", err) + continue + } + fmt.Printf("Change: %+v\n", info) } + idb.Insert += time.Since(iStart) idb.CountOK += 1 } + if opts.Bulk { + res, err := dbs.bulk.Run() + if err != nil { + log.Println("Insert error: ", err) + } + fmt.Printf("Bulk res: %+v\n", res) + } + + if opts.Debug { + fmt.Printf("Insert: %d\n", idb.CountOK) + } + + pre.DropCollection() +} + +func aggregate(ys time.Time, ye time.Time) { + + qStart := time.Now() + + p := dbs.ll.Pipe([]bson.M{ + {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, + {"$group": bson.M{"_id": "$user"}}, + {"$project": bson.M{"user": "$_id"}}}) + + /* + p := dbs.ll.Pipe([]bson.M{ + {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, + // {"$sort": bson.M{"user": -1, "date": 1}}, + {"$group": bson.M{"_id": "$user", + "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}) + // {"$sort": bson.M{"_id": -1}}, + // {"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}, + // {"$out": "pre_lastlogin_day"}}) + */ + + idb.Pipe = idb.Pipe + time.Since(qStart) + if opts.Debug { + res := new(interface{}) + err := p.Explain(res) + fmt.Printf("Pipe: %+v\nErr: %+v\n", *res, err) + res = new(interface{}) + err = p.One(res) + fmt.Printf("user: %+v\nErr: %+v\n", *res, err) + fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) + } + + consolidate(ys, ye) + + fmt.Printf("Date: %s - %s\n", ys, ye) + log.Printf("Date: %s - %s\n", ys, ye) } diff --git a/dbs.go b/dbs.go index 6302a87..6d5239b 100644 --- a/dbs.go +++ b/dbs.go @@ -2,10 +2,11 @@ package main import ( - "gopkg.in/mgo.v2" "log" "os" "time" + + "gopkg.in/mgo.v2" ) type Dbs struct { @@ -13,6 +14,7 @@ type Dbs struct { mdbDst *mgo.Session ll *mgo.Collection lc *mgo.Collection + bulk *mgo.Bulk } type LastLogin struct { @@ -47,9 +49,16 @@ type Index struct { Date time.Time `json:"date"` } +type Users struct { + User string `json:"user"` + Logins []IPs `json:"ips"` +} + +/* type Users struct { User string `json:"user"` } +*/ func connectMongo() { @@ -77,5 +86,4 @@ func connectMongo() { } dbs.lc = dbs.mdbDst.DB("dovecot").C("lastlogin_day") } - } diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 4da5df9..cb3b481 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.2.3" + _VERSION = "v1.3.0" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -70,7 +70,7 @@ func main() { connectMongo() defer dbs.mdbSrc.Close() - defer dbs.mdbDst.Clone() + defer dbs.mdbDst.Close() y, err := time.Parse(_tformat, opts.StartDate) if err != nil { @@ -84,18 +84,23 @@ func main() { var ys []time.Time var ye []time.Time - if opts.Duration <= (time.Hour * 24) { - ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[0].Add(opts.Duration)) - } else { - for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ { - // fmt.Println(i) - yt := y.Add(time.Hour * time.Duration(24*i)) - // fmt.Println(yt) - ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[i].Add(_24h)) + // if opts.Duration <= (time.Hour * 24) { + ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)) + ye = append(ye, ys[0].Add(opts.Duration)) + + /* + } else { + for i := 0; i <= int(opts.Duration/(time.Hour*24)); i++ { + yt := y.Add(time.Hour * time.Duration(24*i)) + if opts.Debug { + fmt.Println(i) + fmt.Println(yt) + } + ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) + ye = append(ye, ys[i].Add(_24h)) + } } - } + */ // DEBUG if opts.Debug { diff --git a/options.go b/options.go index 23dcc86..0dddbaf 100644 --- a/options.go +++ b/options.go @@ -23,10 +23,11 @@ type Options struct { Hostname string Version bool Debug bool + Bulk bool } func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -I -v\n") + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -i -I -v\n") os.Exit(0) } @@ -48,4 +49,5 @@ func init() { flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") flag.BoolVar(&opts.Version, "v", false, "Version") flag.BoolVar(&opts.Debug, "debug", false, "Debug") + flag.BoolVar(&opts.Bulk, "bulk", false, "Bulk") }