Compare commits

...

26 commits

Author SHA1 Message Date
b97098e78b usa la collection: lastlogin.lastlogin_yymm 2016-05-09 12:10:27 +02:00
2778e5506b aggiunta l'opzione per specificare il database di destinazione 2016-05-04 10:46:24 +02:00
8dca30619b aggiunti i ms 2016-05-02 16:38:45 +02:00
a8ae98e427 fixed bugs 2016-04-22 12:18:33 +02:00
914177d440 objects structure 2016-04-22 12:02:03 +02:00
9c7006dcea manda solo la data ad influxdb 2016-04-21 17:37:27 +02:00
1b1736a2e4 aggiunta gestione del PID 2016-04-21 16:21:13 +02:00
2e53d3fb4f spedisce a influxdb la data 2016-04-21 16:11:27 +02:00
e3d7b1dae5 implementata la scrittura Bulk e lo split dell'aggregato per username 2016-04-21 12:36:06 +02:00
0eca19256c skip+limit test 2016-04-15 17:31:00 +02:00
c0d5972a44 primo test 2016-04-15 15:28:52 +02:00
3078fe215d return nanoseconds 2016-04-07 17:44:53 +02:00
34e86dd7fd aggiunti parametri per il monitoraggio 2016-04-06 12:05:40 +02:00
1ebc85d1c3 aggiunto timeout per la connessione con influxdb 2016-03-25 10:15:19 +01:00
a03057fe81 write stats to influxdb 2016-02-19 10:48:55 +01:00
494696c336 creata una struttura DBS per contenere i puntatori delle connessioni al DB 2015-11-26 09:19:16 +01:00
0de9b415b8 ottimizzata la query per l'estrazione degli utenti 2015-11-25 15:08:29 +01:00
a375f8dedf modificata la struttura del sorgente 2015-11-20 17:11:36 +01:00
16fd3dbd69 . 2015-11-13 12:58:09 +01:00
e20784a0d5 aggiunto il numero di user consolidati e il totale user da consolidare al file di log 2015-11-12 09:33:06 +01:00
c96f963f21 impostato ad indefinito il SocketTimeout del mongodb source 2015-11-09 16:43:07 +01:00
cd673bba2d aggiunta l'opzione di debug 2015-11-09 15:54:05 +01:00
42bb907c47 aggiunti i tempi parziali 2015-07-21 15:45:06 +02:00
9c49a04667 aggiunto il protcollo nell'array degli IP 2015-07-21 15:00:46 +02:00
58211e5dae . 2015-07-21 13:36:30 +02:00
dc2e4988be stampa il tempo parziale 2015-07-21 13:35:39 +02:00
5 changed files with 415 additions and 165 deletions

141
aggregate.go Normal file
View file

@ -0,0 +1,141 @@
// aggregate
package main
import (
"fmt"
"log"
"time"
"gopkg.in/mgo.v2/bson"
)
type Aggregate struct {
start time.Time
stop time.Time
users Users
cUsers int
cLogins int
}
func Consolidate(ys time.Time, ye time.Time) *Aggregate {
a := Aggregate{
start: ys,
stop: ye,
cLogins: 0,
cUsers: 0,
}
return &a
}
func (a Aggregate) Verify() int {
tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count()
if err != nil {
fmt.Printf("Verify error: %+v\n", err)
}
return tot
}
func (a Aggregate) bulkWrite() {
_, err := dbs.bulk.Run()
if err != nil {
log.Println("Insert error: ", err)
}
}
func (a Aggregate) consolidate() {
if opts.Bulk {
dbs.bulk = dbs.lc.Bulk()
dbs.bulk.Unordered()
}
idb.TotUsers += 1
ll := LastLoginDay{}
ll.User = a.users.User
ll.Date = a.start
logins := a.users.Logins
ips := []IPs{}
lastip := IPs{}
for l := range logins {
if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval {
// a.discard += 1
// if opts.Debug {
// fmt.Printf("\rDiscarded: %06d", a.discard)
// }
} else {
ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol})
lastip.Date = logins[l].Date
lastip.IP = logins[l].IP
}
switch logins[l].Protocol {
case "pop3", "pop":
ll.Protocols.Pop += 1
case "imap":
ll.Protocols.Imap += 1
case "web":
ll.Protocols.Web += 1
}
}
ll.IPs = ips
iStart := time.Now()
if opts.Bulk {
dbs.bulk.Insert(ll)
} else {
_, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
if err != nil {
log.Println("Insert error: ", err)
}
}
idb.Insert += time.Since(iStart)
if opts.Bulk {
a.bulkWrite()
}
}
func (a Aggregate) Start() {
groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"}
for g := range groups {
qStart := time.Now()
p := dbs.ll.Pipe([]bson.M{
{"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop},
"user": bson.RegEx{"^" + groups[g], ""}}},
{"$sort": bson.M{"user": -1, "date": 1}},
{"$group": bson.M{"_id": "$user",
"logins": bson.M{"$push": bson.M{"protocol": "$protocol", "date": "$date", "ip": "$ip"}}}}}).AllowDiskUse()
iter := p.Iter()
defer iter.Close()
a.cUsers = 0
a.cLogins = 0
a.users = *new(Users)
for iter.Next(&a.users) {
a.cUsers += 1
a.cLogins += len(a.users.Logins)
a.consolidate()
}
idb.TotLogins += a.cLogins
if opts.Debug {
fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart))
}
idb.Pipe = idb.Pipe + time.Since(qStart)
}
fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
}

83
dbs.go Normal file
View file

@ -0,0 +1,83 @@
// dbs
package main
import (
"log"
"os"
"time"
"gopkg.in/mgo.v2"
)
type Dbs struct {
mdbSrc *mgo.Session
mdbDst *mgo.Session
ll *mgo.Collection
lc *mgo.Collection
bulk *mgo.Bulk
}
type LastLogin struct {
User string `json: "user" bson:"user"`
Protocol string `json: "protocol" bson:"protocol"`
IP string `json: "ip" bson:"ip"`
Date time.Time `json: "date" bson:"date"`
ID string `json: "_id" bson:"_id"`
}
type LastLoginDay struct {
User string `json:"user" bson:"user"`
Date time.Time `json:"date" bson:"date"`
Protocols Protocols `json:"protocols" bson:"protocols"`
IPs []IPs `json:"ips" bson:"ips"`
}
type IPs struct {
IP string `json:"ip" bson:"ip"`
Date time.Time `json:"date" bson:"date"`
Protocol string `json:"protocol" bson:"protocol"`
}
type Protocols struct {
Pop int `json:"pop" bson:"pop"`
Imap int `json:"imap" bson:"imap"`
Web int `json:"web" bson:"web"`
}
type Index struct {
User string `json:"user" bson:"user"`
Date time.Time `json:"date" bson:"date"`
}
type Users struct {
User string `json:"_id" bson:"_id"`
Logins []IPs `json:"logins" bson:"logins"`
}
func connectMongo(data string) {
if opts.MongoSrc == "" {
log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc)
}
var err error
//opts.mdbSrc, err = mgo.DialWithTimeout(opts.MongoSrc, time.Minute*5)
dbs.mdbSrc, err = mgo.Dial(opts.MongoSrc)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
dbs.mdbSrc.SetSocketTimeout(0)
dbs.ll = dbs.mdbSrc.DB("lastlogin").C("lastlogin_" + data)
if opts.MongoDst == "" {
dbs.mdbDst = dbs.mdbSrc
dbs.lc = dbs.mdbSrc.DB("dovecot").C("lastlogin_day")
} else {
dbs.mdbDst, err = mgo.Dial(opts.MongoDst)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
dbs.lc = dbs.mdbDst.DB(opts.DstDB).C("lastlogin_day")
}
}

81
influxdb.go Normal file
View file

@ -0,0 +1,81 @@
// influxdb
package main
import (
"fmt"
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
)
type InfluxdbOutput struct {
InsUsers int
TotUsers int
TotLogins int
Now time.Time
Start time.Time
Stop time.Duration
Pipe time.Duration
Find time.Duration
Insert time.Duration
}
func Influxdb(start time.Time, ys time.Time) *InfluxdbOutput {
i := InfluxdbOutput{
InsUsers: 0,
TotLogins: 0,
TotUsers: 0,
Insert: 0,
Find: 0,
Start: ys,
Now: start,
}
return &i
}
func (i InfluxdbOutput) writeStats() {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
}
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: opts.Influxdb,
Timeout: 2 * time.Second,
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
return
}
defer c.Close()
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: "dovecot",
Precision: "s",
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
return
}
tags := map[string]string{"server": opts.Hostname, "date": i.Start.String()}
fields := map[string]interface{}{
"UsersOK": idb.InsUsers,
"UsersTOT": idb.TotUsers,
"LoginsTOT": idb.TotLogins,
"start": i.Start.Format(_tformat),
"stop": idb.Stop.Seconds(),
"pipe": idb.Pipe.Nanoseconds(),
"find": idb.Find.Nanoseconds(),
"insert": idb.Insert.Nanoseconds(),
}
pt, err := influxdb.NewPoint("llday", tags, fields, i.Now)
if err != nil {
fmt.Printf("Error: %+v\n", err)
return
}
bp.AddPoint(pt)
c.Write(bp)
}

View file

@ -4,20 +4,20 @@ package main
import (
"flag"
"fmt"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"log"
"os"
"path"
"path/filepath"
"sync"
"time"
"github.com/mikif70/pidlib"
)
const (
_VERSION = "v1.0.2"
_VERSION = "v1.4.0"
_tformat = "2006-01-02"
_24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59)
_24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + (time.Millisecond * 999)
_10m = (time.Minute * 10)
_15m = (time.Minute * 15)
)
var (
@ -26,104 +26,18 @@ var (
LogFile: "log/llconsolidate.log",
StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat),
Duration: _24h,
Interval: _10m,
Interval: _15m,
Batch: 10000,
DstDB: "dovecot",
}
wg sync.WaitGroup
dbs = Dbs{}
idb *InfluxdbOutput
)
type Options struct {
MongoSrc string
MongoDst string
mdbSrc *mgo.Session
mdbDst *mgo.Session
ll *mgo.Collection
lc *mgo.Collection
StartDate string
Duration time.Duration
Interval time.Duration
LogFile string
Version bool
}
type LastLogin struct {
User string `json: "user"`
Protocol string `json: "protocol"`
IP string `json: "ip"`
Date time.Time `json: "date"`
ID string `json: "_id"`
}
type LastLoginDay struct {
User string `json:"user"`
Date time.Time `json:"date"`
Protocols Protocols `json:"protocols"`
IPs []IPs `json:"ips"`
}
type IPs struct {
IP string `json:"ip"`
Date time.Time `json:"date"`
}
type Protocols struct {
Pop int `json:"pop"`
Imap int `json:"imap"`
Web int `json:"web"`
}
type Index struct {
User string `json:"user"`
Date time.Time `json:"date"`
}
func usage() {
fmt.Println("Usage: lastlogin_consolidate -ms <mongo source uri> -md <mongo destination uri> -l <logfile> -d <date> -dd <duration> -i <interval> -v\n")
os.Exit(0)
}
func init() {
current, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(current, opts.LogFile)
flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source")
flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date")
flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration")
flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration")
flag.BoolVar(&opts.Version, "v", false, "Version")
}
func connectMongo() {
if opts.MongoSrc == "" {
log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc)
}
var err error
opts.mdbSrc, err = mgo.Dial(opts.MongoSrc)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
opts.ll = opts.mdbSrc.DB("dovecot").C("lastlogin")
if opts.MongoDst == "" {
opts.mdbDst = opts.mdbSrc
opts.lc = opts.mdbSrc.DB("dovecot").C("lastlogin_day")
} else {
opts.mdbDst, err = mgo.Dial(opts.MongoDst)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
opts.lc = opts.mdbDst.DB("dovecot").C("lastlogin_day")
}
}
func main() {
flag.Usage = usage
@ -134,6 +48,16 @@ func main() {
os.Exit(0)
}
if opts.Hostname == "" {
var err error
opts.Hostname, err = os.Hostname()
if err != nil {
fmt.Println("Hostname error: ", err.Error())
}
}
setTerm()
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Log file error: ", err.Error())
@ -142,87 +66,51 @@ func main() {
log.SetOutput(fs)
pid := pidlib.New()
pid.Write()
defer pid.Remove()
start := time.Now()
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
connectMongo()
defer opts.mdbSrc.Close()
defer opts.mdbDst.Clone()
y, err := time.Parse(_tformat, opts.StartDate)
if err != nil {
log.Println("Date Error: ", err)
os.Exit(-1)
}
var ys time.Time
var ye time.Time
// fmt.Println(y)
ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)
ye = ys.Add(opts.Duration)
idb = Influxdb(start, ys)
var ys []time.Time
var ye []time.Time
connectMongo(ys.Format("0601"))
defer dbs.mdbSrc.Close()
defer dbs.mdbDst.Close()
if opts.Duration <= (time.Hour * 24) {
ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC))
ye = append(ye, ys[0].Add(opts.Duration))
} else {
for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ {
// fmt.Println(i)
yt := y.Add(time.Hour * time.Duration(24*i))
// fmt.Println(yt)
ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC))
ye = append(ye, ys[i].Add(_24h))
}
// DEBUG
if opts.Debug {
fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye)
}
fmt.Println(ys, ye)
pStart := time.Now()
for i := range ys {
q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user")
agg := Consolidate(ys, ye)
agg.Start()
ar := []string{}
q.Distinct("user", &ar)
fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart))
log.Printf("Stop %s: %s\n", ys, time.Since(pStart))
fmt.Printf("Date: %s - %s\n", ys[i], ye[i])
log.Printf("Date: %s - %s\n", ys[i], ye[i])
idb.Stop = time.Since(start)
for u := range ar {
ll := LastLoginDay{}
ll.User = ar[u]
ll.Date = ys[i]
// fmt.Println(ar[u])
nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}, "user": ar[u]}).Sort("date")
iter := nq.Iter()
result := LastLogin{}
ips := []IPs{}
lastip := IPs{}
for iter.Next(&result) {
if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval {
//fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date))
} else {
ips = append(ips, IPs{IP: result.IP, Date: result.Date})
lastip.Date = result.Date
lastip.IP = result.IP
}
switch result.Protocol {
case "pop3", "pop":
ll.Protocols.Pop += 1
case "imap":
ll.Protocols.Imap += 1
case "web":
ll.Protocols.Web += 1
}
}
if err := iter.Close(); err != nil {
log.Println("Iter: ", err)
}
ll.IPs = ips
_, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
if err != nil {
log.Println("Insert error: ", err)
}
}
idb.InsUsers = agg.Verify()
fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
if opts.Influxdb != "" {
idb.writeStats()
}
fmt.Println("Stop: ", time.Since(start))
log.Println("Stop: ", time.Since(start))
}

57
options.go Normal file
View file

@ -0,0 +1,57 @@
// options
package main
import (
"flag"
"fmt"
// "gopkg.in/mgo.v2"
"log"
"os"
"path"
"path/filepath"
"time"
)
type Options struct {
MongoSrc string
MongoDst string
DstDB string
StartDate string
Duration time.Duration
Interval time.Duration
LogFile string
Influxdb string
Hostname string
Version bool
Debug bool
Bulk bool
Batch int
Exe string
}
func usage() {
fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -I <influxdb uri> -bulk -v\n")
os.Exit(0)
}
func init() {
current, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(current, opts.LogFile)
flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source")
flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination")
flag.StringVar(&opts.MongoDst, "dd", opts.DstDB, "Database Destination")
flag.StringVar(&opts.Influxdb, "I", "", "Influxdb uri")
flag.StringVar(&opts.Hostname, "H", "", "Hostname")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date")
flag.DurationVar(&opts.Duration, "du", opts.Duration, "Duration")
flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.BoolVar(&opts.Debug, "debug", false, "Debug")
flag.BoolVar(&opts.Bulk, "bulk", false, "Bulk")
}