objects structure

This commit is contained in:
Miki 2016-04-22 12:02:03 +02:00
parent 9c7006dcea
commit 914177d440
5 changed files with 117 additions and 113 deletions

View file

@ -4,45 +4,67 @@ package main
import ( import (
"fmt" "fmt"
"log" "log"
// "sort"
"time" "time"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
) )
func bulkWrite() { type Aggregate struct {
start time.Time
stop time.Time
users Users
cUsers int
cLogins int
}
func Consolidate(ys time.Time, ye time.Time) *Aggregate {
a := Aggregate{
start: ys,
stop: ye,
cLogins: 0,
cUsers: 0,
}
return &a
}
func (a Aggregate) Verify() int {
tot, err := dbs.lc.Find(bson.M{"date": a.start}).Count()
if err != nil {
fmt.Printf("Verify error: %+v\n", err)
}
return tot
}
func (a Aggregate) bulkWrite() {
_, err := dbs.bulk.Run() _, err := dbs.bulk.Run()
if err != nil { if err != nil {
log.Println("Insert error: ", err) log.Println("Insert error: ", err)
} }
//fmt.Printf("Bulk res: %+v\n", res)
} }
func consolidate(user Users, ys time.Time, ye time.Time) { func (a Aggregate) consolidate() {
if opts.Bulk { if opts.Bulk {
dbs.bulk = dbs.lc.Bulk() dbs.bulk = dbs.lc.Bulk()
dbs.bulk.Unordered() dbs.bulk.Unordered()
} }
idb.CountTOT += 1 idb.TotUsers += 1
ll := LastLoginDay{} ll := LastLoginDay{}
ll.User = user.User ll.User = a.users.User
ll.Date = ys ll.Date = a.start
// DEBUG logins := a.users.Logins
logins := user.Logins
ips := []IPs{} ips := []IPs{}
lastip := IPs{} lastip := IPs{}
for l := range logins { for l := range logins {
if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval { if logins[l].IP == lastip.IP && logins[l].Date.Sub(lastip.Date) < opts.Interval {
/* // a.discard += 1
if opts.Debug { // if opts.Debug {
fmt.Println("IPs: ", logins[l].IP, logins[l].Date, logins[l].Date.Sub(lastip.Date)) // fmt.Printf("\rDiscarded: %06d", a.discard)
} // }
*/
} else { } else {
ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol}) ips = append(ips, IPs{IP: logins[l].IP, Date: logins[l].Date, Protocol: logins[l].Protocol})
lastip.Date = logins[l].Date lastip.Date = logins[l].Date
@ -62,29 +84,22 @@ func consolidate(user Users, ys time.Time, ye time.Time) {
iStart := time.Now() iStart := time.Now()
if opts.Bulk { if opts.Bulk {
//dbs.bulk.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
dbs.bulk.Insert(ll) dbs.bulk.Insert(ll)
} else { } else {
_, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) _, err := dbs.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll)
if err != nil { if err != nil {
log.Println("Insert error: ", err) log.Println("Insert error: ", err)
} }
// fmt.Printf("Change: %+v\n", info)
} }
idb.Insert += time.Since(iStart) idb.Insert += time.Since(iStart)
idb.CountOK += 1
if opts.Bulk { if opts.Bulk {
bulkWrite() a.bulkWrite()
} }
// if opts.Debug {
// fmt.Printf("Insert: %d in %v\n", idb.CountOK, idb.Insert)
// }
} }
func aggregate(ys time.Time, ye time.Time) { func (a Aggregate) Start() {
groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"} groups := []string{"[^a-z]", "[be]", "[rv]", "[dt]", "[li]", "[pzjkwxy]", "[fn]", "[co]", "[gu]", "[sh]", "[aq]", "[m]"}
@ -93,7 +108,7 @@ func aggregate(ys time.Time, ye time.Time) {
qStart := time.Now() qStart := time.Now()
p := dbs.ll.Pipe([]bson.M{ p := dbs.ll.Pipe([]bson.M{
{"$match": bson.M{"date": bson.M{"$gte": ys, "$lte": ye}, {"$match": bson.M{"date": bson.M{"$gte": a.start, "$lte": a.stop},
"user": bson.RegEx{"^" + groups[g], ""}}}, "user": bson.RegEx{"^" + groups[g], ""}}},
{"$sort": bson.M{"user": -1, "date": 1}}, {"$sort": bson.M{"user": -1, "date": 1}},
{"$group": bson.M{"_id": "$user", {"$group": bson.M{"_id": "$user",
@ -102,21 +117,26 @@ func aggregate(ys time.Time, ye time.Time) {
iter := p.Iter() iter := p.Iter()
defer iter.Close() defer iter.Close()
var result Users a.cUsers = 0
for iter.Next(&result) { a.cLogins = 0
consolidate(result, ys, ye) a.users = *new(Users)
fmt.Printf("Logins: %d\n", a.cLogins)
for iter.Next(&a.users) {
a.cUsers += 1
a.cLogins += len(a.users.Logins)
a.consolidate()
} }
idb.TotLogins += a.cLogins
if opts.Debug { if opts.Debug {
fmt.Printf("Group %v: %+v\n", groups[g], time.Since(qStart)) fmt.Printf("Group %v: %d & %d in %+v\n", groups[g], a.cUsers, a.cLogins, time.Since(qStart))
} }
// p.All(&result)
idb.Pipe = idb.Pipe + time.Since(qStart) idb.Pipe = idb.Pipe + time.Since(qStart)
} }
fmt.Printf("Date: %s - %s\n", ys, ye) fmt.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
log.Printf("Date: %s - %s\n", ys, ye) log.Printf("Date: %s - %s - U: %d - L: %d\n", a.start, a.stop, idb.TotUsers, idb.TotLogins)
} }

6
dbs.go
View file

@ -54,12 +54,6 @@ type Users struct {
Logins []IPs `json:"logins" bson:"logins"` Logins []IPs `json:"logins" bson:"logins"`
} }
/*
type Users struct {
User string `json:"user"`
}
*/
func connectMongo() { func connectMongo() {
if opts.MongoSrc == "" { if opts.MongoSrc == "" {

View file

@ -9,8 +9,10 @@ import (
) )
type InfluxdbOutput struct { type InfluxdbOutput struct {
CountOK int InsUsers int
CountTOT int TotUsers int
TotLogins int
Now time.Time
Start time.Time Start time.Time
Stop time.Duration Stop time.Duration
Pipe time.Duration Pipe time.Duration
@ -18,7 +20,21 @@ type InfluxdbOutput struct {
Insert time.Duration Insert time.Duration
} }
func writeStats(start time.Time, ys time.Time) { func Influxdb(start time.Time, ys time.Time) *InfluxdbOutput {
i := InfluxdbOutput{
InsUsers: 0,
TotLogins: 0,
TotUsers: 0,
Insert: 0,
Find: 0,
Start: ys,
Now: start,
}
return &i
}
func (i InfluxdbOutput) writeStats() {
if opts.Debug { if opts.Debug {
fmt.Printf("writing to influxdb server: %s", opts.Influxdb) fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
} }
@ -42,17 +58,18 @@ func writeStats(start time.Time, ys time.Time) {
return return
} }
tags := map[string]string{"server": opts.Hostname, "date": ys.String()} tags := map[string]string{"server": opts.Hostname, "date": i.Start.String()}
fields := map[string]interface{}{ fields := map[string]interface{}{
"LoginOK": idb.CountOK, "UsersOK": idb.InsUsers,
"LoginTOT": idb.CountTOT, "UsersTOT": idb.TotUsers,
"start": ys.Format(_tformat), "LoginsTOT": idb.TotLogins,
"start": i.Start.Format(_tformat),
"stop": idb.Stop.Seconds(), "stop": idb.Stop.Seconds(),
"pipe": idb.Pipe.Nanoseconds(), "pipe": idb.Pipe.Nanoseconds(),
"find": idb.Find.Nanoseconds(), "find": idb.Find.Nanoseconds(),
"insert": idb.Insert.Nanoseconds(), "insert": idb.Insert.Nanoseconds(),
} }
pt, err := influxdb.NewPoint("llday", tags, fields, start) pt, err := influxdb.NewPoint("llday", tags, fields, i.Now)
if err != nil { if err != nil {
fmt.Printf("Error: %+v\n", err) fmt.Printf("Error: %+v\n", err)
return return
@ -60,6 +77,5 @@ func writeStats(start time.Time, ys time.Time) {
bp.AddPoint(pt) bp.AddPoint(pt)
// Write the batch
c.Write(bp) c.Write(bp)
} }

View file

@ -4,15 +4,16 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
// "gopkg.in/mgo.v2"
// "gopkg.in/mgo.v2/bson"
"log" "log"
"os" "os"
"sync"
"time" "time"
"github.com/mikif70/pidlib"
) )
const ( const (
_VERSION = "v1.3.3" _VERSION = "v1.3.4"
_tformat = "2006-01-02" _tformat = "2006-01-02"
_24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59)
_10m = (time.Minute * 10) _10m = (time.Minute * 10)
@ -29,16 +30,11 @@ var (
Batch: 10000, Batch: 10000,
} }
loop []bool wg sync.WaitGroup
dbs = Dbs{} dbs = Dbs{}
idb = InfluxdbOutput{ idb *InfluxdbOutput
CountTOT: 0,
CountOK: 0,
Insert: 0,
Find: 0,
}
) )
func main() { func main() {
@ -69,7 +65,8 @@ func main() {
log.SetOutput(fs) log.SetOutput(fs)
pid.Write(true) pid := pidlib.New()
pid.Write()
defer pid.Remove() defer pid.Remove()
start := time.Now() start := time.Now()
@ -86,57 +83,35 @@ func main() {
os.Exit(-1) os.Exit(-1)
} }
// DEBUG
//fmt.Printf("Start %+v\n\r", y)
// var ys []time.Time
// var ye []time.Time
var ys time.Time var ys time.Time
var ye time.Time var ye time.Time
// if opts.Duration <= (time.Hour * 24) {
ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC) ys = time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)
ye = ys.Add(opts.Duration) ye = ys.Add(opts.Duration)
/* idb = Influxdb(start, ys)
} else {
for i := 0; i <= int(opts.Duration/(time.Hour*24)); i++ {
yt := y.Add(time.Hour * time.Duration(24*i))
if opts.Debug {
fmt.Println(i)
fmt.Println(yt)
}
ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC))
ye = append(ye, ys[i].Add(_24h))
}
}
*/
// DEBUG // DEBUG
if opts.Debug { if opts.Debug {
fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye)
} }
// for i := range ys {
pStart := time.Now() pStart := time.Now()
// aggregate(ys[i], ye[i]) agg := Consolidate(ys, ye)
aggregate(ys, ye) agg.Start()
fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart)) fmt.Printf("Stop %s: %s\n", ys, time.Since(pStart))
log.Printf("Stop %s: %s\n", ys, time.Since(pStart)) log.Printf("Stop %s: %s\n", ys, time.Since(pStart))
// fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart))
// log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart))
// }
idb.Stop = time.Since(start) idb.Stop = time.Since(start)
fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop) idb.InsUsers = agg.Verify()
log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.CountOK, idb.CountTOT, idb.Stop)
fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", idb.InsUsers, idb.TotUsers, idb.Stop)
if opts.Influxdb != "" { if opts.Influxdb != "" {
writeStats(start, ys) idb.writeStats()
} }
} }

View file

@ -26,11 +26,10 @@ type Options struct {
Bulk bool Bulk bool
Batch int Batch int
Exe string Exe string
Concurrent int
} }
func usage() { func usage() {
fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -i <interval> -I <influxdb uri> -v\n") fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -I <influxdb uri> -bulk -v\n")
os.Exit(0) os.Exit(0)
} }