aggiunta l'opzione per scrivere la somma su influxdb

This commit is contained in:
Michele 2017-03-20 17:20:26 +01:00
parent a6a959427d
commit 87bd709630
8 changed files with 125 additions and 99 deletions

View file

@ -2,6 +2,6 @@ FROM scratch
MAINTAINER Michele Fadda "<mikif70@gmail.com>" MAINTAINER Michele Fadda "<mikif70@gmail.com>"
COPY quota_mongodb-v2.0.1 /bin/quota_mongodb COPY quota_mongodb-v2.0.2 /bin/quota_mongodb
ENTRYPOINT [ "/bin/quota_mongodb" ] ENTRYPOINT [ "/bin/quota_mongodb" ]

Binary file not shown.

View file

@ -33,8 +33,6 @@ func consumer() {
prod := <-consume prod := <-consume
start := time.Now()
status = _Consumer status = _Consumer
if opts.Test { if opts.Test {

96
influxdb.go Normal file
View file

@ -0,0 +1,96 @@
// influxdb
package main
import (
"fmt"
"time"
// "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
influxdb "github.com/influxdata/influxdb/client/v2"
)
var (
infdb string
infhost string
)
type storage struct {
ID string `bson:"_id"`
Tot int64 `bson:"tot"`
}
func getStorage() int64 {
db := dbs.mdb.DB("quota").C("tiscali")
// start, err0 := time.Parse("06-01-02", "17-03-20")
// stop, err1 := time.Parse("06-01-02", "17-03-21")
stop := start.Add(1 * time.Hour)
if opts.Debug {
fmt.Printf("Start = %s - Stop = %s\n", start.String(), stop.String())
}
pipe := db.Pipe([]bson.M{{"$match": bson.M{"insert": bson.M{"$gte": start, "$lt": stop}}}, {"$group": bson.M{"_id": "TOT", "tot": bson.M{"$sum": "$storage"}}}})
var retval []storage
err := pipe.All(&retval)
if err != nil {
fmt.Printf("Err: %+v\n", err)
return 0
}
if opts.Debug {
fmt.Println(db.Count())
fmt.Printf("%d", retval[0].Tot)
}
return retval[0].Tot
}
func writeStats() {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
}
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: fmt.Sprintf("http://%s", infdb),
Timeout: 2 * time.Second,
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
return
}
defer c.Close()
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: "dovecot",
Precision: "s",
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
return
}
qtot := getStorage()
tags := map[string]string{"server": infhost, "domain": "quota"}
fields := map[string]interface{}{
"user": count.GetUser(),
"storage": qtot,
"stop": count.GetTime(),
}
pt, err := influxdb.NewPoint("qmongo", tags, fields, start)
if err != nil {
fmt.Printf("Error: %+v\n", err)
return
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}

22
main.go
View file

@ -6,12 +6,13 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"regexp"
"sync" "sync"
"time" "time"
) )
const ( const (
_Version = "v2.0.2" _Version = "v2.1.0"
_Producer = 0 _Producer = 0
_Consumer = 1 _Consumer = 1
) )
@ -30,6 +31,8 @@ var (
status int status int
count *Counter count *Counter
start time.Time
) )
func main() { func main() {
@ -56,7 +59,7 @@ func main() {
pid.Write(true) pid.Write(true)
defer pid.Remove() defer pid.Remove()
start := time.Now() start = time.Now()
fmt.Printf("Start: %+v\n\t%+v\n", opts, dbs) fmt.Printf("Start: %+v\n\t%+v\n", opts, dbs)
log.Printf("Start: %+v\n\t%+v\n", opts, dbs) log.Printf("Start: %+v\n\t%+v\n", opts, dbs)
@ -65,6 +68,17 @@ func main() {
os.Exit(-1) os.Exit(-1)
} }
if opts.Influxdb != "" {
re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
if re.MatchString(opts.Influxdb) {
match := re.FindStringSubmatch(opts.Influxdb)
infhost = match[1]
infdb = match[2]
} else {
opts.Influxdb = ""
}
}
dbs.poolRedis() dbs.poolRedis()
defer dbs.rdb.Close() defer dbs.rdb.Close()
@ -103,4 +117,8 @@ func main() {
fmt.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) fmt.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser())
log.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) log.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser())
if opts.Influxdb != "" {
writeStats()
}
} }

3
mongodb-scripts.txt Normal file
View file

@ -0,0 +1,3 @@
conn = new Mongo("192.168.0.1:27017")
q = conn.getDB("quota")
retval = q.tiscali.aggregate({$match: {insert: {$gte: ISODate("2017-03-15T00:00:00.000Z"), $lt: ISODate("2017-03-16T00:00:00.000Z")}}},{$group: {_id: null, tot: {$sum: "$storage"}}}).result[0].tot

View file

@ -18,6 +18,7 @@ type Options struct {
LogFile string LogFile string
ConfigFile string ConfigFile string
Timeout time.Duration Timeout time.Duration
Influxdb string
Debug bool Debug bool
Test bool Test bool
Version bool Version bool
@ -40,6 +41,7 @@ func usage() {
-d <database> -d <database>
-c <collection> -c <collection>
-r <redis uri> -r <redis uri>
-i <influxdb [localname@ip:port]>
-l <logfile> -l <logfile>
-T <running ttl> -T <running ttl>
-p <pid filename> -p <pid filename>
@ -65,6 +67,7 @@ func init() {
flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database") flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database")
flag.StringVar(&dbs.Collection, "c", dbs.Collection, "Mongodb Collection") flag.StringVar(&dbs.Collection, "c", dbs.Collection, "Mongodb Collection")
flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis") flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis")
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.BoolVar(&opts.Version, "v", false, "Version") flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")

View file

@ -29,7 +29,7 @@ func producer() {
loop := true loop := true
start := time.Now() startp := time.Now()
users := make(map[string]bool) users := make(map[string]bool)
for loop { for loop {
@ -56,7 +56,7 @@ func producer() {
} }
if opts.Debug { if opts.Debug {
fmt.Printf("\nKEYS: %+v\n", time.Since(start)) fmt.Printf("\nKEYS: %+v\n", time.Since(startp))
} }
max := 0 max := 0
@ -75,95 +75,3 @@ func producer() {
done <- true done <- true
} }
/*
keys, err := redis.Strings(conn.Do("KEYS", "*"))
if err != nil {
if opts.Debug {
fmt.Printf("Keys error: %v\n", err)
}
log.Printf("Keys error: %v\n", err)
exit()
}
users := make(map[string]bool)
for _, key := range keys {
user := key[:strings.Index(key, "@")]
users[user] = true
}
uq := make([]userQuota, 0)
max := 0
for key, _ := range users {
if !loop {
break
}
// wg.Wait()
status = _Producer
start := time.Now()
if opts.Test {
fmt.Printf("MGET: %s (%d)\n", key, max)
}
// estrae un userid dalla lista degli utenti che hanno fatto login
quota, err := redis.Strings(conn.Do("mget", fmt.Sprintf("%s@tiscali.it/quota/messages", key), fmt.Sprintf("%s@tiscali.it/quota/storage", key)))
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("MGET err: %v\n", err)
}
log.Printf("MGET err: %v\n", err)
continue
}
counter <- Counterchan{
tipo: "user",
val: 1,
}
msg, err := strconv.Atoi(quota[0])
if err != nil {
msg = 0
}
store, err := strconv.Atoi(quota[1])
if err != nil {
store = 0
}
uq = append(uq, userQuota{
user: key,
messages: msg,
storage: store,
})
if opts.Test {
fmt.Printf("User: %s - %+v\n", key, quota)
}
if max >= opts.MaxBulk {
if opts.Debug {
fmt.Printf("\nPROD: %+v\n", time.Since(start))
}
// wg.Add(1)
// counter <- Counterchan{
// tipo: "wg",
// val: 1,
// }
consume <- uq
uq = make([]userQuota, 0)
max = 0
}
max += 1
}
done <- true
*/