rethinkDB

This commit is contained in:
Michele 2016-11-10 09:01:10 +01:00
parent 94606006a8
commit a3a02eac55
5 changed files with 199 additions and 68 deletions

View file

@ -28,6 +28,7 @@ func consumer() {
status = _Consumer
var bulk = make(map[string]*mgo.Bulk)
var rtbulk []MongoLogin
var col = make(map[string]*mgo.Collection)
var slogin = make(map[string][]string)
@ -81,65 +82,126 @@ func consumer() {
if opts.Month != ml.Date.Format("0601") {
dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601"))
if opts.Bulk {
if _, ok := bulk[dt]; !ok {
bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk()
bulk[dt].Unordered()
if dbs.isMongodb() {
if _, ok := bulk[dt]; !ok {
bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk()
bulk[dt].Unordered()
}
bulk[dt].Insert(ml)
slogin[dt] = append(slogin[dt], login)
}
if dbs.isRethink() {
rtbulk = append(rtbulk, ml)
slogin["rt"] = append(slogin["rt"], login)
}
bulk[dt].Insert(ml)
slogin[dt] = append(slogin[dt], login)
} else {
if _, ok := col[dt]; !ok {
col[dt] = dbs.mdb.DB("lastlogin").C(dt)
}
err = col[dt].Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
if dbs.isMongodb() {
if _, ok := col[dt]; !ok {
col[dt] = dbs.mdb.DB("lastlogin").C(dt)
}
err = col[dt].Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Mongo Insert Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
}
}
}
if dbs.isRethink() {
_, err = dbs.rtdb.Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "Duplicate primary key") {
fmt.Printf("RT Insert Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
}
}
}
cons.logins = append(cons.logins, login)
}
} else {
// inserisce il login su Mongodb
if opts.Bulk {
bulk[opts.Month].Insert(ml)
slogin[opts.Month] = append(slogin[opts.Month], login)
if dbs.isMongodb() {
bulk[opts.Month].Insert(ml)
slogin[opts.Month] = append(slogin[opts.Month], login)
}
if dbs.isRethink() {
rtbulk = append(rtbulk, ml)
slogin["rt"] = append(slogin["rt"], login)
}
} else {
err = col[opts.Month].Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
if dbs.isMongodb() {
err = col[opts.Month].Insert(ml)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
}
}
}
if dbs.isRethink() {
resp, err := dbs.rtdb.Insert(ml)
count.AddInsert(resp.Inserted)
if err != nil {
if !strings.Contains(err.Error(), "Duplicate primary key") {
fmt.Printf("RT Insert Err: %+v\n", err)
cons.error = true
count.AddErr(1)
continue
} else {
count.AddDuplicate(1)
}
}
}
cons.logins = append(cons.logins, login)
}
}
}
if opts.Bulk {
for key, _ := range bulk {
_, err := bulk[key].Run()
if dbs.isMongodb() {
for key, _ := range bulk {
_, err := bulk[key].Run()
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
cons.error = true
count.AddErr(len(slogin[key]))
continue
} else {
count.AddDuplicate(strings.Count(err.Error(), "E11000"))
}
}
cons.logins = append(cons.logins, slogin[key]...)
}
}
if dbs.isRethink() {
resp, err := dbs.rtdb.MultiInsert(rtbulk)
count.AddInsert(resp.Inserted)
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
if !strings.Contains(err.Error(), "Duplicate primary key") {
cons.error = true
count.AddErr(len(slogin[key]))
count.AddErr(resp.Errors)
continue
} else {
count.AddDuplicate(strings.Count(err.Error(), "E11000"))
count.AddDuplicate(resp.Errors)
}
}
cons.logins = append(cons.logins, slogin[key]...)
cons.logins = append(cons.logins, slogin["rt"]...)
}
}

View file

@ -8,26 +8,28 @@ import (
// Counter structure
type Counter struct {
mu sync.Mutex
user int
log int
rem int
err int
dup int
time time.Duration
wg int
mu sync.Mutex
user int
log int
insert int
rem int
err int
dup int
time time.Duration
wg int
}
// NewCounter iniitialized Counter structure
func NewCounter() *Counter {
return &Counter{
user: 0,
log: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: 0,
user: 0,
log: 0,
insert: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: 0,
}
}
@ -45,6 +47,13 @@ func (c *Counter) AddDuplicate(add int) {
c.dup += add
}
// AddInsert increment number of inserted rows
func (c *Counter) AddInsert(add int) {
c.mu.Lock()
defer c.mu.Unlock()
c.insert += add
}
// AddLog increment number of log's rows managed
func (c *Counter) AddLog(add int) {
c.mu.Lock()
@ -104,6 +113,14 @@ func (c *Counter) GetLog() (ret int) {
return
}
// GetInsert return total inserted rows
func (c *Counter) GetInsert() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.insert
return
}
// GetErr return total errors
func (c *Counter) GetErr() (ret int) {
c.mu.Lock()

70
dbs.go
View file

@ -7,14 +7,15 @@ import (
"fmt"
"log"
"os"
"strings"
"time"
// rt "gopkg.in/dancannon/gorethink.v2"
"gopkg.in/mgo.v2"
)
var (
dbs = Dbs{
MongoURI: "mongodb://127.0.0.1:27018",
RedisURI: "127.0.0.1:6379",
Database: "lastlogin",
}
@ -22,23 +23,25 @@ var (
// Dbs structure
type Dbs struct {
MongoURI string
Database string
RedisURI string
rdb *redis.Pool //*redis.Client
mdb *mgo.Session
ll *mgo.Collection
MongoURI string
Database string
RedisURI string
RethinkURI string
rdb *redis.Pool //*redis.Client
rtdb *Rethink
mdb *mgo.Session
ll *mgo.Collection
// us *mgo.Collection
}
// MongoLogin structure
type MongoLogin struct {
ID string `json:"_id" bson:"_id"`
User string `json:"user"`
Protocol string `json:"protocol"`
IP string `json:"ip"`
Date time.Time `json:"date"`
Insert time.Time `json:"insert"`
ID string `json:"_id" bson:"_id" gorethink:"id"`
User string `json:"user" bson:"user" gorethink:"user"`
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
IP string `json:"ip" bson:"ip" gorethink:"ip"`
Date time.Time `json:"date" bson:"date" gorethink:"date"`
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
}
// Ips structure
@ -59,6 +62,47 @@ type Index struct {
Date time.Time `json:"date"`
}
func (db *Dbs) isRethink() bool {
if db.RethinkURI != "" {
return true
}
return false
}
func (db *Dbs) isMongodb() bool {
if db.MongoURI != "" {
return true
}
return false
}
func (db *Dbs) poolRethink() {
var err error
if opts.Debug {
fmt.Printf("DBS: %+v\n", db)
}
if db.RethinkURI != "" {
uri := strings.Split(dbs.RethinkURI, ",")
if opts.Debug {
fmt.Printf("RT_URI: %s\n", uri)
}
db.rtdb, err = NewRethinkDB(uri)
if err != nil {
fmt.Println("RethinkDB connect Error: ", err.Error())
os.Exit(-4)
}
}
if opts.Debug {
fmt.Printf("DBS: %+v\n", db)
}
}
func (db *Dbs) poolRedis() {
dbs.rdb = &redis.Pool{

15
main.go
View file

@ -72,8 +72,15 @@ func main() {
dbs.poolRedis()
defer dbs.rdb.Close()
dbs.connectMongo()
defer dbs.mdb.Close()
if dbs.isMongodb() {
dbs.connectMongo()
defer dbs.mdb.Close()
}
if dbs.isRethink() {
dbs.poolRethink()
defer dbs.rtdb.Close()
}
if opts.Timeout > 0 {
time.AfterFunc(opts.Timeout, exit)
@ -99,8 +106,8 @@ func main() {
count.SetTime(time.Since(start))
fmt.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup())
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
if opts.Influxdb != "" {
writeStats(start)

View file

@ -59,9 +59,10 @@ func init() {
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
flag.StringVar(&opts.Hostname, "H", "", "hostname")
flag.StringVar(&dbs.MongoURI, "m", dbs.MongoURI, "Mongodb")
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis")
flag.StringVar(&dbs.RethinkURI, "R", "", "Rethink DB")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.IntVar(&opts.MaxLogins, "L", opts.MaxLogins, "Max lastlogins")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")