llmongodb/main.go

134 lines
2.5 KiB
Go
Raw Normal View History

2015-07-09 17:50:13 +02:00
// llmongo.go
package main
import (
"flag"
"fmt"
2015-07-10 12:16:39 +02:00
"log"
2015-07-30 15:55:32 +02:00
"os"
"regexp"
"sync"
2015-07-10 14:35:35 +02:00
"time"
2018-05-29 15:48:29 +02:00
_ "github.com/nats-io/go-nats"
2015-07-09 17:50:13 +02:00
)
const (
2018-12-12 14:57:08 +01:00
_Version = "v5.0.0b6"
_Producer = 0
_Consumer = 1
_Remover = 2
)
2015-07-09 17:50:13 +02:00
var (
loop bool
done chan bool
consume chan loginsList
remove chan loginsList
2016-12-20 15:37:32 +01:00
counter chan Counterchan
wg sync.WaitGroup
2015-11-23 17:39:17 +01:00
status int
2015-11-23 17:39:17 +01:00
count *Counter
2015-11-20 15:23:12 +01:00
)
2015-07-09 17:50:13 +02:00
func main() {
flag.Usage = usage
2015-07-10 14:35:35 +02:00
flag.Parse()
2015-07-09 17:50:13 +02:00
if opts.Version {
2016-10-05 11:55:50 +02:00
fmt.Println(os.Args[0], _Version)
os.Exit(0)
}
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" {
flag.Usage()
os.Exit(-1)
}
if opts.Influxdb != "" {
2018-12-12 13:21:39 +01:00
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
if re.MatchString(opts.Influxdb) {
match := re.FindStringSubmatch(opts.Influxdb)
2018-12-12 13:33:26 +01:00
if opts.Debug {
fmt.Printf("Influxdb match: %+v\n", match)
}
infhost = match[1]
2018-12-12 13:33:26 +01:00
infdb = match[2] + ":" + match[3]
} else {
opts.Influxdb = ""
}
}
2015-11-20 15:23:12 +01:00
setTerm()
2015-11-19 17:12:53 +01:00
2015-07-10 12:16:39 +02:00
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Log file error: ", err.Error())
os.Exit(-4)
}
2015-07-30 10:21:01 +02:00
defer fs.Close()
2015-07-10 14:35:35 +02:00
2015-07-10 12:16:39 +02:00
log.SetOutput(fs)
// pid.PIDFile = opts.Pidfile
// pid.Write(true)
// defer pid.Remove()
2015-07-10 12:16:39 +02:00
start := time.Now()
fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
2015-07-09 17:50:13 +02:00
opts.Month = start.Format("0601")
dbs.poolRedis()
2015-07-30 10:21:01 +02:00
defer dbs.rdb.Close()
2015-07-09 17:50:13 +02:00
dbs.connectMongo()
for k := range dbs.mdb {
defer dbs.mdb[k].Close()
2016-11-10 09:01:10 +01:00
}
2015-07-09 17:50:13 +02:00
if opts.Timeout > 0 {
time.AfterFunc(opts.Timeout, exit)
}
2015-11-23 17:39:17 +01:00
count = NewCounter()
consume = make(chan loginsList, opts.Queue)
remove = make(chan loginsList, opts.Queue)
loop = true
done = make(chan bool)
2016-12-20 15:37:32 +01:00
counter = make(chan Counterchan)
2015-11-23 17:39:17 +01:00
2016-12-20 15:37:32 +01:00
go count.Run()
go producer()
2017-01-04 17:40:59 +01:00
for i := 0; i < opts.Queue; i++ {
for j := 0; j < len(dbs.mdb); j++ {
go consumer()
}
2017-01-04 17:40:59 +01:00
go remover()
}
2015-11-19 17:12:53 +01:00
<-done
fmt.Printf("Done\n")
close(done)
2015-11-20 15:23:12 +01:00
2015-11-23 17:39:17 +01:00
fmt.Println("Waiting WG")
wg.Wait()
count.SetTime(time.Since(start))
2016-11-10 09:01:10 +01:00
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
if opts.Influxdb != "" {
writeStats(start)
}
2015-07-10 14:35:35 +02:00
}