diff --git a/Docker/Dockerfile b/Docker/Dockerfile new file mode 100644 index 0000000..952403a --- /dev/null +++ b/Docker/Dockerfile @@ -0,0 +1,7 @@ +FROM scratch + +MAINTAINER Michele Fadda "" + +COPY quota_mongodb-v2.0.1 /bin/quota_mongodb + +ENTRYPOINT [ "/bin/quota_mongodb" ] diff --git a/Docker/quota_mongodb-v2.0.1 b/Docker/quota_mongodb-v2.0.1 new file mode 100755 index 0000000..470b93a Binary files /dev/null and b/Docker/quota_mongodb-v2.0.1 differ diff --git a/Docker/run.sh b/Docker/run.sh new file mode 100755 index 0000000..0fe725e --- /dev/null +++ b/Docker/run.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +docker run \ + --rm \ + -v /opt/qmongo/log:/data \ + --name qmongo_tiscali \ + --log-opt max-size=2m \ + --log-opt max-file=5 \ + mikif70/qmongo:2.0.1 \ + -l /data/llmongo.log \ + -p /data/llmongo.pid \ + -r redis-qdb.mail.tiscali.sys:6379 \ + -m 10.39.80.189:27017 \ + -d quota \ + -c tiscali \ + $@ \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..184d379 --- /dev/null +++ b/build.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo . \ No newline at end of file diff --git a/consumer.go b/consumer.go index 76289bf..cfa4a72 100644 --- a/consumer.go +++ b/consumer.go @@ -42,15 +42,17 @@ func consumer() { } // for j := range dbs.mdb { - bulk := dbs.mdb.DB("quota").C("tiscali").Bulk() + bulk := dbs.mdb.DB(dbs.Database).C(dbs.Collection).Bulk() bulk.Unordered() for p := range prod { + mlID := hash([]byte(fmt.Sprintf("%s%s", prod[p].user, start.Format("20060102T15")))) mquota := MongoQuota{ + ID: mlID, User: prod[p].user, Messages: prod[p].messages, Storage: prod[p].storage, - Insert: time.Now(), + Insert: start, } bulk.Insert(mquota) @@ -74,6 +76,6 @@ func consumer() { fmt.Printf("CONS: users=%d in %v - active=%d\n", len(prod), time.Since(start), dbs.rdb.ActiveCount()) } - //wg.Done() + wg.Done() } } diff --git a/dbs.go b/dbs.go index 4de0902..5bef3a4 100644 --- a/dbs.go +++ b/dbs.go @@ -15,23 +15,25 @@ import ( var ( dbs = Dbs{ - RedisURI: "127.0.0.1:6379", - Database: "lastlogin", + RedisURI: "127.0.0.1:6379", + Database: "", + Collection: "", } ) // Dbs structure type Dbs struct { - MongoURI string - Database string - RedisURI string - rdb *redis.Pool //*redis.Client - mdb *mgo.Session + MongoURI string + Database string + Collection string + RedisURI string + rdb *redis.Pool //*redis.Client + mdb *mgo.Session } // MongoQuota structure type MongoQuota struct { - // ID string `json:"_id" bson:"_id"` + ID string `json:"_id" bson:"_id"` User string `json:"user" bson:"user"` Messages int `json:"messages" bson:"messages"` Storage int `json:"storage" bson:"storage"` diff --git a/main.go b/main.go index 5d89e1c..505ee10 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,7 @@ import ( ) const ( - _Version = "v1.0.0" + _Version = "v2.0.2" _Producer = 0 _Consumer = 1 ) @@ -19,8 +19,9 @@ const ( var ( loop bool - done chan bool - consume chan []userQuota + done chan bool + consume chan []userQuota + mget_chan chan []string counter chan Counterchan @@ -56,10 +57,13 @@ func main() { defer pid.Remove() start := time.Now() - fmt.Printf("Start: %+v\n", opts) - log.Printf("Start: %+v\n", opts) + fmt.Printf("Start: %+v\n\t%+v\n", opts, dbs) + log.Printf("Start: %+v\n\t%+v\n", opts, dbs) - opts.Month = start.Format("0601") + if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" || dbs.Collection == "" { + flag.Usage() + os.Exit(-1) + } dbs.poolRedis() defer dbs.rdb.Close() @@ -74,12 +78,16 @@ func main() { count = NewCounter() consume = make(chan []userQuota, opts.Queue) + mget_chan = make(chan []string, opts.Queue*10) loop = true done = make(chan bool) counter = make(chan Counterchan) go count.Run() go producer() + for i := 0; i < opts.Queue*10; i++ { + go mget() + } for i := 0; i < opts.Queue; i++ { go consumer() } diff --git a/mget.go b/mget.go new file mode 100644 index 0000000..afa959c --- /dev/null +++ b/mget.go @@ -0,0 +1,67 @@ +package main + +import ( + "fmt" + "log" + "strconv" + "time" + + "github.com/garyburd/redigo/redis" +) + +func mget() { + + conn := dbs.rdb.Get() + defer conn.Close() + + for { + + get := <-mget_chan + + start := time.Now() + + uq := make([]userQuota, 0) + + for _, key := range get { + + // estrae un userid dalla lista degli utenti che hanno fatto login + quota, err := redis.Strings(conn.Do("mget", fmt.Sprintf("%s@tiscali.it/quota/messages", key), fmt.Sprintf("%s@tiscali.it/quota/storage", key))) + // se non ci sono piu' userid esce + if err != nil { + if opts.Debug { + fmt.Printf("MGET err: %v\n", err) + } + log.Printf("MGET err: %v\n", err) + continue + } + counter <- Counterchan{ + tipo: "user", + val: 1, + } + + msg, err := strconv.Atoi(quota[0]) + if err != nil { + msg = 0 + } + store, err := strconv.Atoi(quota[1]) + if err != nil { + store = 0 + } + + uq = append(uq, userQuota{ + user: key, + messages: msg, + storage: store, + }) + } + + if opts.Debug { + fmt.Printf("\nMGET: %+v\n", time.Since(start)) + } + + wg.Add(1) + consume <- uq + + wg.Done() + } +} diff --git a/options.go b/options.go index b394e0a..8f82c1f 100644 --- a/options.go +++ b/options.go @@ -34,7 +34,18 @@ var ( ) func usage() { - fmt.Println("Usage: llmongo -m -r -t -l -T -x -H -i -v") + fmt.Println( + `Usage: llmongo + -m + -d + -c + -r + -l + -T + -p + -M + -q + -v`) fmt.Println() os.Exit(0) } @@ -52,13 +63,14 @@ func init() { flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb") flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database") + flag.StringVar(&dbs.Collection, "c", dbs.Collection, "Mongodb Collection") flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.BoolVar(&opts.Version, "v", false, "Version") flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") flag.BoolVar(&opts.Debug, "D", false, "Debug") flag.BoolVar(&opts.Test, "DD", false, "Test") - flag.IntVar(&opts.MaxBulk, "M", 100, "Max Mongodb bulk") + flag.IntVar(&opts.MaxBulk, "M", 1000, "Max Mongodb bulk") flag.IntVar(&opts.Queue, "q", 2, "parallel consumer") flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file") } diff --git a/producer.go b/producer.go index 246dd2b..297db57 100644 --- a/producer.go +++ b/producer.go @@ -3,7 +3,6 @@ package main import ( "fmt" - "log" "strconv" "strings" "time" @@ -26,6 +25,59 @@ func producer() { conn := dbs.rdb.Get() defer conn.Close() + val := "0" + + loop := true + + start := time.Now() + users := make(map[string]bool) + for loop { + + status = _Producer + + retval, err := redis.Values(conn.Do("SCAN", val, "COUNT", strconv.Itoa(opts.MaxBulk))) + + keys, _ := redis.Strings(retval[1], err) + + for _, key := range keys { + user := key[:strings.Index(key, "@")] + users[user] = true + } + + val, _ = redis.String(retval[0], err) + + if opts.Debug { + fmt.Println(val, len(users)) + } + + if val == "0" { + loop = false + } + } + + if opts.Debug { + fmt.Printf("\nKEYS: %+v\n", time.Since(start)) + } + + max := 0 + block := make([]string, 0) + for user, _ := range users { + block = append(block, user) + if max >= opts.MaxBulk { + wg.Add(1) + mget_chan <- block + max = 0 + block = make([]string, 0) + continue + } + max += 1 + } + + done <- true +} + +/* + keys, err := redis.Strings(conn.Do("KEYS", "*")) if err != nil { @@ -114,4 +166,4 @@ func producer() { } done <- true -} +*/