diff --git a/aggregate.go b/aggregate.go index c74c977..96e856e 100644 --- a/aggregate.go +++ b/aggregate.go @@ -4,45 +4,67 @@ package main import ( "fmt" "log" - // "sort" "time" "gopkg.in/mgo.v2/bson" ) -func bulkWrite() { +type Aggregate struct { + start time.Time + stop time.Time + users Users + cUsers int + cLogins int +} + +func Consolidate(ys time.Time, ye time.Time) *Aggregate { + a := Aggregate{ + start: ys, + stop: ye, + cLogins: 0, + cUsers: 0, + } + return &a +} + +func (a Aggregate) Verify() int { + tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count() + if err != nil { + fmt.Printf("Verify error: %+v\n", err) + } + + return tot +} + +func (a Aggregate) bulkWrite() { _, err := dbs.bulk.Run() if err != nil { log.Println("Insert error: ", err) } - //fmt.Printf("Bulk res: %+v\n", res) } -func consolidate(user Users, ys time.Time, ye time.Time) { +func (a Aggregate) consolidate() { if opts.Bulk { dbs.bulk = dbs.lc.Bulk() dbs.bulk.Unordered() } - idb.CountTOT += 1 + idb.TotUsers += 1 ll := LastLoginDay{} - ll.User = user.User - ll.Date = ys + ll.User = a.users.User + ll.Date = a.start - // DEBUG - - logins := user.Logins + logins := a.users.Logins ips := []IPs{} lastip := IPs{} for l := range logins { if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { - /* - if opts.Debug { - fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) - } - */ + // a.discard += 1 + // if opts.Debug { + // fmt.Printf("\rDiscarded: %06d", a.discard) + // } } else { ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) lastip.Date = logins[l].Date @@ -62,29 +84,22 @@ func consolidate(user Users, ys time.Time, ye time.Time) { iStart := time.Now() if opts.Bulk { - //dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll) dbs.bulk.Insert(ll) } else { _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) } - // fmt.Printf("Change: %+v\n", info) } idb.Insert += time.Since(iStart) - idb.CountOK += 1 - if opts.Bulk { - bulkWrite() + a.bulkWrite() } - // if opts.Debug { - // fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert) - // } } -func aggregate(ys time.Time, ye time.Time) { +func (a Aggregate) Start() { groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"} @@ -93,7 +108,7 @@ func aggregate(ys time.Time, ye time.Time) { qStart := time.Now() p := dbs.ll.Pipe([]bson.M{ - {"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, + {"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop}, "user": bson.RegEx{"^" + groups[g], ""}}}, {"$sort": bson.M{"user": -1, "date": 1}}, {"$group": bson.M{"_id": "$user", @@ -102,21 +117,26 @@ func aggregate(ys time.Time, ye time.Time) { iter := p.Iter() defer iter.Close() - var result Users - for iter.Next(&result) { - consolidate(result, ys, ye) + a.cUsers = 0 + a.cLogins = 0 + a.users = *new(Users) + fmt.Printf("Logins: %d\n", a.cLogins) + for iter.Next(&a.users) { + a.cUsers += 1 + a.cLogins += len(a.users.Logins) + a.consolidate() } + idb.TotLogins += a.cLogins + if opts.Debug { - fmt.Printf("Group %v: %+v\n", groups[g], time.Since(qStart)) + fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart)) } - // p.All(&result) - idb.Pipe = idb.Pipe + time.Since(qStart) } - fmt.Printf("Date: %s - %s\n", ys, ye) - log.Printf("Date: %s - %s\n", ys, ye) + fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) + log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) } diff --git a/dbs.go b/dbs.go index 64864d4..ba859c0 100644 --- a/dbs.go +++ b/dbs.go @@ -54,12 +54,6 @@ type Users struct { Logins []IPs `json:"logins" bson:"logins"` } -/* -type Users struct { - User string `json:"user"` -} -*/ - func connectMongo() { if opts.MongoSrc == "" { diff --git a/influxdb.go b/influxdb.go index b881214..86a8070 100644 --- a/influxdb.go +++ b/influxdb.go @@ -9,16 +9,32 @@ import ( ) type InfluxdbOutput struct { - CountOK int - CountTOT int - Start time.Time - Stop time.Duration - Pipe time.Duration - Find time.Duration - Insert time.Duration + InsUsers int + TotUsers int + TotLogins int + Now time.Time + Start time.Time + Stop time.Duration + Pipe time.Duration + Find time.Duration + Insert time.Duration } -func writeStats(start time.Time, ys time.Time) { +func Influxdb(start time.Time, ys time.Time) *InfluxdbOutput { + i := InfluxdbOutput{ + InsUsers: 0, + TotLogins: 0, + TotUsers: 0, + Insert: 0, + Find: 0, + Start: ys, + Now: start, + } + + return &i +} + +func (i InfluxdbOutput) writeStats() { if opts.Debug { fmt.Printf("writing to influxdb server: %s", opts.Influxdb) } @@ -42,17 +58,18 @@ func writeStats(start time.Time, ys time.Time) { return } - tags := map[string]string{"server": opts.Hostname, "date": ys.String()} + tags := map[string]string{"server": opts.Hostname, "date": i.Start.String()} fields := map[string]interface{}{ - "LoginOK": idb.CountOK, - "LoginTOT": idb.CountTOT, - "start": ys.Format(_tformat), - "stop": idb.Stop.Seconds(), - "pipe": idb.Pipe.Nanoseconds(), - "find": idb.Find.Nanoseconds(), - "insert": idb.Insert.Nanoseconds(), + "UsersOK": idb.InsUsers, + "UsersTOT": idb.TotUsers, + "LoginsTOT": idb.TotLogins, + "start": i.Start.Format(_tformat), + "stop": idb.Stop.Seconds(), + "pipe": idb.Pipe.Nanoseconds(), + "find": idb.Find.Nanoseconds(), + "insert": idb.Insert.Nanoseconds(), } - pt, err := influxdb.NewPoint("llday", tags, fields, start) + pt, err := influxdb.NewPoint("llday", tags, fields, i.Now) if err != nil { fmt.Printf("Error: %+v\n", err) return @@ -60,6 +77,5 @@ func writeStats(start time.Time, ys time.Time) { bp.AddPoint(pt) - // Write the batch c.Write(bp) } diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 4fc5cf1..4cdb6f8 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -4,15 +4,16 @@ package main import ( "flag" "fmt" - // "gopkg.in/mgo.v2" - // "gopkg.in/mgo.v2/bson" "log" "os" + "sync" "time" + + "github.com/mikif70/pidlib" ) const ( - _VERSION = "v1.3.3" + _VERSION = "v1.3.4" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) @@ -29,16 +30,11 @@ var ( Batch: 10000, } - loop []bool + wg sync.WaitGroup dbs = Dbs{} - idb = InfluxdbOutput{ - CountTOT: 0, - CountOK: 0, - Insert: 0, - Find: 0, - } + idb *InfluxdbOutput ) func main() { @@ -69,7 +65,8 @@ func main() { log.SetOutput(fs) - pid.Write(true) + pid := pidlib.New() + pid.Write() defer pid.Remove() start := time.Now() @@ -86,57 +83,35 @@ func main() { os.Exit(-1) } - // DEBUG - //fmt.Printf("Start %+v\n\r", y) - - // var ys []time.Time - // var ye []time.Time var ys time.Time var ye time.Time - // if opts.Duration <= (time.Hour * 24) { ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC) ye = ys.Add(opts.Duration) - /* - } else { - for i := 0; i <= int(opts.Duration/(time.Hour*24)); i++ { - yt := y.Add(time.Hour * time.Duration(24*i)) - if opts.Debug { - fmt.Println(i) - fmt.Println(yt) - } - ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[i].Add(_24h)) - } - } - */ + idb = Influxdb(start, ys) // DEBUG if opts.Debug { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } - // for i := range ys { - pStart := time.Now() - // aggregate(ys[i], ye[i]) - aggregate(ys, ye) + agg := Consolidate(ys, ye) + agg.Start() fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart)) log.Printf("Stop %s: %s\n", ys, time.Since(pStart)) - // fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) - // log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) - // } - idb.Stop = time.Since(start) - fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) - log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) + idb.InsUsers = agg.Verify() + + fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop) + log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop) if opts.Influxdb != "" { - writeStats(start, ys) + idb.writeStats() } } diff --git a/options.go b/options.go index 296853c..29a4c0f 100644 --- a/options.go +++ b/options.go @@ -13,24 +13,23 @@ import ( ) type Options struct { - MongoSrc string - MongoDst string - StartDate string - Duration time.Duration - Interval time.Duration - LogFile string - Influxdb string - Hostname string - Version bool - Debug bool - Bulk bool - Batch int - Exe string - Concurrent int + MongoSrc string + MongoDst string + StartDate string + Duration time.Duration + Interval time.Duration + LogFile string + Influxdb string + Hostname string + Version bool + Debug bool + Bulk bool + Batch int + Exe string } func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -i -I -v\n") + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -I -bulk -v\n") os.Exit(0) }