aggiunta la possibilita' di scrivere su piu' mongodb

This commit is contained in:
Michele 2017-02-10 16:56:09 +01:00
parent 1b36fde70a
commit 8228f63fff
6 changed files with 98 additions and 273 deletions

View file

@ -2,10 +2,9 @@
package main
import (
"fmt"
// "github.com/garyburd/redigo/redis"
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
"strconv"
"strings"
@ -39,23 +38,13 @@ func consumer() {
status = _Consumer
var bulk = make(map[string]*mgo.Bulk)
var rtbulk []MongoLogin
var col = make(map[string]*mgo.Collection)
var slogin = make(map[string][]string)
if opts.Bulk {
bulk[opts.Month] = dbs.ll.Bulk()
bulk[opts.Month].Unordered()
} else {
col[opts.Month] = dbs.ll
}
var bulk = make(map[string][]*mgo.Bulk)
var allLogins = make(map[string]MongoLogin)
cons := consumed{
user: prod.user,
logins: make([]string, 0),
error: false,
empty: true,
}
for i := range prod.logins {
@ -80,9 +69,10 @@ func consumer() {
cons.logins = append(cons.logins, login)
continue
}
mlID := hash([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
ml := MongoLogin{
// genera l' _ID con user e timestamp
ID: hash([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))), // Format("20060102T150405")
ID: mlID,
User: prod.user,
Protocol: sval[0],
IP: sval[2],
@ -90,137 +80,38 @@ func consumer() {
Insert: time.Now(),
}
if opts.Month != ml.Date.Format("0601") {
dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))
if opts.Bulk {
if dbs.isMongodb() {
allLogins[mlID] = ml
}
for _, val := range allLogins {
dt := fmt.Sprintf("lastlogin_%s", val.Date.Format("0601"))
if _, ok := bulk[dt]; !ok {
bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk()
bulk[dt].Unordered()
}
bulk[dt].Insert(ml)
slogin[dt] = append(slogin[dt], login)
}
if dbs.isRethink() {
rtbulk = append(rtbulk, ml)
slogin["rt"] = append(slogin["rt"], login)
}
} else {
if dbs.isMongodb() {
if _, ok := col[dt]; !ok {
col[dt] = dbs.mdb.DB("lastlogin").C(dt)
}
err = col[dt].Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Mongo Insert Err: %+v\n", err)
cons.error = true
counter <- Counterchan{
tipo: "err",
val: 1,
}
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: 1,
}
}
}
}
if dbs.isRethink() {
_, err = dbs.rtdb.Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "Duplicate primary key") {
fmt.Printf("RT Insert Err: %+v\n", err)
cons.error = true
counter <- Counterchan{
tipo: "err",
val: 1,
}
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: 1,
for j := range dbs.mdb {
b := dbs.mdb[j].DB("lastlogin").C(dt).Bulk()
b.Unordered()
bulk[dt] = append(bulk[dt], b)
}
}
for _, bl := range bulk[dt] {
bl.Insert(val)
}
}
cons.logins = append(cons.logins, login)
}
} else {
// inserisce il login su Mongodb
if opts.Bulk {
if dbs.isMongodb() {
bulk[opts.Month].Insert(ml)
slogin[opts.Month] = append(slogin[opts.Month], login)
}
if dbs.isRethink() {
rtbulk = append(rtbulk, ml)
slogin["rt"] = append(slogin["rt"], login)
}
} else {
if dbs.isMongodb() {
err = col[opts.Month].Insert(ml)
for _, val := range bulk {
for j, bl := range val {
_, err := bl.Run()
if j == 0 {
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
counter <- Counterchan{
tipo: "err",
val: 1,
val: len(prod.logins),
}
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: 1,
}
}
}
}
if dbs.isRethink() {
resp, err := dbs.rtdb.Insert(ml)
counter <- Counterchan{
tipo: "ins",
val: resp.Inserted,
}
if err != nil {
if !strings.Contains(err.Error(), "Duplicate primary key") {
fmt.Printf("RT Insert Err: %+v\n", err)
cons.error = true
counter <- Counterchan{
tipo: "err",
val: 1,
}
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: 1,
}
}
}
}
cons.logins = append(cons.logins, login)
}
}
}
if opts.Bulk {
if dbs.isMongodb() {
for key, _ := range bulk {
_, err := bulk[key].Run()
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
counter <- Counterchan{
tipo: "err",
val: len(slogin[key]),
if opts.Test {
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
}
continue
} else {
@ -229,33 +120,16 @@ func consumer() {
val: strings.Count(err.Error(), "E11000"),
}
}
}
cons.logins = append(cons.logins, slogin[key]...)
}
}
if dbs.isRethink() {
resp, err := dbs.rtdb.MultiInsert(rtbulk)
counter <- Counterchan{
tipo: "ins",
val: resp.Inserted,
}
if err != nil {
if !strings.Contains(err.Error(), "Duplicate primary key") {
cons.error = true
counter <- Counterchan{
tipo: "err",
val: 1,
}
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: 1,
if opts.Test {
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
}
}
}
}
cons.logins = append(cons.logins, slogin["rt"]...)
}
cons.logins = append(cons.logins, prod.logins...)
}
counter <- Counterchan{
@ -263,10 +137,6 @@ func consumer() {
val: len(prod.logins),
}
if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins {
cons.empty = false
}
if opts.Debug {
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
}

59
dbs.go
View file

@ -2,15 +2,14 @@
package main
import (
"github.com/garyburd/redigo/redis"
// "github.com/fzzy/radix/redis"
"fmt"
// "fmt"
"log"
"os"
"strings"
"time"
// rt "gopkg.in/dancannon/gorethink.v2"
"github.com/garyburd/redigo/redis"
"gopkg.in/mgo.v2"
)
@ -26,12 +25,8 @@ type Dbs struct {
MongoURI string
Database string
RedisURI string
RethinkURI string
rdb *redis.Pool //*redis.Client
rtdb *Rethink
mdb *mgo.Session
ll *mgo.Collection
// us *mgo.Collection
mdb []*mgo.Session
}
// MongoLogin structure
@ -62,14 +57,6 @@ type Index struct {
Date time.Time `json:"date"`
}
func (db *Dbs) isRethink() bool {
if db.RethinkURI != "" {
return true
}
return false
}
func (db *Dbs) isMongodb() bool {
if db.MongoURI != "" {
return true
@ -78,31 +65,6 @@ func (db *Dbs) isMongodb() bool {
return false
}
func (db *Dbs) poolRethink() {
var err error
if opts.Debug {
fmt.Printf("DBS: %+v\n", db)
}
if db.RethinkURI != "" {
uri := strings.Split(dbs.RethinkURI, ",")
if opts.Debug {
fmt.Printf("RT_URI: %s\n", uri)
}
db.rtdb, err = NewRethinkDB(uri)
if err != nil {
fmt.Println("RethinkDB connect Error: ", err.Error())
os.Exit(-4)
}
}
if opts.Debug {
fmt.Printf("DBS: %+v\n", db)
}
}
func (db *Dbs) poolRedis() {
dbs.rdb = &redis.Pool{
@ -126,12 +88,17 @@ func (db *Dbs) poolRedis() {
}
func (db *Dbs) connectMongo() {
var err error
db.mdb, err = mgo.Dial(db.MongoURI)
mongoList := strings.Split(db.MongoURI, ",")
for m := range mongoList {
nm, err := mgo.Dial(mongoList[m])
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
db.ll = db.mdb.DB(db.Database).C(fmt.Sprintf("lastlogin_%s", opts.Month))
nm.SetSocketTimeout(5 * time.Second)
nm.SetSyncTimeout(5 * time.Second)
db.mdb = append(db.mdb, nm)
}
}

13
main.go
View file

@ -11,7 +11,7 @@ import (
)
const (
_Version = "v3.2.0"
_Version = "v4.0.0"
_Producer = 0
_Consumer = 1
_Remover = 2
@ -74,14 +74,9 @@ func main() {
dbs.poolRedis()
defer dbs.rdb.Close()
if dbs.isMongodb() {
dbs.connectMongo()
defer dbs.mdb.Close()
}
if dbs.isRethink() {
dbs.poolRethink()
defer dbs.rtdb.Close()
for k := range dbs.mdb {
defer dbs.mdb[k].Close()
}
if opts.Timeout > 0 {
@ -99,7 +94,9 @@ func main() {
go count.Run()
go producer()
for i := 0; i < opts.Queue; i++ {
for j := 0; j < len(dbs.mdb); j++ {
go consumer()
}
go remover()
}

View file

@ -2,10 +2,8 @@
package main
import (
// "encoding/json"
"flag"
"fmt"
// "io/ioutil"
"log"
"os"
"path"
@ -21,10 +19,9 @@ type Options struct {
LogFile string
ConfigFile string
Timeout time.Duration
MaxLogins int
Debug bool
Test bool
Version bool
Bulk bool
MaxError int
Influxdb string
Hostname string
@ -37,7 +34,6 @@ var (
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
MaxLogins: -1,
}
)
@ -63,14 +59,12 @@ func init() {
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis")
flag.StringVar(&dbs.RethinkURI, "R", "", "Rethink DB")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.IntVar(&opts.MaxLogins, "L", opts.MaxLogins, "Max lastlogins")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.BoolVar(&opts.Bulk, "B", false, "Bulk")
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file")

View file

@ -4,7 +4,7 @@ package main
import (
"fmt"
"log"
"strconv"
// "strconv"
"time"
"github.com/garyburd/redigo/redis"
@ -27,39 +27,29 @@ func producer() {
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex"))
// if opts.Debug {
// log.Printf("SPOP: %+v - %+v\n", user, err)
// fmt.Printf("SPOP: %+v - %+v\n", user, err)
// }
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("LLINDEX empty: %v\n", err)
}
log.Printf("LLINDEX empty: %v\n", err)
//loop[id] = false
//done[id] <- true
break
}
// estrae <MaxLogins> login dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "1", strconv.Itoa(opts.MaxLogins)))
// estrae tutti i logins dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "1", "-1"))
if err != nil {
if opts.Debug {
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
}
log.Printf("LRANGE: %+v - %+v\n", err, logs)
}
// if opts.Debug {
// fmt.Printf("LRANGE: %s - %d\n", user, len(logs))
// log.Printf("LRANGE: %s - %d\n", user, len(logs))
// }
if opts.Debug {
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
// log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
}
if len(logs) > 0 {
counter <- Counterchan{
tipo: "user",
val: 1,
@ -74,11 +64,15 @@ func producer() {
fmt.Printf("PROD: %+v\n", time.Since(start))
}
if opts.Test {
log.Printf("PROD: %s - %d\n", user, len(logs))
}
consume <- produced{
user: user,
logins: logs,
}
}
}
done <- true

View file

@ -3,7 +3,7 @@ package main
import (
"fmt"
// "log"
"time"
)
@ -17,16 +17,16 @@ func remover() {
status = _Remover
// wg.Add(1)
start := time.Now()
if !rem.error {
for i := range rem.logins {
// cancella da Redis la riga di login inserita partendo da 1
conn.Send("lrem", rem.user, "1", rem.logins[i])
}
}
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
if !rem.empty || rem.error {
if rem.error {
if opts.Debug {
fmt.Printf("SADD: %s\n", rem.user)
}
@ -37,10 +37,13 @@ func remover() {
}
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush()
if !rem.error {
counter <- Counterchan{
tipo: "rem",
val: len(rem.logins),
}
}
if opts.Debug {
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))