diff --git a/aggregate.go b/aggregate.go new file mode 100644 index 0000000..c5c93c6 --- /dev/null +++ b/aggregate.go @@ -0,0 +1,74 @@ +// aggregate +package main + +import ( + "fmt" + "gopkg.in/mgo.v2/bson" + "log" + "time" +) + +func aggregate(ys time.Time, ye time.Time) { + + qStart := time.Now() + + p := opts.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}}) + + if opts.Debug { + fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart)) + } + + fmt.Printf("Date: %s - %s\n", ys, ye) + log.Printf("Date: %s - %s\n", ys, ye) + + ar := Users{} + it := p.Iter() + + for it.Next(&ar) { + countTOT += 1 + ll := LastLoginDay{} + ll.User = ar.User + ll.Date = ys + + // DEBUG + if opts.Debug { + fmt.Printf("User: %s\n\r", ar.User) + } + + qStart = time.Now() + nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date") + //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) + iter := nq.Iter() + result := LastLogin{} + ips := []IPs{} + lastip := IPs{} + for iter.Next(&result) { + if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { + //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) + } else { + ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) + lastip.Date = result.Date + lastip.IP = result.IP + } + switch result.Protocol { + case "pop3", "pop": + ll.Protocols.Pop += 1 + case "imap": + ll.Protocols.Imap += 1 + case "web": + ll.Protocols.Web += 1 + } + } + if err := iter.Close(); err != nil { + log.Println("Iter: ", err) + } + ll.IPs = ips + //fmt.Printf("Upsert %+v\n\r", ll) + _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + if err != nil { + log.Println("Insert error: ", err) + } + countOK += 1 + } + +} diff --git a/dbs.go b/dbs.go index e153bae..95fa063 100644 --- a/dbs.go +++ b/dbs.go @@ -40,6 +40,10 @@ type Index struct { Date time.Time `json:"date"` } +type Users struct { + User string `json:"user"` +} + func connectMongo() { if opts.MongoSrc == "" { diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 52c9db3..c2a9869 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -5,14 +5,14 @@ import ( "flag" "fmt" // "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" + // "gopkg.in/mgo.v2/bson" "log" "os" "time" ) const ( - _VERSION = "v1.0.10" + _VERSION = "v1.1.0" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -27,6 +27,8 @@ var ( Duration: _24h, Interval: _15m, } + countTOT = 0 + countOK = 0 ) func main() { @@ -85,79 +87,12 @@ func main() { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } - countTOT := 0 - countOK := 0 - for i := range ys { pStart := time.Now() - qStart := time.Now() + aggregate(ys[i], ye[i]) - q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user") - - if opts.Debug { - fmt.Printf("Find from date: %s\n\r", time.Since(qStart)) - } - - qStart = time.Now() - - ar := []string{} - q.Distinct("user", &ar) - - if opts.Debug { - fmt.Printf("Distinct user: %s\n\r", time.Since(qStart)) - } - - fmt.Printf("Date: %s - %s\n", ys[i], ye[i]) - log.Printf("Date: %s - %s\n", ys[i], ye[i]) - - for u := range ar { - countTOT += 1 - ll := LastLoginDay{} - ll.User = ar[u] - ll.Date = ys[i] - - // DEBUG - if opts.Debug { - fmt.Printf("User: %s\n\r", ar[u]) - } - - qStart = time.Now() - nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}, "user": ar[u]}).Sort("date") - //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) - iter := nq.Iter() - result := LastLogin{} - ips := []IPs{} - lastip := IPs{} - for iter.Next(&result) { - if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { - //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) - } else { - ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) - lastip.Date = result.Date - lastip.IP = result.IP - } - switch result.Protocol { - case "pop3", "pop": - ll.Protocols.Pop += 1 - case "imap": - ll.Protocols.Imap += 1 - case "web": - ll.Protocols.Web += 1 - } - } - if err := iter.Close(); err != nil { - log.Println("Iter: ", err) - } - ll.IPs = ips - //fmt.Printf("Upsert %+v\n\r", ll) - _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) - if err != nil { - log.Println("Insert error: ", err) - } - countOK += 1 - } fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) }