From 3ca6a0906f707cc2b387ef40d6e7b77c8978719b Mon Sep 17 00:00:00 2001 From: Michele Date: Thu, 17 Nov 2016 10:09:23 +0100 Subject: [PATCH] test01 --- consumer.go | 114 +++++----------------------------------------------- dbs.go | 36 +++++++---------- main.go | 7 +--- mongo.go | 57 +++++++++++++++++++++++++- options.go | 2 +- producer.go | 8 ++-- rethink.go | 14 ++++--- 7 files changed, 96 insertions(+), 142 deletions(-) diff --git a/consumer.go b/consumer.go index 313166d..ed6722c 100644 --- a/consumer.go +++ b/consumer.go @@ -3,22 +3,11 @@ package m2r import ( "fmt" - // "github.com/garyburd/redigo/redis" - "log" - "strconv" + "strings" "time" - - "gopkg.in/mgo.v2" ) -type consumed struct { - user string - error bool - logins []string - empty bool -} - func consumer() { for { @@ -27,108 +16,25 @@ func consumer() { status = _Consumer - var bulk = make(map[string]*mgo.Bulk) - var rtbulk []MongoLogin - var col = make(map[string]*mgo.Collection) - var slogin = make(map[string][]string) - - if opts.Bulk { - bulk[opts.Month] = dbs.ll.Bulk() - bulk[opts.Month].Unordered() - } else { - col[opts.Month] = dbs.ll - } - - cons := consumed{ - user: prod.user, - logins: make([]string, 0), - error: false, - empty: true, - } - start := time.Now() - for i := range prod.logins { - login := prod.logins[i] - // se la riga di login e' vuota - if login == "" { - log.Println("Login empty: ", prod.user) - cons.logins = append(cons.logins, login) - continue - } - sval := strings.Split(login, ":") - // se il formato della riga di login non e' corretto - if sval[1] == "" { - log.Println("Login format error: ", login, prod.user) - cons.logins = append(cons.logins, login) - continue - } - // se il timestamp della riga di login non e' corretto - date, err := strconv.ParseInt(sval[1], 10, 64) - if err != nil { - log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) - cons.logins = append(cons.logins, login) - continue - } - ml := MongoLogin{ - // genera l' _ID con user e timestamp - ID: fmt.Sprintf("%s_%s", prod.user, time.Unix(date, 0).Format("20060102T150405")), - User: prod.user, - Protocol: sval[0], - IP: sval[2], - Date: time.Unix(date, 0), - Insert: time.Now(), - } - // inserisce il login su Mongodb - if opts.Bulk { - rtbulk = append(rtbulk, ml) - slogin["rt"] = append(slogin["rt"], login) + resp, err := dbs.Rethink.MultiInsert(prod.logins) + count.AddInsert(resp.Inserted) + if err != nil { + if !strings.Contains(err.Error(), "Duplicate primary key") { + count.AddErr(resp.Errors) + continue } else { - resp, err := dbs.rtdb.Insert(ml) - count.AddInsert(resp.Inserted) - if err != nil { - if !strings.Contains(err.Error(), "Duplicate primary key") { - fmt.Printf("RT Insert Err: %+v\n", err) - cons.error = true - count.AddErr(1) - continue - } else { - count.AddDuplicate(1) - } - } - - cons.logins = append(cons.logins, login) + count.AddDuplicate(resp.Errors) } - - } - - if opts.Bulk { - resp, err := dbs.rtdb.MultiInsert(rtbulk) - count.AddInsert(resp.Inserted) - if err != nil { - if !strings.Contains(err.Error(), "Duplicate primary key") { - cons.error = true - count.AddErr(resp.Errors) - continue - } else { - count.AddDuplicate(resp.Errors) - } - } - cons.logins = append(cons.logins, slogin["rt"]...) - } count.AddLog(len(prod.logins)) - if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { - cons.empty = false - } - if opts.Debug { - fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start)) + fmt.Printf("CONS: logins=%d in %v - active=%d\n", len(prod.logins), time.Since(start)) } - // wg.Done() - remove <- cons + wg.Done() } } diff --git a/dbs.go b/dbs.go index b87abdb..ab676e7 100644 --- a/dbs.go +++ b/dbs.go @@ -2,20 +2,20 @@ package m2r import ( + "errors" "fmt" "log" "os" "strings" - "time" // rt "gopkg.in/dancannon/gorethink.v2" - "gopkg.in/mgo.v2" + // "gopkg.in/mgo.v2" ) var ( dbs = Dbs{ - MongoDB: "lastlogin", - RethinkDB: "lastlogin", + Mongo: &Mongo{}, + Rethink: &Rethink{}, } ) @@ -25,19 +25,8 @@ type Dbs struct { MongoDB string RethinkURI string RethinkDB string - rtdb *Rethink - mdb *mgo.Session - ll *mgo.Collection -} - -// MongoLogin structure -type MongoLogin struct { - ID string `json:"_id" bson:"_id" gorethink:"id"` - User string `json:"user" bson:"user" gorethink:"user"` - Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"` - IP string `json:"ip" bson:"ip" gorethink:"ip"` - Date time.Time `json:"date" bson:"date" gorethink:"date"` - Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"` + Mongo *Mongo + Rethink *Rethink } func (db *Dbs) poolRethink() { @@ -52,7 +41,14 @@ func (db *Dbs) poolRethink() { if opts.Debug { fmt.Printf("RT_URI: %s\n", uri) } - db.rtdb, err = NewRethinkDB(uri) + + database := strings.Split(dbs.RethinkDB, ":") + if len(database) > 1 { + db.Rethink, err = NewRethinkDB(uri, database[0], database[1]) + } else { + err = errors.New("DB format error - DB:Table") + } + if err != nil { fmt.Println("RethinkDB connect Error: ", err.Error()) os.Exit(-4) @@ -67,11 +63,9 @@ func (db *Dbs) poolRethink() { func (db *Dbs) connectMongo() { var err error - db.mdb, err = mgo.Dial(db.MongoURI) + db.Mongo, err = NewMongoDB(db.MongoURI, db.MongoDB) if err != nil { log.Println("Mongodb connect Error: ", err.Error()) os.Exit(-3) } - - db.ll = db.mdb.DB(db.MongoDB).C(fmt.Sprintf("lastlogin_%s", opts.Month)) } diff --git a/main.go b/main.go index 88d035c..c6aa55c 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,6 @@ var ( done chan bool consume chan produced - remove chan consumed wg sync.WaitGroup @@ -70,10 +69,10 @@ func main() { opts.Month = start.Format("0601") dbs.connectMongo() - defer dbs.mdb.Close() + defer dbs.Mongo.Close() dbs.poolRethink() - defer dbs.rtdb.Close() + defer dbs.Rethink.Close() if opts.Timeout > 0 { time.AfterFunc(opts.Timeout, exit) @@ -82,7 +81,6 @@ func main() { count = NewCounter() consume = make(chan produced) - remove = make(chan consumed) loop = true done = make(chan bool) @@ -100,5 +98,4 @@ func main() { fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) - } diff --git a/mongo.go b/mongo.go index c8a1fa9..88838a5 100644 --- a/mongo.go +++ b/mongo.go @@ -1,4 +1,59 @@ // mongo package m2r -// "fmt" +import ( + "fmt" + "time" + + "gopkg.in/mgo.v2" + // "gopkg.in/mgo.v2/bson" +) + +type Mongo struct { + mdb *mgo.Session + ll *mgo.Collection + count int +} + +// MongoLogin structure +type MongoLogin struct { + ID string `json:"_id" bson:"_id" gorethink:"id"` + User string `json:"user" bson:"user" gorethink:"user"` + Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"` + IP string `json:"ip" bson:"ip" gorethink:"ip"` + Date time.Time `json:"date" bson:"date" gorethink:"date"` + Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"` +} + +func NewMongoDB(uri string, db string) (*Mongo, error) { + var err error + mdb, err := mgo.Dial(uri) + if err != nil { + return &Mongo{}, err + } + + ll := mdb.DB(db).C(fmt.Sprintf("lastlogin_%s", opts.Month)) + + return &Mongo{ + mdb: mdb, + ll: ll, + count: 0, + }, nil +} + +func (m *Mongo) Find(limit int) []MongoLogin { + q := m.ll.Find(nil).Sort("$natural").Skip(m.count).Limit(limit) + iter := q.Iter() + defer iter.Close() + + var retval []MongoLogin + iter.All(&retval) + + m.count += limit + + return retval +} + +func (m *Mongo) Close() { + m.mdb.Close() +} diff --git a/options.go b/options.go index 3dad4aa..5c63b87 100644 --- a/options.go +++ b/options.go @@ -30,7 +30,7 @@ type Options struct { var ( opts = Options{ LogFile: "log/llmongo.log", - MaxLogins: -1, + MaxLogins: 100, } ) diff --git a/producer.go b/producer.go index 5d887f7..034b4ae 100644 --- a/producer.go +++ b/producer.go @@ -7,8 +7,7 @@ package m2r // "time" type produced struct { - user string - logins []string + logins []MongoLogin } func producer() { @@ -20,14 +19,13 @@ func producer() { //start := time.Now() // estrae un userid dalla lista degli utenti che hanno fatto login + logs := dbs.Mongo.Find(opts.MaxLogins) - count.AddUser() wg.Add(1) count.AddWG() consume <- produced{ - user: "", - logins: []string{"", ""}, + logins: logs, } } diff --git a/rethink.go b/rethink.go index c6d43be..1d3086c 100644 --- a/rethink.go +++ b/rethink.go @@ -9,10 +9,12 @@ import ( ) type Rethink struct { - rtdb *rt.Session + DB string + Table string + rtdb *rt.Session } -func NewRethinkDB(cluster []string) (*Rethink, error) { +func NewRethinkDB(cluster []string, db string, table string) (*Rethink, error) { var ( err error session *rt.Session @@ -38,12 +40,14 @@ func NewRethinkDB(cluster []string) (*Rethink, error) { } return &Rethink{ - rtdb: session, + rtdb: session, + DB: db, + Table: table, }, nil } func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) { - resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb) + resp, err := rt.DB(r.DB).Table(r.Table).Insert(login).RunWrite(r.rtdb) if opts.Debug { fmt.Printf("RTinsert: %+v\n", resp) } @@ -55,7 +59,7 @@ func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) { } func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) { - resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb) + resp, err := rt.DB(r.DB).Table(r.Table).Insert(logins).RunWrite(r.rtdb) if opts.Debug { fmt.Printf("RTMulti: %+v\n", resp) }