Compare commits
13 commits
Author | SHA1 | Date | |
---|---|---|---|
b97098e78b | |||
2778e5506b | |||
8dca30619b | |||
a8ae98e427 | |||
914177d440 | |||
9c7006dcea | |||
1b1736a2e4 | |||
2e53d3fb4f | |||
e3d7b1dae5 | |||
0eca19256c | |||
c0d5972a44 | |||
3078fe215d | |||
34e86dd7fd |
5 changed files with 220 additions and 118 deletions
165
aggregate.go
165
aggregate.go
|
@ -3,72 +3,139 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
func aggregate(ys time.Time, ye time.Time) {
|
||||
type Aggregate struct {
|
||||
start time.Time
|
||||
stop time.Time
|
||||
users Users
|
||||
cUsers int
|
||||
cLogins int
|
||||
}
|
||||
|
||||
qStart := time.Now()
|
||||
func Consolidate(ys time.Time, ye time.Time) *Aggregate {
|
||||
a := Aggregate{
|
||||
start: ys,
|
||||
stop: ye,
|
||||
cLogins: 0,
|
||||
cUsers: 0,
|
||||
}
|
||||
return &a
|
||||
}
|
||||
|
||||
p := dbs.ll.Pipe([]bson.M{{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}}}, {"$group": bson.M{"_id": "$user"}}, {"$project": bson.M{"user": "$_id"}}})
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("Aggregate user: %s\n\r", time.Since(qStart))
|
||||
func (a Aggregate) Verify() int {
|
||||
tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count()
|
||||
if err != nil {
|
||||
fmt.Printf("Verify error: %+v\n", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Date: %s - %s\n", ys, ye)
|
||||
log.Printf("Date: %s - %s\n", ys, ye)
|
||||
return tot
|
||||
}
|
||||
|
||||
ar := Users{}
|
||||
it := p.Iter()
|
||||
func (a Aggregate) bulkWrite() {
|
||||
_, err := dbs.bulk.Run()
|
||||
if err != nil {
|
||||
log.Println("Insert error: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
for it.Next(&ar) {
|
||||
countTOT += 1
|
||||
ll := LastLoginDay{}
|
||||
ll.User = ar.User
|
||||
ll.Date = ys
|
||||
func (a Aggregate) consolidate() {
|
||||
|
||||
// DEBUG
|
||||
if opts.Debug {
|
||||
fmt.Printf("User: %s\n\r", ar.User)
|
||||
if opts.Bulk {
|
||||
dbs.bulk = dbs.lc.Bulk()
|
||||
dbs.bulk.Unordered()
|
||||
}
|
||||
|
||||
idb.TotUsers += 1
|
||||
ll := LastLoginDay{}
|
||||
ll.User = a.users.User
|
||||
ll.Date = a.start
|
||||
|
||||
logins := a.users.Logins
|
||||
|
||||
ips := []IPs{}
|
||||
lastip := IPs{}
|
||||
for l := range logins {
|
||||
if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval {
|
||||
// a.discard += 1
|
||||
// if opts.Debug {
|
||||
// fmt.Printf("\rDiscarded: %06d", a.discard)
|
||||
// }
|
||||
} else {
|
||||
ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol})
|
||||
lastip.Date = logins[l].Date
|
||||
lastip.IP = logins[l].IP
|
||||
}
|
||||
switch logins[l].Protocol {
|
||||
case "pop3", "pop":
|
||||
ll.Protocols.Pop += 1
|
||||
case "imap":
|
||||
ll.Protocols.Imap += 1
|
||||
case "web":
|
||||
ll.Protocols.Web += 1
|
||||
}
|
||||
}
|
||||
ll.IPs = ips
|
||||
|
||||
qStart = time.Now()
|
||||
nq := dbs.ll.Find(bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, "user": ar.User}).Sort("date")
|
||||
//fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) )
|
||||
iter := nq.Iter()
|
||||
result := LastLogin{}
|
||||
ips := []IPs{}
|
||||
lastip := IPs{}
|
||||
for iter.Next(&result) {
|
||||
if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval {
|
||||
//fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date))
|
||||
} else {
|
||||
ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol})
|
||||
lastip.Date = result.Date
|
||||
lastip.IP = result.IP
|
||||
}
|
||||
switch result.Protocol {
|
||||
case "pop3", "pop":
|
||||
ll.Protocols.Pop += 1
|
||||
case "imap":
|
||||
ll.Protocols.Imap += 1
|
||||
case "web":
|
||||
ll.Protocols.Web += 1
|
||||
}
|
||||
}
|
||||
if err := iter.Close(); err != nil {
|
||||
log.Println("Iter: ", err)
|
||||
}
|
||||
ll.IPs = ips
|
||||
//fmt.Printf("Upsert %+v\n\r", ll)
|
||||
iStart := time.Now()
|
||||
|
||||
if opts.Bulk {
|
||||
dbs.bulk.Insert(ll)
|
||||
} else {
|
||||
_, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
|
||||
if err != nil {
|
||||
log.Println("Insert error: ", err)
|
||||
}
|
||||
countOK += 1
|
||||
}
|
||||
idb.Insert += time.Since(iStart)
|
||||
|
||||
if opts.Bulk {
|
||||
a.bulkWrite()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (a Aggregate) Start() {
|
||||
|
||||
groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"}
|
||||
|
||||
for g := range groups {
|
||||
|
||||
qStart := time.Now()
|
||||
|
||||
p := dbs.ll.Pipe([]bson.M{
|
||||
{"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop},
|
||||
"user": bson.RegEx{"^" + groups[g], ""}}},
|
||||
{"$sort": bson.M{"user": -1, "date": 1}},
|
||||
{"$group": bson.M{"_id": "$user",
|
||||
"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}).AllowDiskUse()
|
||||
|
||||
iter := p.Iter()
|
||||
defer iter.Close()
|
||||
|
||||
a.cUsers = 0
|
||||
a.cLogins = 0
|
||||
a.users = *new(Users)
|
||||
for iter.Next(&a.users) {
|
||||
a.cUsers += 1
|
||||
a.cLogins += len(a.users.Logins)
|
||||
a.consolidate()
|
||||
}
|
||||
|
||||
idb.TotLogins += a.cLogins
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart))
|
||||
}
|
||||
|
||||
idb.Pipe = idb.Pipe + time.Since(qStart)
|
||||
|
||||
}
|
||||
|
||||
fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
|
||||
log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
|
||||
}
|
||||
|
|
48
dbs.go
48
dbs.go
|
@ -2,10 +2,11 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"gopkg.in/mgo.v2"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
type Dbs struct {
|
||||
|
@ -13,45 +14,47 @@ type Dbs struct {
|
|||
mdbDst *mgo.Session
|
||||
ll *mgo.Collection
|
||||
lc *mgo.Collection
|
||||
bulk *mgo.Bulk
|
||||
}
|
||||
|
||||
type LastLogin struct {
|
||||
User string `json: "user"`
|
||||
Protocol string `json: "protocol"`
|
||||
IP string `json: "ip"`
|
||||
Date time.Time `json: "date"`
|
||||
ID string `json: "_id"`
|
||||
User string `json: "user" bson:"user"`
|
||||
Protocol string `json: "protocol" bson:"protocol"`
|
||||
IP string `json: "ip" bson:"ip"`
|
||||
Date time.Time `json: "date" bson:"date"`
|
||||
ID string `json: "_id" bson:"_id"`
|
||||
}
|
||||
|
||||
type LastLoginDay struct {
|
||||
User string `json:"user"`
|
||||
Date time.Time `json:"date"`
|
||||
Protocols Protocols `json:"protocols"`
|
||||
IPs []IPs `json:"ips"`
|
||||
User string `json:"user" bson:"user"`
|
||||
Date time.Time `json:"date" bson:"date"`
|
||||
Protocols Protocols `json:"protocols" bson:"protocols"`
|
||||
IPs []IPs `json:"ips" bson:"ips"`
|
||||
}
|
||||
|
||||
type IPs struct {
|
||||
IP string `json:"ip"`
|
||||
Date time.Time `json:"date"`
|
||||
Protocol string `json:"protocol"`
|
||||
IP string `json:"ip" bson:"ip"`
|
||||
Date time.Time `json:"date" bson:"date"`
|
||||
Protocol string `json:"protocol" bson:"protocol"`
|
||||
}
|
||||
|
||||
type Protocols struct {
|
||||
Pop int `json:"pop"`
|
||||
Imap int `json:"imap"`
|
||||
Web int `json:"web"`
|
||||
Pop int `json:"pop" bson:"pop"`
|
||||
Imap int `json:"imap" bson:"imap"`
|
||||
Web int `json:"web" bson:"web"`
|
||||
}
|
||||
|
||||
type Index struct {
|
||||
User string `json:"user"`
|
||||
Date time.Time `json:"date"`
|
||||
User string `json:"user" bson:"user"`
|
||||
Date time.Time `json:"date" bson:"date"`
|
||||
}
|
||||
|
||||
type Users struct {
|
||||
User string `json:"user"`
|
||||
User string `json:"_id" bson:"_id"`
|
||||
Logins []IPs `json:"logins" bson:"logins"`
|
||||
}
|
||||
|
||||
func connectMongo() {
|
||||
func connectMongo(data string) {
|
||||
|
||||
if opts.MongoSrc == "" {
|
||||
log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc)
|
||||
|
@ -64,7 +67,7 @@ func connectMongo() {
|
|||
os.Exit(-3)
|
||||
}
|
||||
dbs.mdbSrc.SetSocketTimeout(0)
|
||||
dbs.ll = dbs.mdbSrc.DB("dovecot").C("lastlogin")
|
||||
dbs.ll = dbs.mdbSrc.DB("lastlogin").C("lastlogin_" + data)
|
||||
|
||||
if opts.MongoDst == "" {
|
||||
dbs.mdbDst = dbs.mdbSrc
|
||||
|
@ -75,7 +78,6 @@ func connectMongo() {
|
|||
log.Println("Mongodb connect Error: ", err.Error())
|
||||
os.Exit(-3)
|
||||
}
|
||||
dbs.lc = dbs.mdbDst.DB("dovecot").C("lastlogin_day")
|
||||
dbs.lc = dbs.mdbDst.DB(opts.DstDB).C("lastlogin_day")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
44
influxdb.go
44
influxdb.go
|
@ -8,7 +8,33 @@ import (
|
|||
influxdb "github.com/influxdata/influxdb/client/v2"
|
||||
)
|
||||
|
||||
func writeStats(start time.Time, stop time.Duration) {
|
||||
type InfluxdbOutput struct {
|
||||
InsUsers int
|
||||
TotUsers int
|
||||
TotLogins int
|
||||
Now time.Time
|
||||
Start time.Time
|
||||
Stop time.Duration
|
||||
Pipe time.Duration
|
||||
Find time.Duration
|
||||
Insert time.Duration
|
||||
}
|
||||
|
||||
func Influxdb(start time.Time, ys time.Time) *InfluxdbOutput {
|
||||
i := InfluxdbOutput{
|
||||
InsUsers: 0,
|
||||
TotLogins: 0,
|
||||
TotUsers: 0,
|
||||
Insert: 0,
|
||||
Find: 0,
|
||||
Start: ys,
|
||||
Now: start,
|
||||
}
|
||||
|
||||
return &i
|
||||
}
|
||||
|
||||
func (i InfluxdbOutput) writeStats() {
|
||||
if opts.Debug {
|
||||
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
|
||||
}
|
||||
|
@ -32,13 +58,18 @@ func writeStats(start time.Time, stop time.Duration) {
|
|||
return
|
||||
}
|
||||
|
||||
tags := map[string]string{"server": opts.Hostname}
|
||||
tags := map[string]string{"server": opts.Hostname, "date": i.Start.String()}
|
||||
fields := map[string]interface{}{
|
||||
"LoginOK": countOK,
|
||||
"LoginTOT": countTOT,
|
||||
"stop": stop.Seconds(),
|
||||
"UsersOK": idb.InsUsers,
|
||||
"UsersTOT": idb.TotUsers,
|
||||
"LoginsTOT": idb.TotLogins,
|
||||
"start": i.Start.Format(_tformat),
|
||||
"stop": idb.Stop.Seconds(),
|
||||
"pipe": idb.Pipe.Nanoseconds(),
|
||||
"find": idb.Find.Nanoseconds(),
|
||||
"insert": idb.Insert.Nanoseconds(),
|
||||
}
|
||||
pt, err := influxdb.NewPoint("llday", tags, fields, start)
|
||||
pt, err := influxdb.NewPoint("llday", tags, fields, i.Now)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %+v\n", err)
|
||||
return
|
||||
|
@ -46,6 +77,5 @@ func writeStats(start time.Time, stop time.Duration) {
|
|||
|
||||
bp.AddPoint(pt)
|
||||
|
||||
// Write the batch
|
||||
c.Write(bp)
|
||||
}
|
||||
|
|
|
@ -4,17 +4,18 @@ package main
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
// "gopkg.in/mgo.v2"
|
||||
// "gopkg.in/mgo.v2/bson"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/mikif70/pidlib"
|
||||
)
|
||||
|
||||
const (
|
||||
_VERSION = "v1.2.1"
|
||||
_VERSION = "v1.4.0"
|
||||
_tformat = "2006-01-02"
|
||||
_24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59)
|
||||
_24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + (time.Millisecond * 999)
|
||||
_10m = (time.Minute * 10)
|
||||
_15m = (time.Minute * 15)
|
||||
)
|
||||
|
@ -26,12 +27,15 @@ var (
|
|||
StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat),
|
||||
Duration: _24h,
|
||||
Interval: _15m,
|
||||
Batch: 10000,
|
||||
DstDB: "dovecot",
|
||||
}
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
dbs = Dbs{}
|
||||
|
||||
countTOT = 0
|
||||
countOK = 0
|
||||
idb *InfluxdbOutput
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@ -52,6 +56,8 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
setTerm()
|
||||
|
||||
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
fmt.Println("Log file error: ", err.Error())
|
||||
|
@ -60,60 +66,51 @@ func main() {
|
|||
|
||||
log.SetOutput(fs)
|
||||
|
||||
pid := pidlib.New()
|
||||
pid.Write()
|
||||
defer pid.Remove()
|
||||
|
||||
start := time.Now()
|
||||
fmt.Printf("Start: %+v\n", opts)
|
||||
log.Printf("Start: %+v\n", opts)
|
||||
|
||||
connectMongo()
|
||||
defer dbs.mdbSrc.Close()
|
||||
defer dbs.mdbDst.Clone()
|
||||
|
||||
y, err := time.Parse(_tformat, opts.StartDate)
|
||||
if err != nil {
|
||||
log.Println("Date Error: ", err)
|
||||
os.Exit(-1)
|
||||
}
|
||||
var ys time.Time
|
||||
var ye time.Time
|
||||
|
||||
// DEBUG
|
||||
//fmt.Printf("Start %+v\n\r", y)
|
||||
ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)
|
||||
ye = ys.Add(opts.Duration)
|
||||
idb = Influxdb(start, ys)
|
||||
|
||||
var ys []time.Time
|
||||
var ye []time.Time
|
||||
|
||||
if opts.Duration <= (time.Hour * 24) {
|
||||
ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC))
|
||||
ye = append(ye, ys[0].Add(opts.Duration))
|
||||
} else {
|
||||
for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ {
|
||||
// fmt.Println(i)
|
||||
yt := y.Add(time.Hour * time.Duration(24*i))
|
||||
// fmt.Println(yt)
|
||||
ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC))
|
||||
ye = append(ye, ys[i].Add(_24h))
|
||||
}
|
||||
}
|
||||
connectMongo(ys.Format("0601"))
|
||||
defer dbs.mdbSrc.Close()
|
||||
defer dbs.mdbDst.Close()
|
||||
|
||||
// DEBUG
|
||||
if opts.Debug {
|
||||
fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye)
|
||||
}
|
||||
|
||||
for i := range ys {
|
||||
pStart := time.Now()
|
||||
|
||||
pStart := time.Now()
|
||||
agg := Consolidate(ys, ye)
|
||||
agg.Start()
|
||||
|
||||
aggregate(ys[i], ye[i])
|
||||
fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart))
|
||||
log.Printf("Stop %s: %s\n", ys, time.Since(pStart))
|
||||
|
||||
fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart))
|
||||
log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart))
|
||||
}
|
||||
idb.Stop = time.Since(start)
|
||||
|
||||
stop := time.Since(start)
|
||||
idb.InsUsers = agg.Verify()
|
||||
|
||||
fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, stop)
|
||||
log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, stop)
|
||||
fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
|
||||
log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
|
||||
|
||||
if opts.Influxdb != "" {
|
||||
writeStats(start, stop)
|
||||
idb.writeStats()
|
||||
}
|
||||
}
|
||||
|
|
10
options.go
10
options.go
|
@ -15,6 +15,7 @@ import (
|
|||
type Options struct {
|
||||
MongoSrc string
|
||||
MongoDst string
|
||||
DstDB string
|
||||
StartDate string
|
||||
Duration time.Duration
|
||||
Interval time.Duration
|
||||
|
@ -23,10 +24,13 @@ type Options struct {
|
|||
Hostname string
|
||||
Version bool
|
||||
Debug bool
|
||||
Bulk bool
|
||||
Batch int
|
||||
Exe string
|
||||
}
|
||||
|
||||
func usage() {
|
||||
fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -dd <duration> -i <interval> -I <influxdb uri> -v\n")
|
||||
fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -I <influxdb uri> -bulk -v\n")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
|
@ -40,12 +44,14 @@ func init() {
|
|||
|
||||
flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source")
|
||||
flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination")
|
||||
flag.StringVar(&opts.MongoDst, "dd", opts.DstDB, "Database Destination")
|
||||
flag.StringVar(&opts.Influxdb, "I", "", "Influxdb uri")
|
||||
flag.StringVar(&opts.Hostname, "H", "", "Hostname")
|
||||
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
|
||||
flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date")
|
||||
flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration")
|
||||
flag.DurationVar(&opts.Duration, "du", opts.Duration, "Duration")
|
||||
flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration")
|
||||
flag.BoolVar(&opts.Version, "v", false, "Version")
|
||||
flag.BoolVar(&opts.Debug, "debug", false, "Debug")
|
||||
flag.BoolVar(&opts.Bulk, "bulk", false, "Bulk")
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue