diff --git a/consumer.go b/consumer.go index 2b901ea..109812c 100644 --- a/consumer.go +++ b/consumer.go @@ -28,6 +28,7 @@ func consumer() { status = _Consumer var bulk = make(map[string]*mgo.Bulk) + var rtbulk []MongoLogin var col = make(map[string]*mgo.Collection) var slogin = make(map[string][]string) @@ -81,65 +82,126 @@ func consumer() { if opts.Month != ml.Date.Format("0601") { dt := fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")) if opts.Bulk { - if _, ok := bulk[dt]; !ok { - bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk() - bulk[dt].Unordered() + if dbs.isMongodb() { + if _, ok := bulk[dt]; !ok { + bulk[dt] = dbs.mdb.DB("lastlogin").C(dt).Bulk() + bulk[dt].Unordered() + } + bulk[dt].Insert(ml) + slogin[dt] = append(slogin[dt], login) + } + if dbs.isRethink() { + rtbulk = append(rtbulk, ml) + slogin["rt"] = append(slogin["rt"], login) } - bulk[dt].Insert(ml) - slogin[dt] = append(slogin[dt], login) } else { - if _, ok := col[dt]; !ok { - col[dt] = dbs.mdb.DB("lastlogin").C(dt) - } - err = col[dt].Insert(ml) - if err != nil { - if !strings.Contains(err.Error(), "E11000") { - fmt.Printf("Err: %+v\n", err) - cons.error = true - count.AddErr(1) - continue - } else { - count.AddDuplicate(1) + if dbs.isMongodb() { + if _, ok := col[dt]; !ok { + col[dt] = dbs.mdb.DB("lastlogin").C(dt) + } + err = col[dt].Insert(ml) + if err != nil { + if !strings.Contains(err.Error(), "E11000") { + fmt.Printf("Mongo Insert Err: %+v\n", err) + cons.error = true + count.AddErr(1) + continue + } else { + count.AddDuplicate(1) + } } } + if dbs.isRethink() { + _, err = dbs.rtdb.Insert(ml) + if err != nil { + if !strings.Contains(err.Error(), "Duplicate primary key") { + fmt.Printf("RT Insert Err: %+v\n", err) + cons.error = true + count.AddErr(1) + continue + } else { + count.AddDuplicate(1) + } + } + } + cons.logins = append(cons.logins, login) } } else { // inserisce il login su Mongodb if opts.Bulk { - bulk[opts.Month].Insert(ml) - slogin[opts.Month] = append(slogin[opts.Month], login) + if dbs.isMongodb() { + bulk[opts.Month].Insert(ml) + slogin[opts.Month] = append(slogin[opts.Month], login) + } + if dbs.isRethink() { + rtbulk = append(rtbulk, ml) + slogin["rt"] = append(slogin["rt"], login) + } } else { - err = col[opts.Month].Insert(ml) - if err != nil { - if !strings.Contains(err.Error(), "E11000") { - fmt.Printf("Err: %+v\n", err) - cons.error = true - count.AddErr(1) - continue - } else { - count.AddDuplicate(1) + if dbs.isMongodb() { + err = col[opts.Month].Insert(ml) + if err != nil { + if !strings.Contains(err.Error(), "E11000") { + fmt.Printf("Err: %+v\n", err) + cons.error = true + count.AddErr(1) + continue + } else { + count.AddDuplicate(1) + } } } + if dbs.isRethink() { + resp, err := dbs.rtdb.Insert(ml) + count.AddInsert(resp.Inserted) + if err != nil { + if !strings.Contains(err.Error(), "Duplicate primary key") { + fmt.Printf("RT Insert Err: %+v\n", err) + cons.error = true + count.AddErr(1) + continue + } else { + count.AddDuplicate(1) + } + } + } + cons.logins = append(cons.logins, login) } } } if opts.Bulk { - for key, _ := range bulk { - _, err := bulk[key].Run() + if dbs.isMongodb() { + for key, _ := range bulk { + _, err := bulk[key].Run() + if err != nil { + if !strings.Contains(err.Error(), "E11000") { + fmt.Printf("Err: %+v\n", err) + cons.error = true + count.AddErr(len(slogin[key])) + continue + } else { + count.AddDuplicate(strings.Count(err.Error(), "E11000")) + } + } + cons.logins = append(cons.logins, slogin[key]...) + } + } + if dbs.isRethink() { + resp, err := dbs.rtdb.MultiInsert(rtbulk) + count.AddInsert(resp.Inserted) if err != nil { - if !strings.Contains(err.Error(), "E11000") { - fmt.Printf("Err: %+v\n", err) + if !strings.Contains(err.Error(), "Duplicate primary key") { cons.error = true - count.AddErr(len(slogin[key])) + count.AddErr(resp.Errors) continue } else { - count.AddDuplicate(strings.Count(err.Error(), "E11000")) + count.AddDuplicate(resp.Errors) } } - cons.logins = append(cons.logins, slogin[key]...) + cons.logins = append(cons.logins, slogin["rt"]...) } } diff --git a/counter.go b/counter.go index 5370cb5..6f6c4b8 100644 --- a/counter.go +++ b/counter.go @@ -8,26 +8,28 @@ import ( // Counter structure type Counter struct { - mu sync.Mutex - user int - log int - rem int - err int - dup int - time time.Duration - wg int + mu sync.Mutex + user int + log int + insert int + rem int + err int + dup int + time time.Duration + wg int } // NewCounter iniitialized Counter structure func NewCounter() *Counter { return &Counter{ - user: 0, - log: 0, - err: 0, - rem: 0, - dup: 0, - time: 0, - wg: 0, + user: 0, + log: 0, + insert: 0, + err: 0, + rem: 0, + dup: 0, + time: 0, + wg: 0, } } @@ -45,6 +47,13 @@ func (c *Counter) AddDuplicate(add int) { c.dup += add } +// AddInsert increment number of inserted rows +func (c *Counter) AddInsert(add int) { + c.mu.Lock() + defer c.mu.Unlock() + c.insert += add +} + // AddLog increment number of log's rows managed func (c *Counter) AddLog(add int) { c.mu.Lock() @@ -104,6 +113,14 @@ func (c *Counter) GetLog() (ret int) { return } +// GetInsert return total inserted rows +func (c *Counter) GetInsert() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.insert + return +} + // GetErr return total errors func (c *Counter) GetErr() (ret int) { c.mu.Lock() diff --git a/dbs.go b/dbs.go index 79ac023..8239e0b 100644 --- a/dbs.go +++ b/dbs.go @@ -7,14 +7,15 @@ import ( "fmt" "log" "os" + "strings" "time" + // rt "gopkg.in/dancannon/gorethink.v2" "gopkg.in/mgo.v2" ) var ( dbs = Dbs{ - MongoURI: "mongodb://127.0.0.1:27018", RedisURI: "127.0.0.1:6379", Database: "lastlogin", } @@ -22,23 +23,25 @@ var ( // Dbs structure type Dbs struct { - MongoURI string - Database string - RedisURI string - rdb *redis.Pool //*redis.Client - mdb *mgo.Session - ll *mgo.Collection + MongoURI string + Database string + RedisURI string + RethinkURI string + rdb *redis.Pool //*redis.Client + rtdb *Rethink + mdb *mgo.Session + ll *mgo.Collection // us *mgo.Collection } // MongoLogin structure type MongoLogin struct { - ID string `json:"_id" bson:"_id"` - User string `json:"user"` - Protocol string `json:"protocol"` - IP string `json:"ip"` - Date time.Time `json:"date"` - Insert time.Time `json:"insert"` + ID string `json:"_id" bson:"_id" gorethink:"id"` + User string `json:"user" bson:"user" gorethink:"user"` + Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"` + IP string `json:"ip" bson:"ip" gorethink:"ip"` + Date time.Time `json:"date" bson:"date" gorethink:"date"` + Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"` } // Ips structure @@ -59,6 +62,47 @@ type Index struct { Date time.Time `json:"date"` } +func (db *Dbs) isRethink() bool { + if db.RethinkURI != "" { + return true + } + + return false +} + +func (db *Dbs) isMongodb() bool { + if db.MongoURI != "" { + return true + } + + return false +} + +func (db *Dbs) poolRethink() { + var err error + + if opts.Debug { + fmt.Printf("DBS: %+v\n", db) + } + + if db.RethinkURI != "" { + uri := strings.Split(dbs.RethinkURI, ",") + if opts.Debug { + fmt.Printf("RT_URI: %s\n", uri) + } + db.rtdb, err = NewRethinkDB(uri) + if err != nil { + fmt.Println("RethinkDB connect Error: ", err.Error()) + os.Exit(-4) + } + } + + if opts.Debug { + fmt.Printf("DBS: %+v\n", db) + } + +} + func (db *Dbs) poolRedis() { dbs.rdb = &redis.Pool{ diff --git a/main.go b/main.go index aa02fd4..5a9c508 100644 --- a/main.go +++ b/main.go @@ -72,8 +72,15 @@ func main() { dbs.poolRedis() defer dbs.rdb.Close() - dbs.connectMongo() - defer dbs.mdb.Close() + if dbs.isMongodb() { + dbs.connectMongo() + defer dbs.mdb.Close() + } + + if dbs.isRethink() { + dbs.poolRethink() + defer dbs.rtdb.Close() + } if opts.Timeout > 0 { time.AfterFunc(opts.Timeout, exit) @@ -99,8 +106,8 @@ func main() { count.SetTime(time.Since(start)) - fmt.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup()) - log.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup()) + fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) + log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) if opts.Influxdb != "" { writeStats(start) diff --git a/options.go b/options.go index 2fb240b..9863ef9 100644 --- a/options.go +++ b/options.go @@ -59,9 +59,10 @@ func init() { flag.StringVar(&opts.Influxdb, "i", "", "influxdb server") flag.StringVar(&opts.Hostname, "H", "", "hostname") - flag.StringVar(&dbs.MongoURI, "m", dbs.MongoURI, "Mongodb") + flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb") flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database") flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis") + flag.StringVar(&dbs.RethinkURI, "R", "", "Rethink DB") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.IntVar(&opts.MaxLogins, "L", opts.MaxLogins, "Max lastlogins") flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")