implementato bulk insert con _id generato

This commit is contained in:
Michele 2016-11-03 17:42:37 +01:00
parent c2a5e09323
commit d7d68f4bf4
8 changed files with 71 additions and 85 deletions

View file

@ -4,11 +4,12 @@ package main
import (
"fmt"
// "github.com/garyburd/redigo/redis"
// "gopkg.in/mgo.v2/bson"
"log"
"strconv"
"strings"
"time"
"gopkg.in/mgo.v2"
)
type consumed struct {
@ -27,11 +28,16 @@ func contains(s []Ips, e string) bool {
return false
}
func consumer(id int) {
func consumer() {
for {
prod := <-consume[id]
prod := <-consume
var bulk = make(map[string]*mgo.Bulk)
bulk[opts.Month] = dbs.ll.Bulk()
bulk[opts.Month].Unordered()
cons := consumed{
user: prod.user,
@ -64,6 +70,8 @@ func consumer(id int) {
continue
}
ml := MongoLogin{
// genera l' _ID con user e timestamp
ID: fmt.Sprintf("%s_%s", prod.user, time.Unix(date, 0).Format("20060102T150405")),
User: prod.user,
Protocol: sval[0],
IP: sval[2],
@ -72,38 +80,30 @@ func consumer(id int) {
}
if opts.Month != ml.Date.Format("0601") {
lt := dbs.mdb.DB("lastlogin").C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")))
err = lt.Insert(ml)
if err != nil {
log.Printf("Insert error: %+v - %s - %s\n", err, cons.user, lt.FullName)
count.AddErr()
cons.error = true
continue
}
if opts.Debug {
log.Printf("%s - %+v\n", lt.FullName, ml)
dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))
if _, ok := bulk[dt]; !ok {
bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk()
bulk[dt].Unordered()
}
bulk[dt].Insert(ml)
} else {
// inserisce il login su Mongodb
err = dbs.ll.Insert(ml)
if err != nil {
log.Printf("Insert error: %+v - %s\n", err, cons.user)
count.AddErr()
cons.error = true
continue
}
if opts.Debug {
log.Printf("%+v\n", ml)
}
// inserisce last timestamp su redis per consolidamento
/*
*/
bulk[opts.Month].Insert(ml)
// inserisce last timestamp su redis per consolidamento
}
cons.logins = append(cons.logins, login)
}
for key, _ := range bulk {
_, err := bulk[key].Run()
if err != nil {
fmt.Printf("Err: %+v\n", err)
}
}
count.AddLog(len(prod.logins))
if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins {
@ -111,9 +111,10 @@ func consumer(id int) {
}
if opts.Debug {
fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
}
remove[id] <- cons
wg.Done()
// remove <- cons
}
}

View file

@ -15,7 +15,7 @@ type Counter struct {
err int
dup int
time time.Duration
wg []int
wg int
}
// NewCounter iniitialized Counter structure
@ -27,7 +27,7 @@ func NewCounter() *Counter {
rem: 0,
dup: 0,
time: 0,
wg: make([]int, opts.Concurrent),
wg: 0,
}
}
@ -60,10 +60,10 @@ func (c *Counter) AddRem(add int) {
}
// AddWG ...
func (c *Counter) AddWG(id int) {
func (c *Counter) AddWG() {
c.mu.Lock()
defer c.mu.Unlock()
c.wg[id]++
c.wg++
}
// AddErr ...
@ -74,10 +74,10 @@ func (c *Counter) AddErr() {
}
// DelWG ...
func (c *Counter) DelWG(id int) {
func (c *Counter) DelWG() {
c.mu.Lock()
defer c.mu.Unlock()
c.wg[id]--
c.wg--
}
// GetUser return total users
@ -121,10 +121,10 @@ func (c *Counter) GetRem() (ret int) {
}
// GetWG ...
func (c *Counter) GetWG(id int) (ret int) {
func (c *Counter) GetWG() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.wg[id]
ret = c.wg
return
}

1
dbs.go
View file

@ -33,6 +33,7 @@ type Dbs struct {
// MongoLogin structure
type MongoLogin struct {
ID string `json:"_id" bson:"_id"`
User string `json:"user"`
Protocol string `json:"protocol"`
IP string `json:"ip"`

41
main.go
View file

@ -11,15 +11,15 @@ import (
)
const (
_Version = "v2.8.0.1"
_Version = "v3.0.0"
)
var (
loop []bool
loop bool
done []chan bool
consume []chan produced
remove []chan consumed
done chan bool
consume chan produced
remove chan consumed
wg sync.WaitGroup
@ -76,27 +76,20 @@ func main() {
count = NewCounter()
for i := 0; i < opts.Concurrent; i++ {
consume = append(consume, make(chan produced))
remove = append(remove, make(chan consumed))
loop = append(loop, true)
done = append(done, make(chan bool))
consume = make(chan produced)
remove = make(chan consumed)
loop = true
done = make(chan bool)
go producer(i)
go consumer(i)
go remover(i)
}
go producer()
go consumer()
go remover()
for i := 0; i < opts.Concurrent; i++ {
<-done[i]
fmt.Printf("Done %d\n", i)
close(done[i])
}
<-done
fmt.Printf("Done\n")
close(done)
fmt.Println("Waiting WG")
for i := 0; i < opts.Concurrent; i++ {
fmt.Printf("ID (%d): %d\n", i, count.GetWG(i))
}
wg.Wait()
count.SetTime(time.Since(start))
@ -107,8 +100,4 @@ func main() {
if opts.Influxdb != "" {
writeStats(start)
}
if opts.Xymon != "" {
sendStatus()
}
}

View file

@ -24,7 +24,6 @@ type Options struct {
MaxLogins int
Debug bool
Version bool
Concurrent int
MaxError int
Xymon string
Influxdb string
@ -42,7 +41,7 @@ var (
)
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <redis keys ttl> -l <logfile> -b <concurrent thread> -T <running ttl> -x <xymon server> -H <hostname> -i <influxdb uri> -v")
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <redis keys ttl> -l <logfile> -T <running ttl> -x <xymon server> -H <hostname> -i <influxdb uri> -v")
fmt.Println()
os.Exit(0)
}
@ -70,7 +69,6 @@ func init() {
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.IntVar(&opts.Concurrent, "c", 1, "Concurrent thread")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file")
}

View file

@ -15,11 +15,11 @@ type produced struct {
logins []string
}
func producer(id int) {
func producer() {
conn := dbs.rdb.Get()
defer conn.Close()
for loop[id] {
for loop {
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
@ -53,19 +53,19 @@ func producer(id int) {
// }
if opts.Debug {
fmt.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
// log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
}
count.AddUser()
wg.Add(1)
count.AddWG(id)
count.AddWG()
consume[id] <- produced{
consume <- produced{
user: user,
logins: logs,
}
}
done[id] <- true
done <- true
}

View file

@ -7,26 +7,26 @@ import (
"time"
)
func remover(id int) {
func remover() {
var conn = dbs.rdb.Get()
defer conn.Close()
for {
rem := <-remove[id]
rem := <-remove
// wg.Add(1)
start := time.Now()
for i := range rem.logins {
login := rem.logins[i]
// cancella da Redis la riga di login inserita partendo da 1
conn.Send("lrem", rem.user, "1", login)
conn.Send("lrem", rem.user, "1", rem.logins[i])
}
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
if rem.error || !rem.empty {
if !rem.empty || rem.error {
if opts.Debug {
fmt.Printf("SADD (%d): %s\n", id, rem.user)
fmt.Printf("SADD: %s\n", rem.user)
}
conn.Send("sadd", "llindex", rem.user)
if count.GetErr() >= opts.MaxError {
@ -37,9 +37,9 @@ func remover(id int) {
conn.Flush()
count.AddRem(len(rem.logins))
if opts.Debug {
fmt.Printf("LREM (%d): %s - %d - %+v\n", id, rem.user, len(rem.logins), time.Since(start))
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
}
wg.Done()
count.DelWG(id)
count.DelWG()
}
}

View file

@ -10,12 +10,9 @@ import (
)
func exit() {
for i := 0; i < opts.Concurrent; i++ {
log.Println("EXIT ", i)
fmt.Println("EXIT ", i)
loop[i] = false
// done[i] <- true
}
log.Println("EXIT ")
fmt.Println("EXIT ")
loop = false
}
func setTerm() {