diff --git a/aggregate.go b/aggregate.go new file mode 100644 index 0000000..9e7f322 --- /dev/null +++ b/aggregate.go @@ -0,0 +1,141 @@ +// aggregate +package main + +import ( + "fmt" + "log" + "time" + + "gopkg.in/mgo.v2/bson" +) + +type Aggregate struct { + start time.Time + stop time.Time + users Users + cUsers int + cLogins int +} + +func Consolidate(ys time.Time, ye time.Time) *Aggregate { + a := Aggregate{ + start: ys, + stop: ye, + cLogins: 0, + cUsers: 0, + } + return &a +} + +func (a Aggregate) Verify() int { + tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count() + if err != nil { + fmt.Printf("Verify error: %+v\n", err) + } + + return tot +} + +func (a Aggregate) bulkWrite() { + _, err := dbs.bulk.Run() + if err != nil { + log.Println("Insert error: ", err) + } +} + +func (a Aggregate) consolidate() { + + if opts.Bulk { + dbs.bulk = dbs.lc.Bulk() + dbs.bulk.Unordered() + } + + idb.TotUsers += 1 + ll := LastLoginDay{} + ll.User = a.users.User + ll.Date = a.start + + logins := a.users.Logins + + ips := []IPs{} + lastip := IPs{} + for l := range logins { + if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { + // a.discard += 1 + // if opts.Debug { + // fmt.Printf("\rDiscarded: %06d", a.discard) + // } + } else { + ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) + lastip.Date = logins[l].Date + lastip.IP = logins[l].IP + } + switch logins[l].Protocol { + case "pop3", "pop": + ll.Protocols.Pop += 1 + case "imap": + ll.Protocols.Imap += 1 + case "web": + ll.Protocols.Web += 1 + } + } + ll.IPs = ips + + iStart := time.Now() + + if opts.Bulk { + dbs.bulk.Insert(ll) + } else { + _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) + if err != nil { + log.Println("Insert error: ", err) + } + } + idb.Insert += time.Since(iStart) + + if opts.Bulk { + a.bulkWrite() + } + +} + +func (a Aggregate) Start() { + + groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"} + + for g := range groups { + + qStart := time.Now() + + p := dbs.ll.Pipe([]bson.M{ + {"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop}, + "user": bson.RegEx{"^" + groups[g], ""}}}, + {"$sort": bson.M{"user": -1, "date": 1}}, + {"$group": bson.M{"_id": "$user", + "logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}).AllowDiskUse() + + iter := p.Iter() + defer iter.Close() + + a.cUsers = 0 + a.cLogins = 0 + a.users = *new(Users) + for iter.Next(&a.users) { + a.cUsers += 1 + a.cLogins += len(a.users.Logins) + a.consolidate() + } + + idb.TotLogins += a.cLogins + + if opts.Debug { + fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart)) + } + + idb.Pipe = idb.Pipe + time.Since(qStart) + + } + + fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) + log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins) +} diff --git a/dbs.go b/dbs.go new file mode 100644 index 0000000..5330be7 --- /dev/null +++ b/dbs.go @@ -0,0 +1,83 @@ +// dbs +package main + +import ( + "log" + "os" + "time" + + "gopkg.in/mgo.v2" +) + +type Dbs struct { + mdbSrc *mgo.Session + mdbDst *mgo.Session + ll *mgo.Collection + lc *mgo.Collection + bulk *mgo.Bulk +} + +type LastLogin struct { + User string `json: "user" bson:"user"` + Protocol string `json: "protocol" bson:"protocol"` + IP string `json: "ip" bson:"ip"` + Date time.Time `json: "date" bson:"date"` + ID string `json: "_id" bson:"_id"` +} + +type LastLoginDay struct { + User string `json:"user" bson:"user"` + Date time.Time `json:"date" bson:"date"` + Protocols Protocols `json:"protocols" bson:"protocols"` + IPs []IPs `json:"ips" bson:"ips"` +} + +type IPs struct { + IP string `json:"ip" bson:"ip"` + Date time.Time `json:"date" bson:"date"` + Protocol string `json:"protocol" bson:"protocol"` +} + +type Protocols struct { + Pop int `json:"pop" bson:"pop"` + Imap int `json:"imap" bson:"imap"` + Web int `json:"web" bson:"web"` +} + +type Index struct { + User string `json:"user" bson:"user"` + Date time.Time `json:"date" bson:"date"` +} + +type Users struct { + User string `json:"_id" bson:"_id"` + Logins []IPs `json:"logins" bson:"logins"` +} + +func connectMongo(data string) { + + if opts.MongoSrc == "" { + log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc) + } + var err error + //opts.mdbSrc, err = mgo.DialWithTimeout(opts.MongoSrc, time.Minute*5) + dbs.mdbSrc, err = mgo.Dial(opts.MongoSrc) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + dbs.mdbSrc.SetSocketTimeout(0) + dbs.ll = dbs.mdbSrc.DB("lastlogin").C("lastlogin_" + data) + + if opts.MongoDst == "" { + dbs.mdbDst = dbs.mdbSrc + dbs.lc = dbs.mdbSrc.DB("dovecot").C("lastlogin_day") + } else { + dbs.mdbDst, err = mgo.Dial(opts.MongoDst) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + dbs.lc = dbs.mdbDst.DB(opts.DstDB).C("lastlogin_day") + } +} diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..86a8070 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,81 @@ +// influxdb +package main + +import ( + "fmt" + "time" + + influxdb "github.com/influxdata/influxdb/client/v2" +) + +type InfluxdbOutput struct { + InsUsers int + TotUsers int + TotLogins int + Now time.Time + Start time.Time + Stop time.Duration + Pipe time.Duration + Find time.Duration + Insert time.Duration +} + +func Influxdb(start time.Time, ys time.Time) *InfluxdbOutput { + i := InfluxdbOutput{ + InsUsers: 0, + TotLogins: 0, + TotUsers: 0, + Insert: 0, + Find: 0, + Start: ys, + Now: start, + } + + return &i +} + +func (i InfluxdbOutput) writeStats() { + if opts.Debug { + fmt.Printf("writing to influxdb server: %s", opts.Influxdb) + } + + c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ + Addr: opts.Influxdb, + Timeout: 2 * time.Second, + }) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + defer c.Close() + + bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{ + Database: "dovecot", + Precision: "s", + }) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + + tags := map[string]string{"server": opts.Hostname, "date": i.Start.String()} + fields := map[string]interface{}{ + "UsersOK": idb.InsUsers, + "UsersTOT": idb.TotUsers, + "LoginsTOT": idb.TotLogins, + "start": i.Start.Format(_tformat), + "stop": idb.Stop.Seconds(), + "pipe": idb.Pipe.Nanoseconds(), + "find": idb.Find.Nanoseconds(), + "insert": idb.Insert.Nanoseconds(), + } + pt, err := influxdb.NewPoint("llday", tags, fields, i.Now) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + + bp.AddPoint(pt) + + c.Write(bp) +} diff --git a/lastlogin_consolidate.go b/lastlogin_consolidate.go index 8e5c2d3..6a757cf 100644 --- a/lastlogin_consolidate.go +++ b/lastlogin_consolidate.go @@ -4,126 +4,40 @@ package main import ( "flag" "fmt" - "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" "log" "os" - "path" - "path/filepath" + "sync" "time" + + "github.com/mikif70/pidlib" ) const ( - _VERSION = "v0.5" + _VERSION = "v1.4.0" _tformat = "2006-01-02" - _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + (time.Millisecond * 999) _10m = (time.Minute * 10) + _15m = (time.Minute * 15) ) var ( opts = Options{ - // MongoSrc: "mongodb://10.39.81.85:27018", - LogFile: "log/llmongo.log", + MongoSrc: "mongodb://127.0.0.1:27018", + LogFile: "log/llconsolidate.log", StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat), Duration: _24h, - Interval: _10m, + Interval: _15m, + Batch: 10000, + DstDB: "dovecot", } + + wg sync.WaitGroup + + dbs = Dbs{} + + idb *InfluxdbOutput ) -type Options struct { - MongoSrc string - MongoDst string - mdbSrc *mgo.Session - mdbDst *mgo.Session - ll *mgo.Collection - lc *mgo.Collection - StartDate string - Duration time.Duration - Interval time.Duration - LogFile string - Version bool -} - -type LastLogin struct { - User string `json: "user"` - Protocol string `json: "protocol"` - IP string `json: "ip"` - Date time.Time `json: "date"` - ID string `json: "_id"` -} - -type LastLoginDay struct { - User string `json:"user"` - Date time.Time `json:"date"` - Protocols Protocols `json:"protocols"` - IPs []IPs `json:"ips"` -} - -type IPs struct { - IP string `json:"ip"` - Date time.Time `json:"date"` -} - -type Protocols struct { - Pop int `json:"pop"` - Imap int `json:"imap"` - Web int `json:"web"` -} - -type Index struct { - User string `json:"user"` - Date time.Time `json:"date"` -} - -func usage() { - fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -dd -i -v\n") - os.Exit(0) -} - -func init() { - current, err := filepath.Abs(filepath.Dir(os.Args[0])) - if err != nil { - log.Fatal(err) - } - - opts.LogFile = path.Join(current, opts.LogFile) - - flag.StringVar(&opts.MongoSrc, "ms", "", "Mongodb Source") - flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") - flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") - flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") - flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration") - flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") - flag.BoolVar(&opts.Version, "v", false, "Version") -} - -func connectMongo() { - - if opts.MongoSrc == "" { - log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc) - } - var err error - opts.mdbSrc, err = mgo.Dial(opts.MongoSrc) - if err != nil { - log.Println("Mongodb connect Error: ", err.Error()) - os.Exit(-3) - } - opts.ll = opts.mdbSrc.DB("dovecot").C("lastlogin") - - if opts.MongoDst == "" { - opts.mdbDst = opts.mdbSrc - opts.lc = opts.mdbSrc.DB("dovecot").C("lastlogin_day") - } else { - opts.mdbDst, err = mgo.Dial(opts.MongoDst) - if err != nil { - log.Println("Mongodb connect Error: ", err.Error()) - os.Exit(-3) - } - opts.lc = opts.mdbDst.DB("dovecot").C("lastlogin_day") - } - -} - func main() { flag.Usage = usage @@ -134,6 +48,16 @@ func main() { os.Exit(0) } + if opts.Hostname == "" { + var err error + opts.Hostname, err = os.Hostname() + if err != nil { + fmt.Println("Hostname error: ", err.Error()) + } + } + + setTerm() + fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { fmt.Println("Log file error: ", err.Error()) @@ -141,87 +65,52 @@ func main() { } log.SetOutput(fs) - // log.SetPrefix("[llmongo] ") + + pid := pidlib.New() + pid.Write() + defer pid.Remove() start := time.Now() fmt.Printf("Start: %+v\n", opts) log.Printf("Start: %+v\n", opts) - connectMongo() - defer opts.mdbSrc.Close() - defer opts.mdbDst.Clone() - y, err := time.Parse(_tformat, opts.StartDate) if err != nil { - fmt.Println("Date Error: ", err) + log.Println("Date Error: ", err) os.Exit(-1) } + var ys time.Time + var ye time.Time - fmt.Println(y) + ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC) + ye = ys.Add(opts.Duration) + idb = Influxdb(start, ys) - var ys []time.Time - var ye []time.Time + connectMongo(ys.Format("0601")) + defer dbs.mdbSrc.Close() + defer dbs.mdbDst.Close() - if opts.Duration <= (time.Hour * 24) { - ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[0].Add(opts.Duration)) - } else { - for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ { - fmt.Println(i) - yt := y.Add(time.Hour * time.Duration(24*i)) - fmt.Println(yt) - ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) - ye = append(ye, ys[i].Add(_24h)) - } + // DEBUG + if opts.Debug { + fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } - fmt.Println(ys, ye) + pStart := time.Now() - for i := range ys { - q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user") + agg := Consolidate(ys, ye) + agg.Start() - ar := []string{} - q.Distinct("user", &ar) + fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart)) + log.Printf("Stop %s: %s\n", ys, time.Since(pStart)) - fmt.Printf("Date: %s - %s\n", ys[i], ye[i]) + idb.Stop = time.Since(start) - for u := range ar { - ll := LastLoginDay{} - ll.User = ar[u] - ll.Date = ys[i] - fmt.Println(ar[u]) - nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}, "user": ar[u]}).Sort("date") - iter := nq.Iter() - result := LastLogin{} - ips := []IPs{} - lastip := IPs{} - for iter.Next(&result) { - if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { - //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) - } else { - ips = append(ips, IPs{IP: result.IP, Date: result.Date}) - lastip.Date = result.Date - lastip.IP = result.IP - } - switch result.Protocol { - case "pop3", "pop": - ll.Protocols.Pop += 1 - case "imap": - ll.Protocols.Imap += 1 - case "web": - ll.Protocols.Web += 1 - } - } - if err := iter.Close(); err != nil { - fmt.Println("Iter: ", err) - } - ll.IPs = ips - _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) - if err != nil { - fmt.Println("Insert error: ", err) - } - } + idb.InsUsers = agg.Verify() + + fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop) + log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop) + + if opts.Influxdb != "" { + idb.writeStats() } - - fmt.Println("Stop: ", time.Since(start)) } diff --git a/options.go b/options.go new file mode 100644 index 0000000..62471e8 --- /dev/null +++ b/options.go @@ -0,0 +1,57 @@ +// options +package main + +import ( + "flag" + "fmt" + // "gopkg.in/mgo.v2" + "log" + "os" + "path" + "path/filepath" + "time" +) + +type Options struct { + MongoSrc string + MongoDst string + DstDB string + StartDate string + Duration time.Duration + Interval time.Duration + LogFile string + Influxdb string + Hostname string + Version bool + Debug bool + Bulk bool + Batch int + Exe string +} + +func usage() { + fmt.Println("Usage: lastlogin_consolidate -ms -md -l -d -I -bulk -v\n") + os.Exit(0) +} + +func init() { + current, err := filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(current, opts.LogFile) + + flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source") + flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") + flag.StringVar(&opts.MongoDst, "dd", opts.DstDB, "Database Destination") + flag.StringVar(&opts.Influxdb, "I", "", "Influxdb uri") + flag.StringVar(&opts.Hostname, "H", "", "Hostname") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") + flag.DurationVar(&opts.Duration, "du", opts.Duration, "Duration") + flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.BoolVar(&opts.Debug, "debug", false, "Debug") + flag.BoolVar(&opts.Bulk, "bulk", false, "Bulk") +}