diff --git a/Docker/Dockerfile b/Docker/Dockerfile index 952403a..0736707 100644 --- a/Docker/Dockerfile +++ b/Docker/Dockerfile @@ -2,6 +2,6 @@ FROM scratch MAINTAINER Michele Fadda "" -COPY quota_mongodb-v2.0.1 /bin/quota_mongodb +COPY quota_mongodb-v2.0.2 /bin/quota_mongodb ENTRYPOINT [ "/bin/quota_mongodb" ] diff --git a/Docker/quota_mongodb-v2.0.1 b/Docker/quota_mongodb-v2.0.1 deleted file mode 100755 index 470b93a..0000000 Binary files a/Docker/quota_mongodb-v2.0.1 and /dev/null differ diff --git a/consumer.go b/consumer.go index cfa4a72..598f205 100644 --- a/consumer.go +++ b/consumer.go @@ -33,8 +33,6 @@ func consumer() { prod := <-consume - start := time.Now() - status = _Consumer if opts.Test { diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..a91b792 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,96 @@ +// influxdb +package main + +import ( + "fmt" + "time" + + // "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + + influxdb "github.com/influxdata/influxdb/client/v2" +) + +var ( + infdb string + infhost string +) + +type storage struct { + ID string `bson:"_id"` + Tot int64 `bson:"tot"` +} + +func getStorage() int64 { + + db := dbs.mdb.DB("quota").C("tiscali") + + // start, err0 := time.Parse("06-01-02", "17-03-20") + // stop, err1 := time.Parse("06-01-02", "17-03-21") + stop := start.Add(1 * time.Hour) + + if opts.Debug { + fmt.Printf("Start = %s - Stop = %s\n", start.String(), stop.String()) + } + + pipe := db.Pipe([]bson.M{{"$match": bson.M{"insert": bson.M{"$gte": start, "$lt": stop}}}, {"$group": bson.M{"_id": "TOT", "tot": bson.M{"$sum": "$storage"}}}}) + + var retval []storage + + err := pipe.All(&retval) + if err != nil { + fmt.Printf("Err: %+v\n", err) + return 0 + } + + if opts.Debug { + fmt.Println(db.Count()) + fmt.Printf("%d", retval[0].Tot) + } + + return retval[0].Tot +} + +func writeStats() { + if opts.Debug { + fmt.Printf("writing to influxdb server: %s", opts.Influxdb) + } + + c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ + Addr: fmt.Sprintf("http://%s", infdb), + Timeout: 2 * time.Second, + }) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + defer c.Close() + + bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{ + Database: "dovecot", + Precision: "s", + }) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + + qtot := getStorage() + + tags := map[string]string{"server": infhost, "domain": "quota"} + fields := map[string]interface{}{ + "user": count.GetUser(), + "storage": qtot, + "stop": count.GetTime(), + } + pt, err := influxdb.NewPoint("qmongo", tags, fields, start) + if err != nil { + fmt.Printf("Error: %+v\n", err) + return + } + + bp.AddPoint(pt) + + // Write the batch + c.Write(bp) +} diff --git a/main.go b/main.go index 505ee10..766de11 100644 --- a/main.go +++ b/main.go @@ -6,12 +6,13 @@ import ( "fmt" "log" "os" + "regexp" "sync" "time" ) const ( - _Version = "v2.0.2" + _Version = "v2.1.0" _Producer = 0 _Consumer = 1 ) @@ -30,6 +31,8 @@ var ( status int count *Counter + + start time.Time ) func main() { @@ -56,7 +59,7 @@ func main() { pid.Write(true) defer pid.Remove() - start := time.Now() + start = time.Now() fmt.Printf("Start: %+v\n\t%+v\n", opts, dbs) log.Printf("Start: %+v\n\t%+v\n", opts, dbs) @@ -65,6 +68,17 @@ func main() { os.Exit(-1) } + if opts.Influxdb != "" { + re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`) + if re.MatchString(opts.Influxdb) { + match := re.FindStringSubmatch(opts.Influxdb) + infhost = match[1] + infdb = match[2] + } else { + opts.Influxdb = "" + } + } + dbs.poolRedis() defer dbs.rdb.Close() @@ -103,4 +117,8 @@ func main() { fmt.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) log.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) + + if opts.Influxdb != "" { + writeStats() + } } diff --git a/mongodb-scripts.txt b/mongodb-scripts.txt new file mode 100644 index 0000000..fdee217 --- /dev/null +++ b/mongodb-scripts.txt @@ -0,0 +1,3 @@ +conn = new Mongo("192.168.0.1:27017") +q = conn.getDB("quota") +retval = q.tiscali.aggregate({$match: {insert: {$gte: ISODate("2017-03-15T00:00:00.000Z"), $lt: ISODate("2017-03-16T00:00:00.000Z")}}},{$group: {_id: null, tot: {$sum: "$storage"}}}).result[0].tot diff --git a/options.go b/options.go index 8f82c1f..0711c55 100644 --- a/options.go +++ b/options.go @@ -18,6 +18,7 @@ type Options struct { LogFile string ConfigFile string Timeout time.Duration + Influxdb string Debug bool Test bool Version bool @@ -40,6 +41,7 @@ func usage() { -d -c -r + -i -l -T -p @@ -65,6 +67,7 @@ func init() { flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database") flag.StringVar(&dbs.Collection, "c", dbs.Collection, "Mongodb Collection") flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis") + flag.StringVar(&opts.Influxdb, "i", "", "influxdb server") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.BoolVar(&opts.Version, "v", false, "Version") flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") diff --git a/producer.go b/producer.go index 297db57..4e16449 100644 --- a/producer.go +++ b/producer.go @@ -29,7 +29,7 @@ func producer() { loop := true - start := time.Now() + startp := time.Now() users := make(map[string]bool) for loop { @@ -56,7 +56,7 @@ func producer() { } if opts.Debug { - fmt.Printf("\nKEYS: %+v\n", time.Since(start)) + fmt.Printf("\nKEYS: %+v\n", time.Since(startp)) } max := 0 @@ -75,95 +75,3 @@ func producer() { done <- true } - -/* - - keys, err := redis.Strings(conn.Do("KEYS", "*")) - - if err != nil { - if opts.Debug { - fmt.Printf("Keys error: %v\n", err) - } - log.Printf("Keys error: %v\n", err) - exit() - } - - users := make(map[string]bool) - - for _, key := range keys { - user := key[:strings.Index(key, "@")] - users[user] = true - } - - uq := make([]userQuota, 0) - max := 0 - - for key, _ := range users { - - if !loop { - break - } - - // wg.Wait() - status = _Producer - - start := time.Now() - if opts.Test { - fmt.Printf("MGET: %s (%d)\n", key, max) - } - // estrae un userid dalla lista degli utenti che hanno fatto login - quota, err := redis.Strings(conn.Do("mget", fmt.Sprintf("%s@tiscali.it/quota/messages", key), fmt.Sprintf("%s@tiscali.it/quota/storage", key))) - // se non ci sono piu' userid esce - if err != nil { - if opts.Debug { - fmt.Printf("MGET err: %v\n", err) - } - log.Printf("MGET err: %v\n", err) - continue - } - - counter <- Counterchan{ - tipo: "user", - val: 1, - } - - msg, err := strconv.Atoi(quota[0]) - if err != nil { - msg = 0 - } - store, err := strconv.Atoi(quota[1]) - if err != nil { - store = 0 - } - - uq = append(uq, userQuota{ - user: key, - messages: msg, - storage: store, - }) - - if opts.Test { - fmt.Printf("User: %s - %+v\n", key, quota) - } - - if max >= opts.MaxBulk { - if opts.Debug { - fmt.Printf("\nPROD: %+v\n", time.Since(start)) - } - - // wg.Add(1) - // counter <- Counterchan{ - // tipo: "wg", - // val: 1, - // } - - consume <- uq - uq = make([]userQuota, 0) - max = 0 - } - - max += 1 - } - - done <- true -*/