Compare commits

...

5 commits

Author SHA1 Message Date
b97098e78b usa la collection: lastlogin.lastlogin_yymm 2016-05-09 12:10:27 +02:00
2778e5506b aggiunta l'opzione per specificare il database di destinazione 2016-05-04 10:46:24 +02:00
8dca30619b aggiunti i ms 2016-05-02 16:38:45 +02:00
a8ae98e427 fixed bugs 2016-04-22 12:18:33 +02:00
914177d440 objects structure 2016-04-22 12:02:03 +02:00
5 changed files with 127 additions and 123 deletions

View file

@ -4,45 +4,67 @@ package main
import (
"fmt"
"log"
// "sort"
"time"
"gopkg.in/mgo.v2/bson"
)
func bulkWrite() {
type Aggregate struct {
start time.Time
stop time.Time
users Users
cUsers int
cLogins int
}
func Consolidate(ys time.Time, ye time.Time) *Aggregate {
a := Aggregate{
start: ys,
stop: ye,
cLogins: 0,
cUsers: 0,
}
return &a
}
func (a Aggregate) Verify() int {
tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count()
if err != nil {
fmt.Printf("Verify error: %+v\n", err)
}
return tot
}
func (a Aggregate) bulkWrite() {
_, err := dbs.bulk.Run()
if err != nil {
log.Println("Insert error: ", err)
}
//fmt.Printf("Bulk res: %+v\n", res)
}
func consolidate(user Users, ys time.Time, ye time.Time) {
func (a Aggregate) consolidate() {
if opts.Bulk {
dbs.bulk = dbs.lc.Bulk()
dbs.bulk.Unordered()
}
idb.CountTOT += 1
idb.TotUsers += 1
ll := LastLoginDay{}
ll.User = user.User
ll.Date = ys
ll.User = a.users.User
ll.Date = a.start
// DEBUG
logins := user.Logins
logins := a.users.Logins
ips := []IPs{}
lastip := IPs{}
for l := range logins {
if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval {
/*
if opts.Debug {
fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date))
}
*/
// a.discard += 1
// if opts.Debug {
// fmt.Printf("\rDiscarded: %06d", a.discard)
// }
} else {
ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol})
lastip.Date = logins[l].Date
@ -62,29 +84,22 @@ func consolidate(user Users, ys time.Time, ye time.Time) {
iStart := time.Now()
if opts.Bulk {
//dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
dbs.bulk.Insert(ll)
} else {
_, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
if err != nil {
log.Println("Insert error: ", err)
}
// fmt.Printf("Change: %+v\n", info)
}
idb.Insert += time.Since(iStart)
idb.CountOK += 1
if opts.Bulk {
bulkWrite()
a.bulkWrite()
}
// if opts.Debug {
// fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert)
// }
}
func aggregate(ys time.Time, ye time.Time) {
func (a Aggregate) Start() {
groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"}
@ -93,7 +108,7 @@ func aggregate(ys time.Time, ye time.Time) {
qStart := time.Now()
p := dbs.ll.Pipe([]bson.M{
{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye},
{"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop},
"user": bson.RegEx{"^" + groups[g], ""}}},
{"$sort": bson.M{"user": -1, "date": 1}},
{"$group": bson.M{"_id": "$user",
@ -102,21 +117,25 @@ func aggregate(ys time.Time, ye time.Time) {
iter := p.Iter()
defer iter.Close()
var result Users
for iter.Next(&result) {
consolidate(result, ys, ye)
a.cUsers = 0
a.cLogins = 0
a.users = *new(Users)
for iter.Next(&a.users) {
a.cUsers += 1
a.cLogins += len(a.users.Logins)
a.consolidate()
}
idb.TotLogins += a.cLogins
if opts.Debug {
fmt.Printf("Group %v: %+v\n", groups[g], time.Since(qStart))
fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart))
}
// p.All(&result)
idb.Pipe = idb.Pipe + time.Since(qStart)
}
fmt.Printf("Date: %s - %s\n", ys, ye)
log.Printf("Date: %s - %s\n", ys, ye)
fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
}

12
dbs.go
View file

@ -54,13 +54,7 @@ type Users struct {
Logins []IPs `json:"logins" bson:"logins"`
}
/*
type Users struct {
User string `json:"user"`
}
*/
func connectMongo() {
func connectMongo(data string) {
if opts.MongoSrc == "" {
log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc)
@ -73,7 +67,7 @@ func connectMongo() {
os.Exit(-3)
}
dbs.mdbSrc.SetSocketTimeout(0)
dbs.ll = dbs.mdbSrc.DB("dovecot").C("lastlogin")
dbs.ll = dbs.mdbSrc.DB("lastlogin").C("lastlogin_" + data)
if opts.MongoDst == "" {
dbs.mdbDst = dbs.mdbSrc
@ -84,6 +78,6 @@ func connectMongo() {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
dbs.lc = dbs.mdbDst.DB("dovecot").C("lastlogin_day")
dbs.lc = dbs.mdbDst.DB(opts.DstDB).C("lastlogin_day")
}
}

View file

@ -9,16 +9,32 @@ import (
)
type InfluxdbOutput struct {
CountOK int
CountTOT int
Start time.Time
Stop time.Duration
Pipe time.Duration
Find time.Duration
Insert time.Duration
InsUsers int
TotUsers int
TotLogins int
Now time.Time
Start time.Time
Stop time.Duration
Pipe time.Duration
Find time.Duration
Insert time.Duration
}
func writeStats(start time.Time, ys time.Time) {
func Influxdb(start time.Time, ys time.Time) *InfluxdbOutput {
i := InfluxdbOutput{
InsUsers: 0,
TotLogins: 0,
TotUsers: 0,
Insert: 0,
Find: 0,
Start: ys,
Now: start,
}
return &i
}
func (i InfluxdbOutput) writeStats() {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
}
@ -42,17 +58,18 @@ func writeStats(start time.Time, ys time.Time) {
return
}
tags := map[string]string{"server": opts.Hostname, "date": ys.String()}
tags := map[string]string{"server": opts.Hostname, "date": i.Start.String()}
fields := map[string]interface{}{
"LoginOK": idb.CountOK,
"LoginTOT": idb.CountTOT,
"start": ys.Format(_tformat),
"stop": idb.Stop.Seconds(),
"pipe": idb.Pipe.Nanoseconds(),
"find": idb.Find.Nanoseconds(),
"insert": idb.Insert.Nanoseconds(),
"UsersOK": idb.InsUsers,
"UsersTOT": idb.TotUsers,
"LoginsTOT": idb.TotLogins,
"start": i.Start.Format(_tformat),
"stop": idb.Stop.Seconds(),
"pipe": idb.Pipe.Nanoseconds(),
"find": idb.Find.Nanoseconds(),
"insert": idb.Insert.Nanoseconds(),
}
pt, err := influxdb.NewPoint("llday", tags, fields, start)
pt, err := influxdb.NewPoint("llday", tags, fields, i.Now)
if err != nil {
fmt.Printf("Error: %+v\n", err)
return
@ -60,6 +77,5 @@ func writeStats(start time.Time, ys time.Time) {
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}

View file

@ -4,17 +4,18 @@ package main
import (
"flag"
"fmt"
// "gopkg.in/mgo.v2"
// "gopkg.in/mgo.v2/bson"
"log"
"os"
"sync"
"time"
"github.com/mikif70/pidlib"
)
const (
_VERSION = "v1.3.3"
_VERSION = "v1.4.0"
_tformat = "2006-01-02"
_24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59)
_24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) + (time.Millisecond * 999)
_10m = (time.Minute * 10)
_15m = (time.Minute * 15)
)
@ -27,18 +28,14 @@ var (
Duration: _24h,
Interval: _15m,
Batch: 10000,
DstDB: "dovecot",
}
loop []bool
wg sync.WaitGroup
dbs = Dbs{}
idb = InfluxdbOutput{
CountTOT: 0,
CountOK: 0,
Insert: 0,
Find: 0,
}
idb *InfluxdbOutput
)
func main() {
@ -69,74 +66,51 @@ func main() {
log.SetOutput(fs)
pid.Write(true)
pid := pidlib.New()
pid.Write()
defer pid.Remove()
start := time.Now()
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
connectMongo()
defer dbs.mdbSrc.Close()
defer dbs.mdbDst.Close()
y, err := time.Parse(_tformat, opts.StartDate)
if err != nil {
log.Println("Date Error: ", err)
os.Exit(-1)
}
// DEBUG
//fmt.Printf("Start %+v\n\r", y)
// var ys []time.Time
// var ye []time.Time
var ys time.Time
var ye time.Time
// if opts.Duration <= (time.Hour * 24) {
ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)
ye = ys.Add(opts.Duration)
idb = Influxdb(start, ys)
/*
} else {
for i := 0; i <= int(opts.Duration/(time.Hour*24)); i++ {
yt := y.Add(time.Hour * time.Duration(24*i))
if opts.Debug {
fmt.Println(i)
fmt.Println(yt)
}
ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC))
ye = append(ye, ys[i].Add(_24h))
}
}
*/
connectMongo(ys.Format("0601"))
defer dbs.mdbSrc.Close()
defer dbs.mdbDst.Close()
// DEBUG
if opts.Debug {
fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye)
}
// for i := range ys {
pStart := time.Now()
// aggregate(ys[i], ye[i])
aggregate(ys, ye)
agg := Consolidate(ys, ye)
agg.Start()
fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart))
log.Printf("Stop %s: %s\n", ys, time.Since(pStart))
// fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart))
// log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart))
// }
idb.Stop = time.Since(start)
fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop)
log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop)
idb.InsUsers = agg.Verify()
fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
if opts.Influxdb != "" {
writeStats(start, ys)
idb.writeStats()
}
}

View file

@ -13,24 +13,24 @@ import (
)
type Options struct {
MongoSrc string
MongoDst string
StartDate string
Duration time.Duration
Interval time.Duration
LogFile string
Influxdb string
Hostname string
Version bool
Debug bool
Bulk bool
Batch int
Exe string
Concurrent int
MongoSrc string
MongoDst string
DstDB string
StartDate string
Duration time.Duration
Interval time.Duration
LogFile string
Influxdb string
Hostname string
Version bool
Debug bool
Bulk bool
Batch int
Exe string
}
func usage() {
fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -i <interval> -I <influxdb uri> -v\n")
fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -I <influxdb uri> -bulk -v\n")
os.Exit(0)
}
@ -44,11 +44,12 @@ func init() {
flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source")
flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination")
flag.StringVar(&opts.MongoDst, "dd", opts.DstDB, "Database Destination")
flag.StringVar(&opts.Influxdb, "I", "", "Influxdb uri")
flag.StringVar(&opts.Hostname, "H", "", "Hostname")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date")
flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration")
flag.DurationVar(&opts.Duration, "du", opts.Duration, "Duration")
flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.BoolVar(&opts.Debug, "debug", false, "Debug")