utilizza SCAN

This commit is contained in:
Michele 2017-03-13 15:28:20 +01:00
parent 0c9c9710ce
commit a6a959427d
10 changed files with 190 additions and 21 deletions

7
Docker/Dockerfile Normal file
View file

@ -0,0 +1,7 @@
FROM scratch
MAINTAINER Michele Fadda "<mikif70@gmail.com>"
COPY quota_mongodb-v2.0.1 /bin/quota_mongodb
ENTRYPOINT [ "/bin/quota_mongodb" ]

BIN
Docker/quota_mongodb-v2.0.1 Executable file

Binary file not shown.

16
Docker/run.sh Executable file
View file

@ -0,0 +1,16 @@
#!/bin/bash
docker run \
--rm \
-v /opt/qmongo/log:/data \
--name qmongo_tiscali \
--log-opt max-size=2m \
--log-opt max-file=5 \
mikif70/qmongo:2.0.1 \
-l /data/llmongo.log \
-p /data/llmongo.pid \
-r redis-qdb.mail.tiscali.sys:6379 \
-m 10.39.80.189:27017 \
-d quota \
-c tiscali \
$@

3
build.sh Executable file
View file

@ -0,0 +1,3 @@
#!/bin/bash
CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo .

View file

@ -42,15 +42,17 @@ func consumer() {
}
// for j := range dbs.mdb {
bulk := dbs.mdb.DB("quota").C("tiscali").Bulk()
bulk := dbs.mdb.DB(dbs.Database).C(dbs.Collection).Bulk()
bulk.Unordered()
for p := range prod {
mlID := hash([]byte(fmt.Sprintf("%s%s", prod[p].user, start.Format("20060102T15"))))
mquota := MongoQuota{
ID: mlID,
User: prod[p].user,
Messages: prod[p].messages,
Storage: prod[p].storage,
Insert: time.Now(),
Insert: start,
}
bulk.Insert(mquota)
@ -74,6 +76,6 @@ func consumer() {
fmt.Printf("CONS: users=%d in %v - active=%d\n", len(prod), time.Since(start), dbs.rdb.ActiveCount())
}
//wg.Done()
wg.Done()
}
}

18
dbs.go
View file

@ -15,23 +15,25 @@ import (
var (
dbs = Dbs{
RedisURI: "127.0.0.1:6379",
Database: "lastlogin",
RedisURI: "127.0.0.1:6379",
Database: "",
Collection: "",
}
)
// Dbs structure
type Dbs struct {
MongoURI string
Database string
RedisURI string
rdb *redis.Pool //*redis.Client
mdb *mgo.Session
MongoURI string
Database string
Collection string
RedisURI string
rdb *redis.Pool //*redis.Client
mdb *mgo.Session
}
// MongoQuota structure
type MongoQuota struct {
// ID string `json:"_id" bson:"_id"`
ID string `json:"_id" bson:"_id"`
User string `json:"user" bson:"user"`
Messages int `json:"messages" bson:"messages"`
Storage int `json:"storage" bson:"storage"`

20
main.go
View file

@ -11,7 +11,7 @@ import (
)
const (
_Version = "v1.0.0"
_Version = "v2.0.2"
_Producer = 0
_Consumer = 1
)
@ -19,8 +19,9 @@ const (
var (
loop bool
done chan bool
consume chan []userQuota
done chan bool
consume chan []userQuota
mget_chan chan []string
counter chan Counterchan
@ -56,10 +57,13 @@ func main() {
defer pid.Remove()
start := time.Now()
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
fmt.Printf("Start: %+v\n\t%+v\n", opts, dbs)
log.Printf("Start: %+v\n\t%+v\n", opts, dbs)
opts.Month = start.Format("0601")
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" || dbs.Collection == "" {
flag.Usage()
os.Exit(-1)
}
dbs.poolRedis()
defer dbs.rdb.Close()
@ -74,12 +78,16 @@ func main() {
count = NewCounter()
consume = make(chan []userQuota, opts.Queue)
mget_chan = make(chan []string, opts.Queue*10)
loop = true
done = make(chan bool)
counter = make(chan Counterchan)
go count.Run()
go producer()
for i := 0; i < opts.Queue*10; i++ {
go mget()
}
for i := 0; i < opts.Queue; i++ {
go consumer()
}

67
mget.go Normal file
View file

@ -0,0 +1,67 @@
package main
import (
"fmt"
"log"
"strconv"
"time"
"github.com/garyburd/redigo/redis"
)
func mget() {
conn := dbs.rdb.Get()
defer conn.Close()
for {
get := <-mget_chan
start := time.Now()
uq := make([]userQuota, 0)
for _, key := range get {
// estrae un userid dalla lista degli utenti che hanno fatto login
quota, err := redis.Strings(conn.Do("mget", fmt.Sprintf("%s@tiscali.it/quota/messages", key), fmt.Sprintf("%s@tiscali.it/quota/storage", key)))
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("MGET err: %v\n", err)
}
log.Printf("MGET err: %v\n", err)
continue
}
counter <- Counterchan{
tipo: "user",
val: 1,
}
msg, err := strconv.Atoi(quota[0])
if err != nil {
msg = 0
}
store, err := strconv.Atoi(quota[1])
if err != nil {
store = 0
}
uq = append(uq, userQuota{
user: key,
messages: msg,
storage: store,
})
}
if opts.Debug {
fmt.Printf("\nMGET: %+v\n", time.Since(start))
}
wg.Add(1)
consume <- uq
wg.Done()
}
}

View file

@ -34,7 +34,18 @@ var (
)
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <redis keys ttl> -l <logfile> -T <running ttl> -x <xymon server> -H <hostname> -i <influxdb uri> -v")
fmt.Println(
`Usage: llmongo
-m <mongo uri>
-d <database>
-c <collection>
-r <redis uri>
-l <logfile>
-T <running ttl>
-p <pid filename>
-M <bulk size>
-q <num consumer>
-v`)
fmt.Println()
os.Exit(0)
}
@ -52,13 +63,14 @@ func init() {
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database")
flag.StringVar(&dbs.Collection, "c", dbs.Collection, "Mongodb Collection")
flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxBulk, "M", 100, "Max Mongodb bulk")
flag.IntVar(&opts.MaxBulk, "M", 1000, "Max Mongodb bulk")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file")
}

View file

@ -3,7 +3,6 @@ package main
import (
"fmt"
"log"
"strconv"
"strings"
"time"
@ -26,6 +25,59 @@ func producer() {
conn := dbs.rdb.Get()
defer conn.Close()
val := "0"
loop := true
start := time.Now()
users := make(map[string]bool)
for loop {
status = _Producer
retval, err := redis.Values(conn.Do("SCAN", val, "COUNT", strconv.Itoa(opts.MaxBulk)))
keys, _ := redis.Strings(retval[1], err)
for _, key := range keys {
user := key[:strings.Index(key, "@")]
users[user] = true
}
val, _ = redis.String(retval[0], err)
if opts.Debug {
fmt.Println(val, len(users))
}
if val == "0" {
loop = false
}
}
if opts.Debug {
fmt.Printf("\nKEYS: %+v\n", time.Since(start))
}
max := 0
block := make([]string, 0)
for user, _ := range users {
block = append(block, user)
if max >= opts.MaxBulk {
wg.Add(1)
mget_chan <- block
max = 0
block = make([]string, 0)
continue
}
max += 1
}
done <- true
}
/*
keys, err := redis.Strings(conn.Do("KEYS", "*"))
if err != nil {
@ -114,4 +166,4 @@ func producer() {
}
done <- true
}
*/