From da68bd79e99ba2aa0bfbac94658b5c6656422ff2 Mon Sep 17 00:00:00 2001 From: Michele Fadda Date: Tue, 21 Jul 2015 12:05:23 +0200 Subject: [PATCH 01/29] . --- lastlogin_consolidate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 8e5c2d3..deb511e 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v0.5" + _VERSION = "v1.0" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) From d55ba3051df0060a7876f8880f109b08eee3c37c Mon Sep 17 00:00:00 2001 From: Michele Fadda Date: Tue, 21 Jul 2015 12:30:28 +0200 Subject: [PATCH 02/29] aggiunte scritture sul file di log ( stampava a video ) --- lastlogin_consolidate.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index deb511e..18935cd 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0" + _VERSION = "v1.0.1" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -22,8 +22,8 @@ const ( var ( opts = Options{ - // MongoSrc: "mongodb://10.39.81.85:27018", - LogFile: "log/llmongo.log", + MongoSrc: "mongodb://127.0.0.1:27018", + LogFile: "log/llconsolidate.log", StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat), Duration: _24h, Interval: _10m, @@ -141,7 +141,6 @@ func main() { } log.SetOutput(fs) - // log.SetPrefix("[llmongo] ") start := time.Now() fmt.Printf("Start: %+v\n", opts) @@ -153,11 +152,11 @@ func main() { y, err := time.Parse(_tformat, opts.StartDate) if err != nil { - fmt.Println("Date Error: ", err) + log.Println("Date Error: ", err) os.Exit(-1) } - fmt.Println(y) + // fmt.Println(y) var ys []time.Time var ye []time.Time @@ -167,9 +166,9 @@ func main() { ye = append(ye, ys[0].Add(opts.Duration)) } else { for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ { - fmt.Println(i) + // fmt.Println(i) yt := y.Add(time.Hour * time.Duration(24*i)) - fmt.Println(yt) + // fmt.Println(yt) ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) ye = append(ye, ys[i].Add(_24h)) } @@ -184,12 +183,13 @@ func main() { q.Distinct("user", &ar) fmt.Printf("Date: %s - %s\n", ys[i], ye[i]) + log.Printf("Date: %s - %s\n", ys[i], ye[i]) for u := range ar { ll := LastLoginDay{} ll.User = ar[u] ll.Date = ys[i] - fmt.Println(ar[u]) + // fmt.Println(ar[u]) nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}, "user": ar[u]}).Sort("date") iter := nq.Iter() result := LastLogin{} @@ -213,15 +213,16 @@ func main() { } } if err := iter.Close(); err != nil { - fmt.Println("Iter: ", err) + log.Println("Iter: ", err) } ll.IPs = ips _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { - fmt.Println("Insert error: ", err) + log.Println("Insert error: ", err) } } } fmt.Println("Stop: ", time.Since(start)) + log.Println("Stop: ", time.Since(start)) } From bcf786f39259f6247b839d8c9d267cfeb47aae7b Mon Sep 17 00:00:00 2001 From: Michele Fadda Date: Tue, 21 Jul 2015 12:33:05 +0200 Subject: [PATCH 03/29] default mongoSrc in localhost:27018 --- lastlogin_consolidate.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 18935cd..f16459d 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0.1" + _VERSION = "v1.0.2" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -88,7 +88,7 @@ func init() { opts.LogFile = path.Join(current, opts.LogFile) - flag.StringVar(&opts.MongoSrc, "ms", "", "Mongodb Source") + flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source") flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") From dc2e4988be18e104f1ee0197b7d76d123306ad50 Mon Sep 17 00:00:00 2001 From: Michele Fadda Date: Tue, 21 Jul 2015 13:35:39 +0200 Subject: [PATCH 04/29] stampa il tempo parziale --- lastlogin_consolidate.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index f16459d..8447750 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -174,7 +174,7 @@ func main() { } } - fmt.Println(ys, ye) + // fmt.Println(ys, ye) for i := range ys { q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user") @@ -221,6 +221,7 @@ func main() { log.Println("Insert error: ", err) } } + fmt.Printf("Stop %s: %s\n", ys[i], time.Since(start)) } fmt.Println("Stop: ", time.Since(start)) From 58211e5daed517c60019523cbeee7e7512b4ed2d Mon Sep 17 00:00:00 2001 From: Michele Fadda Date: Tue, 21 Jul 2015 13:36:30 +0200 Subject: [PATCH 05/29] . --- lastlogin_consolidate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 8447750..4678093 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0.2" + _VERSION = "v1.0.3" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) From 9c49a04667e02de4b96fb8b43c350485b0efdad3 Mon Sep 17 00:00:00 2001 From: Michele Fadda Date: Tue, 21 Jul 2015 15:00:46 +0200 Subject: [PATCH 06/29] aggiunto il protcollo nell'array degli IP --- lastlogin_consolidate.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 4678093..2fdce48 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,10 +14,11 @@ import ( ) const ( - _VERSION = "v1.0.3" + _VERSION = "v1.0.4" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) + _15m = (time.Minute * 15) ) var ( @@ -26,7 +27,7 @@ var ( LogFile: "log/llconsolidate.log", StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat), Duration: _24h, - Interval: _10m, + Interval: _15m, } ) @@ -60,8 +61,9 @@ type LastLoginDay struct { } type IPs struct { - IP string `json:"ip"` - Date time.Time `json:"date"` + IP string `json:"ip"` + Date time.Time `json:"date"` + Protocol string `json:"protocol"` } type Protocols struct { @@ -199,7 +201,7 @@ func main() { if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) } else { - ips = append(ips, IPs{IP: result.IP, Date: result.Date}) + ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) lastip.Date = result.Date lastip.IP = result.IP } From 42bb907c477492a79b9dc87511320f632d2a8d27 Mon Sep 17 00:00:00 2001 From: Michele Fadda Date: Tue, 21 Jul 2015 15:45:06 +0200 Subject: [PATCH 07/29] aggiunti i tempi parziali --- lastlogin_consolidate.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 2fdce48..c03687b 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0.4" + _VERSION = "v1.0.5" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -179,6 +179,9 @@ func main() { // fmt.Println(ys, ye) for i := range ys { + + pStart := time.Now() + q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user") ar := []string{} @@ -223,7 +226,7 @@ func main() { log.Println("Insert error: ", err) } } - fmt.Printf("Stop %s: %s\n", ys[i], time.Since(start)) + fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) } fmt.Println("Stop: ", time.Since(start)) From cd673bba2da311a202a8941f5f22f2629d988afd Mon Sep 17 00:00:00 2001 From: Miki Date: Mon, 9 Nov 2015 15:54:05 +0100 Subject: [PATCH 08/29] aggiunta l'opzione di debug --- lastlogin_consolidate.go | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index c03687b..46e9f80 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0.5" + _VERSION = "v1.0.6" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -43,6 +43,7 @@ type Options struct { Interval time.Duration LogFile string Version bool + Debug bool } type LastLogin struct { @@ -78,7 +79,7 @@ type Index struct { } func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -v\n") + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -v\n") os.Exit(0) } @@ -97,6 +98,7 @@ func init() { flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration") flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") flag.BoolVar(&opts.Version, "v", false, "Version") + flag.BoolVar(&opts.Debug, "debug", false, "Debug") } func connectMongo() { @@ -158,7 +160,8 @@ func main() { os.Exit(-1) } - // fmt.Println(y) + // DEBUG + //fmt.Printf("Start %+v\n\r", y) var ys []time.Time var ye []time.Time @@ -170,23 +173,38 @@ func main() { for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ { // fmt.Println(i) yt := y.Add(time.Hour * time.Duration(24*i)) - // fmt.Println(yt) + // fmt.Println(yt) ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) ye = append(ye, ys[i].Add(_24h)) } } - // fmt.Println(ys, ye) + // DEBUG + if opts.Debug { + fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) + } for i := range ys { pStart := time.Now() + qStart := time.Now() + q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user") + if opts.Debug { + fmt.Printf("Find from date: %s\n\r", time.Since(qStart)) + } + + qStart = time.Now() + ar := []string{} q.Distinct("user", &ar) + if opts.Debug { + fmt.Printf("Distinct: %s\n\r", time.Since(qStart)) + } + fmt.Printf("Date: %s - %s\n", ys[i], ye[i]) log.Printf("Date: %s - %s\n", ys[i], ye[i]) @@ -194,8 +212,15 @@ func main() { ll := LastLoginDay{} ll.User = ar[u] ll.Date = ys[i] - // fmt.Println(ar[u]) + + // DEBUG + if opts.Debug { + fmt.Printf("User: %s\n\r", ar[u]) + } + + qStart = time.Now() nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}, "user": ar[u]}).Sort("date") + //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) iter := nq.Iter() result := LastLogin{} ips := []IPs{} @@ -221,6 +246,7 @@ func main() { log.Println("Iter: ", err) } ll.IPs = ips + //fmt.Printf("Upsert %+v\n\r", ll) _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) From c96f963f2152264376a1f434199804335a4c4a10 Mon Sep 17 00:00:00 2001 From: Miki Date: Mon, 9 Nov 2015 16:43:07 +0100 Subject: [PATCH 09/29] impostato ad indefinito il SocketTimeout del mongodb source --- lastlogin_consolidate.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 46e9f80..2c41fe6 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0.6" + _VERSION = "v1.0.7" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -107,11 +107,13 @@ func connectMongo() { log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc) } var err error + //opts.mdbSrc, err = mgo.DialWithTimeout(opts.MongoSrc, time.Minute*5) opts.mdbSrc, err = mgo.Dial(opts.MongoSrc) if err != nil { log.Println("Mongodb connect Error: ", err.Error()) os.Exit(-3) } + opts.mdbSrc.SetSocketTimeout(0) opts.ll = opts.mdbSrc.DB("dovecot").C("lastlogin") if opts.MongoDst == "" { From e20784a0d5a26dad63328bfc55a50985b3082336 Mon Sep 17 00:00:00 2001 From: Miki Date: Thu, 12 Nov 2015 09:33:06 +0100 Subject: [PATCH 10/29] aggiunto il numero di user consolidati e il totale user da consolidare al file di log --- lastlogin_consolidate.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 2c41fe6..05a7607 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0.7" + _VERSION = "v1.0.8" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -186,6 +186,9 @@ func main() { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } + countTOT := 0 + countOK := 0 + for i := range ys { pStart := time.Now() @@ -204,13 +207,14 @@ func main() { q.Distinct("user", &ar) if opts.Debug { - fmt.Printf("Distinct: %s\n\r", time.Since(qStart)) + fmt.Printf("Distinct user: %s\n\r", time.Since(qStart)) } fmt.Printf("Date: %s - %s\n", ys[i], ye[i]) log.Printf("Date: %s - %s\n", ys[i], ye[i]) for u := range ar { + countTOT += 1 ll := LastLoginDay{} ll.User = ar[u] ll.Date = ys[i] @@ -253,10 +257,12 @@ func main() { if err != nil { log.Println("Insert error: ", err) } + countOK += 1 } fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) + log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) } - fmt.Println("Stop: ", time.Since(start)) - log.Println("Stop: ", time.Since(start)) + fmt.Printf("Stop: OK: %i - TOT: %i - Time: %s\n\r", countOK, countTOT, time.Since(start)) + log.Printf("Stop: OK: %i - TOT: %i - Time: %s\n\r", countOK, countTOT, time.Since(start)) } From 16fd3dbd697f25fb29da6d1841e8415bcc48762d Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 13 Nov 2015 12:58:09 +0100 Subject: [PATCH 11/29] . --- lastlogin_consolidate.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 05a7607..eca313d 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -14,7 +14,7 @@ import ( ) const ( - _VERSION = "v1.0.8" + _VERSION = "v1.0.9" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -263,6 +263,6 @@ func main() { log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) } - fmt.Printf("Stop: OK: %i - TOT: %i - Time: %s\n\r", countOK, countTOT, time.Since(start)) - log.Printf("Stop: OK: %i - TOT: %i - Time: %s\n\r", countOK, countTOT, time.Since(start)) + fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, time.Since(start)) + log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, time.Since(start)) } From a375f8dedff45ee425bae1d4512e9de73c18bc06 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 20 Nov 2015 17:11:36 +0100 Subject: [PATCH 12/29] modificata la struttura del sorgente --- dbs.go | 70 ++++++++++++++++++++++++++ lastlogin_consolidate.go | 105 +-------------------------------------- options.go | 51 +++++++++++++++++++ 3 files changed, 123 insertions(+), 103 deletions(-) create mode 100644 dbs.go create mode 100644 options.go diff --git a/dbs.go b/dbs.go new file mode 100644 index 0000000..e153bae --- /dev/null +++ b/dbs.go @@ -0,0 +1,70 @@ +// dbs +package main + +import ( + "gopkg.in/mgo.v2" + "log" + "os" + "time" +) + +type LastLogin struct { + User string `json: "user"` + Protocol string `json: "protocol"` + IP string `json: "ip"` + Date time.Time `json: "date"` + ID string `json: "_id"` +} + +type LastLoginDay struct { + User string `json:"user"` + Date time.Time `json:"date"` + Protocols Protocols `json:"protocols"` + IPs []IPs `json:"ips"` +} + +type IPs struct { + IP string `json:"ip"` + Date time.Time `json:"date"` + Protocol string `json:"protocol"` +} + +type Protocols struct { + Pop int `json:"pop"` + Imap int `json:"imap"` + Web int `json:"web"` +} + +type Index struct { + User string `json:"user"` + Date time.Time `json:"date"` +} + +func connectMongo() { + + if opts.MongoSrc == "" { + log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc) + } + var err error + //opts.mdbSrc, err = mgo.DialWithTimeout(opts.MongoSrc, time.Minute*5) + opts.mdbSrc, err = mgo.Dial(opts.MongoSrc) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + opts.mdbSrc.SetSocketTimeout(0) + opts.ll = opts.mdbSrc.DB("dovecot").C("lastlogin") + + if opts.MongoDst == "" { + opts.mdbDst = opts.mdbSrc + opts.lc = opts.mdbSrc.DB("dovecot").C("lastlogin_day") + } else { + opts.mdbDst, err = mgo.Dial(opts.MongoDst) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + opts.lc = opts.mdbDst.DB("dovecot").C("lastlogin_day") + } + +} diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index eca313d..52c9db3 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -4,17 +4,15 @@ package main import ( "flag" "fmt" - "gopkg.in/mgo.v2" + // "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "log" "os" - "path" - "path/filepath" "time" ) const ( - _VERSION = "v1.0.9" + _VERSION = "v1.0.10" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -31,105 +29,6 @@ var ( } ) -type Options struct { - MongoSrc string - MongoDst string - mdbSrc *mgo.Session - mdbDst *mgo.Session - ll *mgo.Collection - lc *mgo.Collection - StartDate string - Duration time.Duration - Interval time.Duration - LogFile string - Version bool - Debug bool -} - -type LastLogin struct { - User string `json: "user"` - Protocol string `json: "protocol"` - IP string `json: "ip"` - Date time.Time `json: "date"` - ID string `json: "_id"` -} - -type LastLoginDay struct { - User string `json:"user"` - Date time.Time `json:"date"` - Protocols Protocols `json:"protocols"` - IPs []IPs `json:"ips"` -} - -type IPs struct { - IP string `json:"ip"` - Date time.Time `json:"date"` - Protocol string `json:"protocol"` -} - -type Protocols struct { - Pop int `json:"pop"` - Imap int `json:"imap"` - Web int `json:"web"` -} - -type Index struct { - User string `json:"user"` - Date time.Time `json:"date"` -} - -func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -v\n") - os.Exit(0) -} - -func init() { - current, err := filepath.Abs(filepath.Dir(os.Args[0])) - if err != nil { - log.Fatal(err) - } - - opts.LogFile = path.Join(current, opts.LogFile) - - flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source") - flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") - flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") - flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") - flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration") - flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") - flag.BoolVar(&opts.Version, "v", false, "Version") - flag.BoolVar(&opts.Debug, "debug", false, "Debug") -} - -func connectMongo() { - - if opts.MongoSrc == "" { - log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc) - } - var err error - //opts.mdbSrc, err = mgo.DialWithTimeout(opts.MongoSrc, time.Minute*5) - opts.mdbSrc, err = mgo.Dial(opts.MongoSrc) - if err != nil { - log.Println("Mongodb connect Error: ", err.Error()) - os.Exit(-3) - } - opts.mdbSrc.SetSocketTimeout(0) - opts.ll = opts.mdbSrc.DB("dovecot").C("lastlogin") - - if opts.MongoDst == "" { - opts.mdbDst = opts.mdbSrc - opts.lc = opts.mdbSrc.DB("dovecot").C("lastlogin_day") - } else { - opts.mdbDst, err = mgo.Dial(opts.MongoDst) - if err != nil { - log.Println("Mongodb connect Error: ", err.Error()) - os.Exit(-3) - } - opts.lc = opts.mdbDst.DB("dovecot").C("lastlogin_day") - } - -} - func main() { flag.Usage = usage diff --git a/options.go b/options.go new file mode 100644 index 0000000..88b5043 --- /dev/null +++ b/options.go @@ -0,0 +1,51 @@ +// options +package main + +import ( + "flag" + "fmt" + "gopkg.in/mgo.v2" + "log" + "os" + "path" + "path/filepath" + "time" +) + +type Options struct { + MongoSrc string + MongoDst string + mdbSrc *mgo.Session + mdbDst *mgo.Session + ll *mgo.Collection + lc *mgo.Collection + StartDate string + Duration time.Duration + Interval time.Duration + LogFile string + Version bool + Debug bool +} + +func usage() { + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -v\n") + os.Exit(0) +} + +func init() { + current, err := filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(current, opts.LogFile) + + flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source") + flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") + flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration") + flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.BoolVar(&opts.Debug, "debug", false, "Debug") +} From 0de9b415b81e1a1ef2a68e1e5820cc6e4a74ed4d Mon Sep 17 00:00:00 2001 From: Miki Date: Wed, 25 Nov 2015 15:08:29 +0100 Subject: [PATCH 13/29] ottimizzata la query per l'estrazione degli utenti --- aggregate.go | 74 +++++++++++++++++++++++++++++++++++++++ dbs.go | 4 +++ lastlogin_consolidate.go | 75 +++------------------------------------- 3 files changed, 83 insertions(+), 70 deletions(-) create mode 100644 aggregate.go diff --git a/aggregate.go b/aggregate.go new file mode 100644 index 0000000..c5c93c6 --- /dev/null +++ b/aggregate.go @@ -0,0 +1,74 @@ +// aggregate +package main + +import ( + "fmt" + "gopkg.in/mgo.v2/bson" + "log" + "time" +) + +func aggregate(ys time.Time, ye time.Time) { + + qStart := time.Now() + + p := opts.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) + + if opts.Debug { + fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) + } + + fmt.Printf("Date: %s - %s\n", ys, ye) + log.Printf("Date: %s - %s\n", ys, ye) + + ar := Users{} + it := p.Iter() + + for it.Next(&ar) { + countTOT += 1 + ll := LastLoginDay{} + ll.User = ar.User + ll.Date = ys + + // DEBUG + if opts.Debug { + fmt.Printf("User: %s\n\r", ar.User) + } + + qStart = time.Now() + nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") + //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) + iter := nq.Iter() + result := LastLogin{} + ips := []IPs{} + lastip := IPs{} + for iter.Next(&result) { + if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { + //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) + } else { + ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) + lastip.Date = result.Date + lastip.IP = result.IP + } + switch result.Protocol { + case "pop3", "pop": + ll.Protocols.Pop += 1 + case "imap": + ll.Protocols.Imap += 1 + case "web": + ll.Protocols.Web += 1 + } + } + if err := iter.Close(); err != nil { + log.Println("Iter: ", err) + } + ll.IPs = ips + //fmt.Printf("Upsert %+v\n\r", ll) + _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + if err != nil { + log.Println("Insert error: ", err) + } + countOK += 1 + } + +} diff --git a/dbs.go b/dbs.go index e153bae..95fa063 100644 --- a/dbs.go +++ b/dbs.go @@ -40,6 +40,10 @@ type Index struct { Date time.Time `json:"date"` } +type Users struct { + User string `json:"user"` +} + func connectMongo() { if opts.MongoSrc == "" { diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 52c9db3..c2a9869 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -5,14 +5,14 @@ import ( "flag" "fmt" // "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" + // "gopkg.in/mgo.v2/bson" "log" "os" "time" ) const ( - _VERSION = "v1.0.10" + _VERSION = "v1.1.0" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -27,6 +27,8 @@ var ( Duration: _24h, Interval: _15m, } + countTOT = 0 + countOK = 0 ) func main() { @@ -85,79 +87,12 @@ func main() { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } - countTOT := 0 - countOK := 0 - for i := range ys { pStart := time.Now() - qStart := time.Now() + aggregate(ys[i], ye[i]) - q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user") - - if opts.Debug { - fmt.Printf("Find from date: %s\n\r", time.Since(qStart)) - } - - qStart = time.Now() - - ar := []string{} - q.Distinct("user", &ar) - - if opts.Debug { - fmt.Printf("Distinct user: %s\n\r", time.Since(qStart)) - } - - fmt.Printf("Date: %s - %s\n", ys[i], ye[i]) - log.Printf("Date: %s - %s\n", ys[i], ye[i]) - - for u := range ar { - countTOT += 1 - ll := LastLoginDay{} - ll.User = ar[u] - ll.Date = ys[i] - - // DEBUG - if opts.Debug { - fmt.Printf("User: %s\n\r", ar[u]) - } - - qStart = time.Now() - nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}, "user": ar[u]}).Sort("date") - //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) - iter := nq.Iter() - result := LastLogin{} - ips := []IPs{} - lastip := IPs{} - for iter.Next(&result) { - if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { - //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) - } else { - ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) - lastip.Date = result.Date - lastip.IP = result.IP - } - switch result.Protocol { - case "pop3", "pop": - ll.Protocols.Pop += 1 - case "imap": - ll.Protocols.Imap += 1 - case "web": - ll.Protocols.Web += 1 - } - } - if err := iter.Close(); err != nil { - log.Println("Iter: ", err) - } - ll.IPs = ips - //fmt.Printf("Upsert %+v\n\r", ll) - _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) - if err != nil { - log.Println("Insert error: ", err) - } - countOK += 1 - } fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) } From 494696c336470677ecc3235c471a6b23c26ab4a6 Mon Sep 17 00:00:00 2001 From: Miki Date: Thu, 26 Nov 2015 09:19:16 +0100 Subject: [PATCH 14/29] creata una struttura DBS per contenere i puntatori delle connessioni al DB --- aggregate.go | 6 +++--- dbs.go | 21 ++++++++++++++------- lastlogin_consolidate.go | 9 ++++++--- options.go | 6 +----- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/aggregate.go b/aggregate.go index c5c93c6..9c015b8 100644 --- a/aggregate.go +++ b/aggregate.go @@ -12,7 +12,7 @@ func aggregate(ys time.Time, ye time.Time) { qStart := time.Now() - p := opts.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) + p := dbs.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) if opts.Debug { fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) @@ -36,7 +36,7 @@ func aggregate(ys time.Time, ye time.Time) { } qStart = time.Now() - nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") + nq := dbs.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) iter := nq.Iter() result := LastLogin{} @@ -64,7 +64,7 @@ func aggregate(ys time.Time, ye time.Time) { } ll.IPs = ips //fmt.Printf("Upsert %+v\n\r", ll) - _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) } diff --git a/dbs.go b/dbs.go index 95fa063..6302a87 100644 --- a/dbs.go +++ b/dbs.go @@ -8,6 +8,13 @@ import ( "time" ) +type Dbs struct { + mdbSrc *mgo.Session + mdbDst *mgo.Session + ll *mgo.Collection + lc *mgo.Collection +} + type LastLogin struct { User string `json: "user"` Protocol string `json: "protocol"` @@ -51,24 +58,24 @@ func connectMongo() { } var err error //opts.mdbSrc, err = mgo.DialWithTimeout(opts.MongoSrc, time.Minute*5) - opts.mdbSrc, err = mgo.Dial(opts.MongoSrc) + dbs.mdbSrc, err = mgo.Dial(opts.MongoSrc) if err != nil { log.Println("Mongodb connect Error: ", err.Error()) os.Exit(-3) } - opts.mdbSrc.SetSocketTimeout(0) - opts.ll = opts.mdbSrc.DB("dovecot").C("lastlogin") + dbs.mdbSrc.SetSocketTimeout(0) + dbs.ll = dbs.mdbSrc.DB("dovecot").C("lastlogin") if opts.MongoDst == "" { - opts.mdbDst = opts.mdbSrc - opts.lc = opts.mdbSrc.DB("dovecot").C("lastlogin_day") + dbs.mdbDst = dbs.mdbSrc + dbs.lc = dbs.mdbSrc.DB("dovecot").C("lastlogin_day") } else { - opts.mdbDst, err = mgo.Dial(opts.MongoDst) + dbs.mdbDst, err = mgo.Dial(opts.MongoDst) if err != nil { log.Println("Mongodb connect Error: ", err.Error()) os.Exit(-3) } - opts.lc = opts.mdbDst.DB("dovecot").C("lastlogin_day") + dbs.lc = dbs.mdbDst.DB("dovecot").C("lastlogin_day") } } diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index c2a9869..b80d85f 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.1.0" + _VERSION = "v1.1.1" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -27,6 +27,9 @@ var ( Duration: _24h, Interval: _15m, } + + dbs = Dbs{} + countTOT = 0 countOK = 0 ) @@ -54,8 +57,8 @@ func main() { log.Printf("Start: %+v\n", opts) connectMongo() - defer opts.mdbSrc.Close() - defer opts.mdbDst.Clone() + defer dbs.mdbSrc.Close() + defer dbs.mdbDst.Clone() y, err := time.Parse(_tformat, opts.StartDate) if err != nil { diff --git a/options.go b/options.go index 88b5043..975a6f1 100644 --- a/options.go +++ b/options.go @@ -4,7 +4,7 @@ package main import ( "flag" "fmt" - "gopkg.in/mgo.v2" + // "gopkg.in/mgo.v2" "log" "os" "path" @@ -15,10 +15,6 @@ import ( type Options struct { MongoSrc string MongoDst string - mdbSrc *mgo.Session - mdbDst *mgo.Session - ll *mgo.Collection - lc *mgo.Collection StartDate string Duration time.Duration Interval time.Duration From a03057fe819da3e6d9da76af8c1e85bcf458e303 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 19 Feb 2016 10:48:55 +0100 Subject: [PATCH 15/29] write stats to influxdb --- lastlogin_consolidate.go | 20 +++++++++++++++++--- options.go | 6 +++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index b80d85f..16d2648 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.1.1" + _VERSION = "v1.2.0" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -44,6 +44,14 @@ func main() { os.Exit(0) } + if opts.Hostname == "" { + var err error + opts.Hostname, err = os.Hostname() + if err != nil { + fmt.Println("Hostname error: ", err.Error()) + } + } + fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { fmt.Println("Log file error: ", err.Error()) @@ -100,6 +108,12 @@ func main() { log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) } - fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, time.Since(start)) - log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, time.Since(start)) + stop := time.Since(start) + + fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, stop) + log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, stop) + + if opts.Influxdb != "" { + writeStats(start, stop) + } } diff --git a/options.go b/options.go index 975a6f1..23dcc86 100644 --- a/options.go +++ b/options.go @@ -19,12 +19,14 @@ type Options struct { Duration time.Duration Interval time.Duration LogFile string + Influxdb string + Hostname string Version bool Debug bool } func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -v\n") + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -I -v\n") os.Exit(0) } @@ -38,6 +40,8 @@ func init() { flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source") flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") + flag.StringVar(&opts.Influxdb, "I", "", "Influxdb uri") + flag.StringVar(&opts.Hostname, "H", "", "Hostname") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration") From 1ebc85d1c3df2e2eb7c2411df03a0d3f4979fd72 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 25 Mar 2016 10:15:19 +0100 Subject: [PATCH 16/29] aggiunto timeout per la connessione con influxdb --- influxdb.go | 51 ++++++++++++++++++++++++++++++++++++++++ lastlogin_consolidate.go | 2 +- 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 influxdb.go diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..e9daead --- /dev/null +++ b/influxdb.go @@ -0,0 +1,51 @@ +// influxdb +package main + +import ( + "fmt" + "time" + + influxdb "github.com/influxdata/influxdb/client/v2" +) + +func writeStats(start time.Time, stop time.Duration) { + if opts.Debug { + fmt.Printf("writing to influxdb server: %s", opts.Influxdb) + } + + c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ + Addr: opts.Influxdb, + Timeout: 2 * time.Second, + }) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + defer c.Close() + + bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{ + Database: "dovecot", + Precision: "s", + }) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + + tags := map[string]string{"server": opts.Hostname} + fields := map[string]interface{}{ + "LoginOK": countOK, + "LoginTOT": countTOT, + "stop": stop.Seconds(), + } + pt, err := influxdb.NewPoint("llday", tags, fields, start) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + + bp.AddPoint(pt) + + // Write the batch + c.Write(bp) +} diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 16d2648..226b50b 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.2.0" + _VERSION = "v1.2.1" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) From 34e86dd7fd6a861a331684118396c00468f81fcb Mon Sep 17 00:00:00 2001 From: Miki Date: Wed, 6 Apr 2016 12:05:40 +0200 Subject: [PATCH 17/29] aggiunti parametri per il monitoraggio --- aggregate.go | 22 +++++++++++++++++++--- influxdb.go | 20 ++++++++++++++++---- lastlogin_consolidate.go | 18 +++++++++++------- 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/aggregate.go b/aggregate.go index 9c015b8..64c31bb 100644 --- a/aggregate.go +++ b/aggregate.go @@ -3,9 +3,10 @@ package main import ( "fmt" - "gopkg.in/mgo.v2/bson" "log" "time" + + "gopkg.in/mgo.v2/bson" ) func aggregate(ys time.Time, ye time.Time) { @@ -14,6 +15,7 @@ func aggregate(ys time.Time, ye time.Time) { p := dbs.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) + idb.Pipe = time.Since(qStart) if opts.Debug { fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) } @@ -25,7 +27,7 @@ func aggregate(ys time.Time, ye time.Time) { it := p.Iter() for it.Next(&ar) { - countTOT += 1 + idb.CountTOT += 1 ll := LastLoginDay{} ll.User = ar.User ll.Date = ys @@ -39,6 +41,13 @@ func aggregate(ys time.Time, ye time.Time) { nq := dbs.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) iter := nq.Iter() + + if idb.Find == 0 { + idb.Find = time.Since(qStart) + } else { + idb.Find = (idb.Find + time.Since(qStart)) / 2 + } + result := LastLogin{} ips := []IPs{} lastip := IPs{} @@ -64,11 +73,18 @@ func aggregate(ys time.Time, ye time.Time) { } ll.IPs = ips //fmt.Printf("Upsert %+v\n\r", ll) + iStart := time.Now() _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) } - countOK += 1 + if idb.Insert == 0 { + idb.Insert = time.Since(iStart) + } else { + idb.Insert = (idb.Insert + time.Since(iStart)) / 2 + } + + idb.CountOK += 1 } } diff --git a/influxdb.go b/influxdb.go index e9daead..8590fdd 100644 --- a/influxdb.go +++ b/influxdb.go @@ -8,7 +8,16 @@ import ( influxdb "github.com/influxdata/influxdb/client/v2" ) -func writeStats(start time.Time, stop time.Duration) { +type InfluxdbOutput struct { + CountOK int + CountTOT int + Stop time.Duration + Pipe time.Duration + Find time.Duration + Insert time.Duration +} + +func writeStats(start time.Time) { if opts.Debug { fmt.Printf("writing to influxdb server: %s", opts.Influxdb) } @@ -34,9 +43,12 @@ func writeStats(start time.Time, stop time.Duration) { tags := map[string]string{"server": opts.Hostname} fields := map[string]interface{}{ - "LoginOK": countOK, - "LoginTOT": countTOT, - "stop": stop.Seconds(), + "LoginOK": idb.CountOK, + "LoginTOT": idb.CountTOT, + "stop": idb.Stop.Seconds(), + "pipe": idb.Pipe.Seconds(), + "find": idb.Find.Seconds(), + "insert": idb.Insert.Seconds(), } pt, err := influxdb.NewPoint("llday", tags, fields, start) if err != nil { diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 226b50b..75f4402 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.2.1" + _VERSION = "v1.2.2" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -30,8 +30,12 @@ var ( dbs = Dbs{} - countTOT = 0 - countOK = 0 + idb = InfluxdbOutput{ + CountTOT: 0, + CountOK: 0, + Insert: 0, + Find: 0, + } ) func main() { @@ -108,12 +112,12 @@ func main() { log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) } - stop := time.Since(start) + idb.Stop = time.Since(start) - fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, stop) - log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, stop) + fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) + log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) if opts.Influxdb != "" { - writeStats(start, stop) + writeStats(start) } } From 3078fe215da4c4d65c3c148d648685b6e7fd1783 Mon Sep 17 00:00:00 2001 From: Miki Date: Thu, 7 Apr 2016 17:44:53 +0200 Subject: [PATCH 18/29] return nanoseconds --- influxdb.go | 6 +++--- lastlogin_consolidate.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/influxdb.go b/influxdb.go index 8590fdd..b56eb9f 100644 --- a/influxdb.go +++ b/influxdb.go @@ -46,9 +46,9 @@ func writeStats(start time.Time) { "LoginOK": idb.CountOK, "LoginTOT": idb.CountTOT, "stop": idb.Stop.Seconds(), - "pipe": idb.Pipe.Seconds(), - "find": idb.Find.Seconds(), - "insert": idb.Insert.Seconds(), + "pipe": idb.Pipe.Nanoseconds(), + "find": idb.Find.Nanoseconds(), + "insert": idb.Insert.Nanoseconds(), } pt, err := influxdb.NewPoint("llday", tags, fields, start) if err != nil { diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 75f4402..4da5df9 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.2.2" + _VERSION = "v1.2.3" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) From c0d5972a4487da3f42b4772cac535893b21b0930 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 15 Apr 2016 15:28:52 +0200 Subject: [PATCH 19/29] primo test --- aggregate.go | 141 +++++++++++++++++++++++++++------------ dbs.go | 12 +++- lastlogin_consolidate.go | 31 +++++---- options.go | 4 +- 4 files changed, 129 insertions(+), 59 deletions(-) diff --git a/aggregate.go b/aggregate.go index 64c31bb..033fa39 100644 --- a/aggregate.go +++ b/aggregate.go @@ -4,62 +4,60 @@ package main import ( "fmt" "log" + // "sort" "time" "gopkg.in/mgo.v2/bson" ) -func aggregate(ys time.Time, ye time.Time) { +func consolidate(ys time.Time, ye time.Time) { - qStart := time.Now() + limit := 10000 - p := dbs.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) + pre := dbs.mdbDst.DB("dovecot").C("pre_lastlogin_day") + + query := pre.Find(bson.M{}) - idb.Pipe = time.Since(qStart) if opts.Debug { - fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) + tot, _ := query.Count() + fmt.Printf("Users: %d\n", tot) } - fmt.Printf("Date: %s - %s\n", ys, ye) - log.Printf("Date: %s - %s\n", ys, ye) + query.Batch(limit) + iter := query.Iter() - ar := Users{} - it := p.Iter() + if opts.Bulk { + dbs.bulk = dbs.lc.Bulk() + dbs.bulk.Unordered() + } - for it.Next(&ar) { + result := Users{} + + for iter.Next(&result) { idb.CountTOT += 1 ll := LastLoginDay{} - ll.User = ar.User + ll.User = result.User ll.Date = ys // DEBUG - if opts.Debug { - fmt.Printf("User: %s\n\r", ar.User) - } - qStart = time.Now() - nq := dbs.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") - //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) - iter := nq.Iter() + logins := result.Logins - if idb.Find == 0 { - idb.Find = time.Since(qStart) - } else { - idb.Find = (idb.Find + time.Since(qStart)) / 2 - } - - result := LastLogin{} ips := []IPs{} lastip := IPs{} - for iter.Next(&result) { - if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { - //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) + for l := range logins { + if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { + /* + if opts.Debug { + fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) + } + */ } else { - ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) - lastip.Date = result.Date - lastip.IP = result.IP + ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) + lastip.Date = logins[l].Date + lastip.IP = logins[l].IP } - switch result.Protocol { + switch logins[l].Protocol { case "pop3", "pop": ll.Protocols.Pop += 1 case "imap": @@ -68,23 +66,80 @@ func aggregate(ys time.Time, ye time.Time) { ll.Protocols.Web += 1 } } - if err := iter.Close(); err != nil { - log.Println("Iter: ", err) - } ll.IPs = ips - //fmt.Printf("Upsert %+v\n\r", ll) + + /* + if opts.Debug { + fmt.Printf("Insert %+v\n\r", ll) + } + */ + iStart := time.Now() - _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) - if err != nil { - log.Println("Insert error: ", err) - } - if idb.Insert == 0 { - idb.Insert = time.Since(iStart) + + if opts.Bulk { + //dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + dbs.bulk.Insert(ll) } else { - idb.Insert = (idb.Insert + time.Since(iStart)) / 2 + info, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + if err != nil { + log.Println("Insert error: ", err) + continue + } + fmt.Printf("Change: %+v\n", info) } + idb.Insert += time.Since(iStart) idb.CountOK += 1 } + if opts.Bulk { + res, err := dbs.bulk.Run() + if err != nil { + log.Println("Insert error: ", err) + } + fmt.Printf("Bulk res: %+v\n", res) + } + + if opts.Debug { + fmt.Printf("Insert: %d\n", idb.CountOK) + } + + pre.DropCollection() +} + +func aggregate(ys time.Time, ye time.Time) { + + qStart := time.Now() + + p := dbs.ll.Pipe([]bson.M{ + {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, + {"$group": bson.M{"_id": "$user"}}, + {"$project": bson.M{"user": "$_id"}}}) + + /* + p := dbs.ll.Pipe([]bson.M{ + {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, + // {"$sort": bson.M{"user": -1, "date": 1}}, + {"$group": bson.M{"_id": "$user", + "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}) + // {"$sort": bson.M{"_id": -1}}, + // {"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}, + // {"$out": "pre_lastlogin_day"}}) + */ + + idb.Pipe = idb.Pipe + time.Since(qStart) + if opts.Debug { + res := new(interface{}) + err := p.Explain(res) + fmt.Printf("Pipe: %+v\nErr: %+v\n", *res, err) + res = new(interface{}) + err = p.One(res) + fmt.Printf("user: %+v\nErr: %+v\n", *res, err) + fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) + } + + consolidate(ys, ye) + + fmt.Printf("Date: %s - %s\n", ys, ye) + log.Printf("Date: %s - %s\n", ys, ye) } diff --git a/dbs.go b/dbs.go index 6302a87..6d5239b 100644 --- a/dbs.go +++ b/dbs.go @@ -2,10 +2,11 @@ package main import ( - "gopkg.in/mgo.v2" "log" "os" "time" + + "gopkg.in/mgo.v2" ) type Dbs struct { @@ -13,6 +14,7 @@ type Dbs struct { mdbDst *mgo.Session ll *mgo.Collection lc *mgo.Collection + bulk *mgo.Bulk } type LastLogin struct { @@ -47,9 +49,16 @@ type Index struct { Date time.Time `json:"date"` } +type Users struct { + User string `json:"user"` + Logins []IPs `json:"ips"` +} + +/* type Users struct { User string `json:"user"` } +*/ func connectMongo() { @@ -77,5 +86,4 @@ func connectMongo() { } dbs.lc = dbs.mdbDst.DB("dovecot").C("lastlogin_day") } - } diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 4da5df9..cb3b481 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.2.3" + _VERSION = "v1.3.0" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -70,7 +70,7 @@ func main() { connectMongo() defer dbs.mdbSrc.Close() - defer dbs.mdbDst.Clone() + defer dbs.mdbDst.Close() y, err := time.Parse(_tformat, opts.StartDate) if err != nil { @@ -84,18 +84,23 @@ func main() { var ys []time.Time var ye []time.Time - if opts.Duration <= (time.Hour * 24) { - ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[0].Add(opts.Duration)) - } else { - for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ { - // fmt.Println(i) - yt := y.Add(time.Hour * time.Duration(24*i)) - // fmt.Println(yt) - ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[i].Add(_24h)) + // if opts.Duration <= (time.Hour * 24) { + ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)) + ye = append(ye, ys[0].Add(opts.Duration)) + + /* + } else { + for i := 0; i <= int(opts.Duration/(time.Hour*24)); i++ { + yt := y.Add(time.Hour * time.Duration(24*i)) + if opts.Debug { + fmt.Println(i) + fmt.Println(yt) + } + ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) + ye = append(ye, ys[i].Add(_24h)) + } } - } + */ // DEBUG if opts.Debug { diff --git a/options.go b/options.go index 23dcc86..0dddbaf 100644 --- a/options.go +++ b/options.go @@ -23,10 +23,11 @@ type Options struct { Hostname string Version bool Debug bool + Bulk bool } func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -I -v\n") + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -i -I -v\n") os.Exit(0) } @@ -48,4 +49,5 @@ func init() { flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") flag.BoolVar(&opts.Version, "v", false, "Version") flag.BoolVar(&opts.Debug, "debug", false, "Debug") + flag.BoolVar(&opts.Bulk, "bulk", false, "Bulk") } From 0eca19256c2f1e6cafc6fee09b5f69171e2c5732 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 15 Apr 2016 17:31:00 +0200 Subject: [PATCH 20/29] skip+limit test --- aggregate.go | 94 +++++++++++++++++----------------------- lastlogin_consolidate.go | 1 + options.go | 1 + 3 files changed, 42 insertions(+), 54 deletions(-) diff --git a/aggregate.go b/aggregate.go index 033fa39..e5ce833 100644 --- a/aggregate.go +++ b/aggregate.go @@ -10,38 +10,30 @@ import ( "gopkg.in/mgo.v2/bson" ) -func consolidate(ys time.Time, ye time.Time) { - - limit := 10000 - - pre := dbs.mdbDst.DB("dovecot").C("pre_lastlogin_day") - - query := pre.Find(bson.M{}) - - if opts.Debug { - tot, _ := query.Count() - fmt.Printf("Users: %d\n", tot) +func bulkWrite() { + res, err := dbs.bulk.Run() + if err != nil { + log.Println("Insert error: ", err) } + fmt.Printf("Bulk res: %+v\n", res) +} - query.Batch(limit) - iter := query.Iter() +func consolidate(users []Users, ys time.Time, ye time.Time) { if opts.Bulk { dbs.bulk = dbs.lc.Bulk() dbs.bulk.Unordered() } - result := Users{} - - for iter.Next(&result) { + for _, user := range users { idb.CountTOT += 1 ll := LastLoginDay{} - ll.User = result.User + ll.User = user.User ll.Date = ys // DEBUG - logins := result.Logins + logins := user.Logins ips := []IPs{} lastip := IPs{} @@ -68,12 +60,6 @@ func consolidate(ys time.Time, ye time.Time) { } ll.IPs = ips - /* - if opts.Debug { - fmt.Printf("Insert %+v\n\r", ll) - } - */ - iStart := time.Now() if opts.Bulk { @@ -93,53 +79,53 @@ func consolidate(ys time.Time, ye time.Time) { } if opts.Bulk { - res, err := dbs.bulk.Run() - if err != nil { - log.Println("Insert error: ", err) - } - fmt.Printf("Bulk res: %+v\n", res) + bulkWrite() } if opts.Debug { - fmt.Printf("Insert: %d\n", idb.CountOK) + fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert) } - pre.DropCollection() + // pre.DropCollection() } func aggregate(ys time.Time, ye time.Time) { - qStart := time.Now() + skip := 0 + limit := 100000 - p := dbs.ll.Pipe([]bson.M{ - {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, - {"$group": bson.M{"_id": "$user"}}, - {"$project": bson.M{"user": "$_id"}}}) + for { + + qStart := time.Now() - /* p := dbs.ll.Pipe([]bson.M{ {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, - // {"$sort": bson.M{"user": -1, "date": 1}}, + {"$sort": bson.M{"user": -1, "date": 1}}, {"$group": bson.M{"_id": "$user", - "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}) - // {"$sort": bson.M{"_id": -1}}, - // {"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}, - // {"$out": "pre_lastlogin_day"}}) - */ + "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}, + {"$sort": bson.M{"_id": -1}}, + {"$skip": skip}, + {"$limit": limit}, + {"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}}).AllowDiskUse() - idb.Pipe = idb.Pipe + time.Since(qStart) - if opts.Debug { - res := new(interface{}) - err := p.Explain(res) - fmt.Printf("Pipe: %+v\nErr: %+v\n", *res, err) - res = new(interface{}) - err = p.One(res) - fmt.Printf("user: %+v\nErr: %+v\n", *res, err) - fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) + result := make([]Users, limit) + p.All(&result) + if len(result) <= 0 { + break + } + + if opts.Debug { + last := len(result) - 1 + fmt.Printf("Res %d: %d - first: %+v - last: %+v - time: %+v\n", skip, len(result), result[0].User, result[last].User, time.Since(qStart)) + } + + idb.Pipe = idb.Pipe + time.Since(qStart) + + consolidate(result, ys, ye) + + skip += limit } - consolidate(ys, ye) - fmt.Printf("Date: %s - %s\n", ys, ye) log.Printf("Date: %s - %s\n", ys, ye) } diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index cb3b481..379992c 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -26,6 +26,7 @@ var ( StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat), Duration: _24h, Interval: _15m, + Batch: 10000, } dbs = Dbs{} diff --git a/options.go b/options.go index 0dddbaf..e696e42 100644 --- a/options.go +++ b/options.go @@ -24,6 +24,7 @@ type Options struct { Version bool Debug bool Bulk bool + Batch int } func usage() { From e3d7b1dae5cdd6763c1f6223e99446ce5d90de8d Mon Sep 17 00:00:00 2001 From: Miki Date: Thu, 21 Apr 2016 12:36:06 +0200 Subject: [PATCH 21/29] implementata la scrittura Bulk e lo split dell'aggregato per username --- aggregate.go | 135 ++++++++++++++++++++++++--------------------------- dbs.go | 38 +++++++-------- 2 files changed, 82 insertions(+), 91 deletions(-) diff --git a/aggregate.go b/aggregate.go index e5ce833..c74c977 100644 --- a/aggregate.go +++ b/aggregate.go @@ -11,119 +11,110 @@ import ( ) func bulkWrite() { - res, err := dbs.bulk.Run() + _, err := dbs.bulk.Run() if err != nil { log.Println("Insert error: ", err) } - fmt.Printf("Bulk res: %+v\n", res) + //fmt.Printf("Bulk res: %+v\n", res) } -func consolidate(users []Users, ys time.Time, ye time.Time) { +func consolidate(user Users, ys time.Time, ye time.Time) { if opts.Bulk { dbs.bulk = dbs.lc.Bulk() dbs.bulk.Unordered() } - for _, user := range users { - idb.CountTOT += 1 - ll := LastLoginDay{} - ll.User = user.User - ll.Date = ys + idb.CountTOT += 1 + ll := LastLoginDay{} + ll.User = user.User + ll.Date = ys - // DEBUG + // DEBUG - logins := user.Logins + logins := user.Logins - ips := []IPs{} - lastip := IPs{} - for l := range logins { - if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { - /* - if opts.Debug { - fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) - } - */ - } else { - ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) - lastip.Date = logins[l].Date - lastip.IP = logins[l].IP - } - switch logins[l].Protocol { - case "pop3", "pop": - ll.Protocols.Pop += 1 - case "imap": - ll.Protocols.Imap += 1 - case "web": - ll.Protocols.Web += 1 - } - } - ll.IPs = ips - - iStart := time.Now() - - if opts.Bulk { - //dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll) - dbs.bulk.Insert(ll) + ips := []IPs{} + lastip := IPs{} + for l := range logins { + if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { + /* + if opts.Debug { + fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) + } + */ } else { - info, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) - if err != nil { - log.Println("Insert error: ", err) - continue - } - fmt.Printf("Change: %+v\n", info) + ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) + lastip.Date = logins[l].Date + lastip.IP = logins[l].IP + } + switch logins[l].Protocol { + case "pop3", "pop": + ll.Protocols.Pop += 1 + case "imap": + ll.Protocols.Imap += 1 + case "web": + ll.Protocols.Web += 1 } - idb.Insert += time.Since(iStart) - - idb.CountOK += 1 } + ll.IPs = ips + + iStart := time.Now() + + if opts.Bulk { + //dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + dbs.bulk.Insert(ll) + } else { + _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + if err != nil { + log.Println("Insert error: ", err) + } + // fmt.Printf("Change: %+v\n", info) + } + idb.Insert += time.Since(iStart) + + idb.CountOK += 1 if opts.Bulk { bulkWrite() } - if opts.Debug { - fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert) - } - - // pre.DropCollection() + // if opts.Debug { + // fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert) + // } } func aggregate(ys time.Time, ye time.Time) { - skip := 0 - limit := 100000 + groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"} - for { + for g := range groups { qStart := time.Now() p := dbs.ll.Pipe([]bson.M{ - {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, + {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, + "user": bson.RegEx{"^" + groups[g], ""}}}, {"$sort": bson.M{"user": -1, "date": 1}}, {"$group": bson.M{"_id": "$user", - "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}, - {"$sort": bson.M{"_id": -1}}, - {"$skip": skip}, - {"$limit": limit}, - {"$project": bson.M{"user": "$_id", "_id": 0, "logins": 1}}}).AllowDiskUse() + "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}).AllowDiskUse() - result := make([]Users, limit) - p.All(&result) - if len(result) <= 0 { - break + iter := p.Iter() + defer iter.Close() + + var result Users + for iter.Next(&result) { + consolidate(result, ys, ye) } if opts.Debug { - last := len(result) - 1 - fmt.Printf("Res %d: %d - first: %+v - last: %+v - time: %+v\n", skip, len(result), result[0].User, result[last].User, time.Since(qStart)) + fmt.Printf("Group %v: %+v\n", groups[g], time.Since(qStart)) } + // p.All(&result) + idb.Pipe = idb.Pipe + time.Since(qStart) - consolidate(result, ys, ye) - - skip += limit } fmt.Printf("Date: %s - %s\n", ys, ye) diff --git a/dbs.go b/dbs.go index 6d5239b..64864d4 100644 --- a/dbs.go +++ b/dbs.go @@ -18,40 +18,40 @@ type Dbs struct { } type LastLogin struct { - User string `json: "user"` - Protocol string `json: "protocol"` - IP string `json: "ip"` - Date time.Time `json: "date"` - ID string `json: "_id"` + User string `json: "user" bson:"user"` + Protocol string `json: "protocol" bson:"protocol"` + IP string `json: "ip" bson:"ip"` + Date time.Time `json: "date" bson:"date"` + ID string `json: "_id" bson:"_id"` } type LastLoginDay struct { - User string `json:"user"` - Date time.Time `json:"date"` - Protocols Protocols `json:"protocols"` - IPs []IPs `json:"ips"` + User string `json:"user" bson:"user"` + Date time.Time `json:"date" bson:"date"` + Protocols Protocols `json:"protocols" bson:"protocols"` + IPs []IPs `json:"ips" bson:"ips"` } type IPs struct { - IP string `json:"ip"` - Date time.Time `json:"date"` - Protocol string `json:"protocol"` + IP string `json:"ip" bson:"ip"` + Date time.Time `json:"date" bson:"date"` + Protocol string `json:"protocol" bson:"protocol"` } type Protocols struct { - Pop int `json:"pop"` - Imap int `json:"imap"` - Web int `json:"web"` + Pop int `json:"pop" bson:"pop"` + Imap int `json:"imap" bson:"imap"` + Web int `json:"web" bson:"web"` } type Index struct { - User string `json:"user"` - Date time.Time `json:"date"` + User string `json:"user" bson:"user"` + Date time.Time `json:"date" bson:"date"` } type Users struct { - User string `json:"user"` - Logins []IPs `json:"ips"` + User string `json:"_id" bson:"_id"` + Logins []IPs `json:"logins" bson:"logins"` } /* From 2e53d3fb4f1c6c05c4dd2ce417e5481f36a3f7c5 Mon Sep 17 00:00:00 2001 From: Miki Date: Thu, 21 Apr 2016 16:11:27 +0200 Subject: [PATCH 22/29] spedisce a influxdb la data --- influxdb.go | 6 ++++-- lastlogin_consolidate.go | 30 ++++++++++++++++++------------ 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/influxdb.go b/influxdb.go index b56eb9f..1691396 100644 --- a/influxdb.go +++ b/influxdb.go @@ -11,13 +11,14 @@ import ( type InfluxdbOutput struct { CountOK int CountTOT int + Start time.Time Stop time.Duration Pipe time.Duration Find time.Duration Insert time.Duration } -func writeStats(start time.Time) { +func writeStats(start time.Time, ys time.Time) { if opts.Debug { fmt.Printf("writing to influxdb server: %s", opts.Influxdb) } @@ -41,10 +42,11 @@ func writeStats(start time.Time) { return } - tags := map[string]string{"server": opts.Hostname} + tags := map[string]string{"server": opts.Hostname, "date": ys.String()} fields := map[string]interface{}{ "LoginOK": idb.CountOK, "LoginTOT": idb.CountTOT, + "start": ys, "stop": idb.Stop.Seconds(), "pipe": idb.Pipe.Nanoseconds(), "find": idb.Find.Nanoseconds(), diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 379992c..ff2adaf 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.3.0" + _VERSION = "v1.3.1" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -82,12 +82,14 @@ func main() { // DEBUG //fmt.Printf("Start %+v\n\r", y) - var ys []time.Time - var ye []time.Time + // var ys []time.Time + // var ye []time.Time + var ys time.Time + var ye time.Time // if opts.Duration <= (time.Hour * 24) { - ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[0].Add(opts.Duration)) + ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC) + ye = ys.Add(opts.Duration) /* } else { @@ -108,15 +110,19 @@ func main() { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } - for i := range ys { + // for i := range ys { - pStart := time.Now() + pStart := time.Now() - aggregate(ys[i], ye[i]) + // aggregate(ys[i], ye[i]) + aggregate(ys, ye) - fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) - log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) - } + fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart)) + log.Printf("Stop %s: %s\n", ys, time.Since(pStart)) + + // fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) + // log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) + // } idb.Stop = time.Since(start) @@ -124,6 +130,6 @@ func main() { log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) if opts.Influxdb != "" { - writeStats(start) + writeStats(start, ys) } } From 1b1736a2e44ff94f3d76a584161a42b044c9c6d1 Mon Sep 17 00:00:00 2001 From: Miki Date: Thu, 21 Apr 2016 16:21:13 +0200 Subject: [PATCH 23/29] aggiunta gestione del PID --- lastlogin_consolidate.go | 9 ++++++++- options.go | 26 ++++++++++++++------------ 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index ff2adaf..1e70e7a 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.3.1" + _VERSION = "v1.3.2" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -29,6 +29,8 @@ var ( Batch: 10000, } + loop []bool + dbs = Dbs{} idb = InfluxdbOutput{ @@ -57,6 +59,8 @@ func main() { } } + setTerm() + fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { fmt.Println("Log file error: ", err.Error()) @@ -65,6 +69,9 @@ func main() { log.SetOutput(fs) + pid.Write(true) + defer pid.Remove() + start := time.Now() fmt.Printf("Start: %+v\n", opts) log.Printf("Start: %+v\n", opts) diff --git a/options.go b/options.go index e696e42..296853c 100644 --- a/options.go +++ b/options.go @@ -13,18 +13,20 @@ import ( ) type Options struct { - MongoSrc string - MongoDst string - StartDate string - Duration time.Duration - Interval time.Duration - LogFile string - Influxdb string - Hostname string - Version bool - Debug bool - Bulk bool - Batch int + MongoSrc string + MongoDst string + StartDate string + Duration time.Duration + Interval time.Duration + LogFile string + Influxdb string + Hostname string + Version bool + Debug bool + Bulk bool + Batch int + Exe string + Concurrent int } func usage() { From 9c7006dcea63e04ddb98b870221238ed1337b326 Mon Sep 17 00:00:00 2001 From: Miki Date: Thu, 21 Apr 2016 17:37:27 +0200 Subject: [PATCH 24/29] manda solo la data ad influxdb --- influxdb.go | 2 +- lastlogin_consolidate.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/influxdb.go b/influxdb.go index 1691396..b881214 100644 --- a/influxdb.go +++ b/influxdb.go @@ -46,7 +46,7 @@ func writeStats(start time.Time, ys time.Time) { fields := map[string]interface{}{ "LoginOK": idb.CountOK, "LoginTOT": idb.CountTOT, - "start": ys, + "start": ys.Format(_tformat), "stop": idb.Stop.Seconds(), "pipe": idb.Pipe.Nanoseconds(), "find": idb.Find.Nanoseconds(), diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 1e70e7a..4fc5cf1 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -12,7 +12,7 @@ import ( ) const ( - _VERSION = "v1.3.2" + _VERSION = "v1.3.3" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) From 914177d440acb47458a532c79e5019e3396ae732 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 22 Apr 2016 12:02:03 +0200 Subject: [PATCH 25/29] objects structure --- aggregate.go | 86 +++++++++++++++++++++++++--------------- dbs.go | 6 --- influxdb.go | 52 +++++++++++++++--------- lastlogin_consolidate.go | 57 ++++++++------------------ options.go | 29 +++++++------- 5 files changed, 117 insertions(+), 113 deletions(-) diff --git a/aggregate.go b/aggregate.go index c74c977..96e856e 100644 --- a/aggregate.go +++ b/aggregate.go @@ -4,45 +4,67 @@ package main import ( "fmt" "log" - // "sort" "time" "gopkg.in/mgo.v2/bson" ) -func bulkWrite() { +type Aggregate struct { + start time.Time + stop time.Time + users Users + cUsers int + cLogins int +} + +func Consolidate(ys time.Time, ye time.Time) *Aggregate { + a := Aggregate{ + start: ys, + stop: ye, + cLogins: 0, + cUsers: 0, + } + return &a +} + +func (a Aggregate) Verify() int { + tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count() + if err != nil { + fmt.Printf("Verify error: %+v\n", err) + } + + return tot +} + +func (a Aggregate) bulkWrite() { _, err := dbs.bulk.Run() if err != nil { log.Println("Insert error: ", err) } - //fmt.Printf("Bulk res: %+v\n", res) } -func consolidate(user Users, ys time.Time, ye time.Time) { +func (a Aggregate) consolidate() { if opts.Bulk { dbs.bulk = dbs.lc.Bulk() dbs.bulk.Unordered() } - idb.CountTOT += 1 + idb.TotUsers += 1 ll := LastLoginDay{} - ll.User = user.User - ll.Date = ys + ll.User = a.users.User + ll.Date = a.start - // DEBUG - - logins := user.Logins + logins := a.users.Logins ips := []IPs{} lastip := IPs{} for l := range logins { if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { - /* - if opts.Debug { - fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) - } - */ + // a.discard += 1 + // if opts.Debug { + // fmt.Printf("\rDiscarded: %06d", a.discard) + // } } else { ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) lastip.Date = logins[l].Date @@ -62,29 +84,22 @@ func consolidate(user Users, ys time.Time, ye time.Time) { iStart := time.Now() if opts.Bulk { - //dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll) dbs.bulk.Insert(ll) } else { _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) } - // fmt.Printf("Change: %+v\n", info) } idb.Insert += time.Since(iStart) - idb.CountOK += 1 - if opts.Bulk { - bulkWrite() + a.bulkWrite() } - // if opts.Debug { - // fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert) - // } } -func aggregate(ys time.Time, ye time.Time) { +func (a Aggregate) Start() { groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"} @@ -93,7 +108,7 @@ func aggregate(ys time.Time, ye time.Time) { qStart := time.Now() p := dbs.ll.Pipe([]bson.M{ - {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, + {"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop}, "user": bson.RegEx{"^" + groups[g], ""}}}, {"$sort": bson.M{"user": -1, "date": 1}}, {"$group": bson.M{"_id": "$user", @@ -102,21 +117,26 @@ func aggregate(ys time.Time, ye time.Time) { iter := p.Iter() defer iter.Close() - var result Users - for iter.Next(&result) { - consolidate(result, ys, ye) + a.cUsers = 0 + a.cLogins = 0 + a.users = *new(Users) + fmt.Printf("Logins: %d\n", a.cLogins) + for iter.Next(&a.users) { + a.cUsers += 1 + a.cLogins += len(a.users.Logins) + a.consolidate() } + idb.TotLogins += a.cLogins + if opts.Debug { - fmt.Printf("Group %v: %+v\n", groups[g], time.Since(qStart)) + fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart)) } - // p.All(&result) - idb.Pipe = idb.Pipe + time.Since(qStart) } - fmt.Printf("Date: %s - %s\n", ys, ye) - log.Printf("Date: %s - %s\n", ys, ye) + fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) + log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) } diff --git a/dbs.go b/dbs.go index 64864d4..ba859c0 100644 --- a/dbs.go +++ b/dbs.go @@ -54,12 +54,6 @@ type Users struct { Logins []IPs `json:"logins" bson:"logins"` } -/* -type Users struct { - User string `json:"user"` -} -*/ - func connectMongo() { if opts.MongoSrc == "" { diff --git a/influxdb.go b/influxdb.go index b881214..86a8070 100644 --- a/influxdb.go +++ b/influxdb.go @@ -9,16 +9,32 @@ import ( ) type InfluxdbOutput struct { - CountOK int - CountTOT int - Start time.Time - Stop time.Duration - Pipe time.Duration - Find time.Duration - Insert time.Duration + InsUsers int + TotUsers int + TotLogins int + Now time.Time + Start time.Time + Stop time.Duration + Pipe time.Duration + Find time.Duration + Insert time.Duration } -func writeStats(start time.Time, ys time.Time) { +func Influxdb(start time.Time, ys time.Time) *InfluxdbOutput { + i := InfluxdbOutput{ + InsUsers: 0, + TotLogins: 0, + TotUsers: 0, + Insert: 0, + Find: 0, + Start: ys, + Now: start, + } + + return &i +} + +func (i InfluxdbOutput) writeStats() { if opts.Debug { fmt.Printf("writing to influxdb server: %s", opts.Influxdb) } @@ -42,17 +58,18 @@ func writeStats(start time.Time, ys time.Time) { return } - tags := map[string]string{"server": opts.Hostname, "date": ys.String()} + tags := map[string]string{"server": opts.Hostname, "date": i.Start.String()} fields := map[string]interface{}{ - "LoginOK": idb.CountOK, - "LoginTOT": idb.CountTOT, - "start": ys.Format(_tformat), - "stop": idb.Stop.Seconds(), - "pipe": idb.Pipe.Nanoseconds(), - "find": idb.Find.Nanoseconds(), - "insert": idb.Insert.Nanoseconds(), + "UsersOK": idb.InsUsers, + "UsersTOT": idb.TotUsers, + "LoginsTOT": idb.TotLogins, + "start": i.Start.Format(_tformat), + "stop": idb.Stop.Seconds(), + "pipe": idb.Pipe.Nanoseconds(), + "find": idb.Find.Nanoseconds(), + "insert": idb.Insert.Nanoseconds(), } - pt, err := influxdb.NewPoint("llday", tags, fields, start) + pt, err := influxdb.NewPoint("llday", tags, fields, i.Now) if err != nil { fmt.Printf("Error: %+v\n", err) return @@ -60,6 +77,5 @@ func writeStats(start time.Time, ys time.Time) { bp.AddPoint(pt) - // Write the batch c.Write(bp) } diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 4fc5cf1..4cdb6f8 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -4,15 +4,16 @@ package main import ( "flag" "fmt" - // "gopkg.in/mgo.v2" - // "gopkg.in/mgo.v2/bson" "log" "os" + "sync" "time" + + "github.com/mikif70/pidlib" ) const ( - _VERSION = "v1.3.3" + _VERSION = "v1.3.4" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -29,16 +30,11 @@ var ( Batch: 10000, } - loop []bool + wg sync.WaitGroup dbs = Dbs{} - idb = InfluxdbOutput{ - CountTOT: 0, - CountOK: 0, - Insert: 0, - Find: 0, - } + idb *InfluxdbOutput ) func main() { @@ -69,7 +65,8 @@ func main() { log.SetOutput(fs) - pid.Write(true) + pid := pidlib.New() + pid.Write() defer pid.Remove() start := time.Now() @@ -86,57 +83,35 @@ func main() { os.Exit(-1) } - // DEBUG - //fmt.Printf("Start %+v\n\r", y) - - // var ys []time.Time - // var ye []time.Time var ys time.Time var ye time.Time - // if opts.Duration <= (time.Hour * 24) { ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC) ye = ys.Add(opts.Duration) - /* - } else { - for i := 0; i <= int(opts.Duration/(time.Hour*24)); i++ { - yt := y.Add(time.Hour * time.Duration(24*i)) - if opts.Debug { - fmt.Println(i) - fmt.Println(yt) - } - ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[i].Add(_24h)) - } - } - */ + idb = Influxdb(start, ys) // DEBUG if opts.Debug { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } - // for i := range ys { - pStart := time.Now() - // aggregate(ys[i], ye[i]) - aggregate(ys, ye) + agg := Consolidate(ys, ye) + agg.Start() fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart)) log.Printf("Stop %s: %s\n", ys, time.Since(pStart)) - // fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) - // log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) - // } - idb.Stop = time.Since(start) - fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) - log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) + idb.InsUsers = agg.Verify() + + fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop) + log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop) if opts.Influxdb != "" { - writeStats(start, ys) + idb.writeStats() } } diff --git a/options.go b/options.go index 296853c..29a4c0f 100644 --- a/options.go +++ b/options.go @@ -13,24 +13,23 @@ import ( ) type Options struct { - MongoSrc string - MongoDst string - StartDate string - Duration time.Duration - Interval time.Duration - LogFile string - Influxdb string - Hostname string - Version bool - Debug bool - Bulk bool - Batch int - Exe string - Concurrent int + MongoSrc string + MongoDst string + StartDate string + Duration time.Duration + Interval time.Duration + LogFile string + Influxdb string + Hostname string + Version bool + Debug bool + Bulk bool + Batch int + Exe string } func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -i -I -v\n") + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -I -bulk -v\n") os.Exit(0) } From a8ae98e427cc18f98aeea31369469d4cb6ac46d1 Mon Sep 17 00:00:00 2001 From: Miki Date: Fri, 22 Apr 2016 12:18:33 +0200 Subject: [PATCH 26/29] fixed bugs --- aggregate.go | 1 - lastlogin_consolidate.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/aggregate.go b/aggregate.go index 96e856e..9e7f322 100644 --- a/aggregate.go +++ b/aggregate.go @@ -120,7 +120,6 @@ func (a Aggregate) Start() { a.cUsers = 0 a.cLogins = 0 a.users = *new(Users) - fmt.Printf("Logins: %d\n", a.cLogins) for iter.Next(&a.users) { a.cUsers += 1 a.cLogins += len(a.users.Logins) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 4cdb6f8..6382817 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -13,7 +13,7 @@ import ( ) const ( - _VERSION = "v1.3.4" + _VERSION = "v1.3.5" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) From 8dca30619b643b3857cc36d67b86335e84962bf3 Mon Sep 17 00:00:00 2001 From: Miki Date: Mon, 2 May 2016 16:38:45 +0200 Subject: [PATCH 27/29] aggiunti i ms --- lastlogin_consolidate.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 6382817..97b7e81 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -13,9 +13,9 @@ import ( ) const ( - _VERSION = "v1.3.5" + _VERSION = "v1.3.6" _tformat = "2006-01-02" - _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + (time.Millisecond * 999) _10m = (time.Minute * 10) _15m = (time.Minute * 15) ) From 2778e5506bd7bf53eec5fcf7fde3c423ddc11c42 Mon Sep 17 00:00:00 2001 From: Miki Date: Wed, 4 May 2016 10:46:24 +0200 Subject: [PATCH 28/29] aggiunta l'opzione per specificare il database di destinazione --- dbs.go | 2 +- lastlogin_consolidate.go | 3 ++- options.go | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dbs.go b/dbs.go index ba859c0..6b6b65b 100644 --- a/dbs.go +++ b/dbs.go @@ -78,6 +78,6 @@ func connectMongo() { log.Println("Mongodb connect Error: ", err.Error()) os.Exit(-3) } - dbs.lc = dbs.mdbDst.DB("dovecot").C("lastlogin_day") + dbs.lc = dbs.mdbDst.DB(opts.DstDB).C("lastlogin_day") } } diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 97b7e81..6605749 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -13,7 +13,7 @@ import ( ) const ( - _VERSION = "v1.3.6" + _VERSION = "v1.3.7" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + (time.Millisecond * 999) _10m = (time.Minute * 10) @@ -28,6 +28,7 @@ var ( Duration: _24h, Interval: _15m, Batch: 10000, + DstDB: "dovecot", } wg sync.WaitGroup diff --git a/options.go b/options.go index 29a4c0f..62471e8 100644 --- a/options.go +++ b/options.go @@ -15,6 +15,7 @@ import ( type Options struct { MongoSrc string MongoDst string + DstDB string StartDate string Duration time.Duration Interval time.Duration @@ -43,11 +44,12 @@ func init() { flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source") flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") + flag.StringVar(&opts.MongoDst, "dd", opts.DstDB, "Database Destination") flag.StringVar(&opts.Influxdb, "I", "", "Influxdb uri") flag.StringVar(&opts.Hostname, "H", "", "Hostname") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") - flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration") + flag.DurationVar(&opts.Duration, "du", opts.Duration, "Duration") flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") flag.BoolVar(&opts.Version, "v", false, "Version") flag.BoolVar(&opts.Debug, "debug", false, "Debug") From b97098e78bc8bf6fc50b4ebc993d500a8a3ec87a Mon Sep 17 00:00:00 2001 From: Miki Date: Mon, 9 May 2016 12:10:27 +0200 Subject: [PATCH 29/29] usa la collection: lastlogin.lastlogin_yymm --- dbs.go | 4 ++-- lastlogin_consolidate.go | 12 +++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/dbs.go b/dbs.go index 6b6b65b..5330be7 100644 --- a/dbs.go +++ b/dbs.go @@ -54,7 +54,7 @@ type Users struct { Logins []IPs `json:"logins" bson:"logins"` } -func connectMongo() { +func connectMongo(data string) { if opts.MongoSrc == "" { log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc) @@ -67,7 +67,7 @@ func connectMongo() { os.Exit(-3) } dbs.mdbSrc.SetSocketTimeout(0) - dbs.ll = dbs.mdbSrc.DB("dovecot").C("lastlogin") + dbs.ll = dbs.mdbSrc.DB("lastlogin").C("lastlogin_" + data) if opts.MongoDst == "" { dbs.mdbDst = dbs.mdbSrc diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 6605749..6a757cf 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -13,7 +13,7 @@ import ( ) const ( - _VERSION = "v1.3.7" + _VERSION = "v1.4.0" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + (time.Millisecond * 999) _10m = (time.Minute * 10) @@ -74,24 +74,22 @@ func main() { fmt.Printf("Start: %+v\n", opts) log.Printf("Start: %+v\n", opts) - connectMongo() - defer dbs.mdbSrc.Close() - defer dbs.mdbDst.Close() - y, err := time.Parse(_tformat, opts.StartDate) if err != nil { log.Println("Date Error: ", err) os.Exit(-1) } - var ys time.Time var ye time.Time ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC) ye = ys.Add(opts.Duration) - idb = Influxdb(start, ys) + connectMongo(ys.Format("0601")) + defer dbs.mdbSrc.Close() + defer dbs.mdbDst.Close() + // DEBUG if opts.Debug { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye)