quota_mongodb/consumer.go

91 lines
1.4 KiB
Go

// consumer
package main
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
// "strconv"
// "strings"
"time"
// "gopkg.in/mgo.v2"
)
// consumed groups a user name with an error flag and a list of logins.
// NOTE(review): nothing in the visible part of this file reads or writes
// this type — confirm it is still used before relying on it.
type consumed struct {
	user   string   // user identifier
	error  bool     // error flag; presumably set when processing the user failed — confirm
	logins []string // logins associated with the user
}
// hash returns the SHA-256 digest of val encoded as a lowercase
// hexadecimal string (64 characters).
func hash(val []byte) string {
	digest := sha256.Sum256(val)
	return hex.EncodeToString(digest[:])
}
// consumer loops forever, taking one batch of per-user quota records at a
// time from the consume channel and writing it to MongoDB as a single
// unordered bulk insert into dbs.Database / dbs.Collection.
//
// For every record in the batch it:
//   - derives the document ID as the SHA-256 hex digest of the user name
//     concatenated with start formatted as "20060102T15" (date plus hour);
//   - queues a MongoQuota document on the bulk operation;
//   - reports the record's message and storage values on the counter
//     channel ("msg" and "storage"). These are sent before the bulk
//     insert runs, so they are reported even if the insert later fails.
//
// A failed bulk run is printed and reported as one "err" counter event.
// wg.Done is called exactly once per received batch, whether or not the
// insert succeeded.
//
// The function never returns; it relies on package-level state (consume,
// status, opts, dbs, start, counter, wg) defined elsewhere.
func consumer() {
	for {
		lstart := time.Now()
		prod := <-consume
		status = _Consumer
		if opts.Test {
			fmt.Printf("CONS: users - %d\n", len(prod))
		}

		bulk := dbs.mdb.DB(dbs.Database).C(dbs.Collection).Bulk()
		bulk.Unordered()

		// The hour stamp is the same for every record in the batch, so
		// format it once instead of once per user.
		hourStamp := start.Format("20060102T15")
		for p := range prod {
			mlID := hash([]byte(fmt.Sprintf("%s%s", prod[p].user, hourStamp)))
			bulk.Insert(MongoQuota{
				ID:       mlID,
				User:     prod[p].user,
				Messages: prod[p].messages,
				Storage:  prod[p].storage,
				Insert:   start,
			})
			counter <- Counterchan{
				tipo: "msg",
				val:  prod[p].messages,
			}
			counter <- Counterchan{
				tipo: "storage",
				val:  prod[p].storage,
			}
			if opts.Test {
				log.Printf("OK: %s\n", prod[p].user)
			}
		}

		if _, err := bulk.Run(); err != nil {
			fmt.Printf("Err: %+v\n", err)
			counter <- Counterchan{
				tipo: "err",
				val:  1,
			}
		} else if opts.Debug {
			fmt.Printf("CONS: users=%d in %v - active=%d\n", len(prod), time.Since(lstart), dbs.rdb.ActiveCount())
		}

		// Release the WaitGroup on the failure path as well: skipping Done
		// after a failed insert would leave any wg.Wait() blocked forever.
		// NOTE(review): assumes the sender calls wg.Add(1) per batch — confirm.
		wg.Done()
	}
}