test01
This commit is contained in:
parent
c9d9a7c70f
commit
3ca6a0906f
7 changed files with 96 additions and 142 deletions
114
consumer.go
114
consumer.go
|
@ -3,22 +3,11 @@ package m2r
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
// "github.com/garyburd/redigo/redis"
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
type consumed struct {
|
||||
user string
|
||||
error bool
|
||||
logins []string
|
||||
empty bool
|
||||
}
|
||||
|
||||
func consumer() {
|
||||
|
||||
for {
|
||||
|
@ -27,108 +16,25 @@ func consumer() {
|
|||
|
||||
status = _Consumer
|
||||
|
||||
var bulk = make(map[string]*mgo.Bulk)
|
||||
var rtbulk []MongoLogin
|
||||
var col = make(map[string]*mgo.Collection)
|
||||
var slogin = make(map[string][]string)
|
||||
|
||||
if opts.Bulk {
|
||||
bulk[opts.Month] = dbs.ll.Bulk()
|
||||
bulk[opts.Month].Unordered()
|
||||
} else {
|
||||
col[opts.Month] = dbs.ll
|
||||
}
|
||||
|
||||
cons := consumed{
|
||||
user: prod.user,
|
||||
logins: make([]string, 0),
|
||||
error: false,
|
||||
empty: true,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
for i := range prod.logins {
|
||||
login := prod.logins[i]
|
||||
// se la riga di login e' vuota
|
||||
if login == "" {
|
||||
log.Println("Login empty: ", prod.user)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
sval := strings.Split(login, ":")
|
||||
// se il formato della riga di login non e' corretto
|
||||
if sval[1] == "" {
|
||||
log.Println("Login format error: ", login, prod.user)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
// se il timestamp della riga di login non e' corretto
|
||||
date, err := strconv.ParseInt(sval[1], 10, 64)
|
||||
if err != nil {
|
||||
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
ml := MongoLogin{
|
||||
// genera l' _ID con user e timestamp
|
||||
ID: fmt.Sprintf("%s_%s", prod.user, time.Unix(date, 0).Format("20060102T150405")),
|
||||
User: prod.user,
|
||||
Protocol: sval[0],
|
||||
IP: sval[2],
|
||||
Date: time.Unix(date, 0),
|
||||
Insert: time.Now(),
|
||||
}
|
||||
|
||||
// inserisce il login su Mongodb
|
||||
if opts.Bulk {
|
||||
rtbulk = append(rtbulk, ml)
|
||||
slogin["rt"] = append(slogin["rt"], login)
|
||||
resp, err := dbs.Rethink.MultiInsert(prod.logins)
|
||||
count.AddInsert(resp.Inserted)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "Duplicate primary key") {
|
||||
count.AddErr(resp.Errors)
|
||||
continue
|
||||
} else {
|
||||
resp, err := dbs.rtdb.Insert(ml)
|
||||
count.AddInsert(resp.Inserted)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "Duplicate primary key") {
|
||||
fmt.Printf("RT Insert Err: %+v\n", err)
|
||||
cons.error = true
|
||||
count.AddErr(1)
|
||||
continue
|
||||
} else {
|
||||
count.AddDuplicate(1)
|
||||
}
|
||||
}
|
||||
|
||||
cons.logins = append(cons.logins, login)
|
||||
count.AddDuplicate(resp.Errors)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if opts.Bulk {
|
||||
resp, err := dbs.rtdb.MultiInsert(rtbulk)
|
||||
count.AddInsert(resp.Inserted)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "Duplicate primary key") {
|
||||
cons.error = true
|
||||
count.AddErr(resp.Errors)
|
||||
continue
|
||||
} else {
|
||||
count.AddDuplicate(resp.Errors)
|
||||
}
|
||||
}
|
||||
cons.logins = append(cons.logins, slogin["rt"]...)
|
||||
|
||||
}
|
||||
|
||||
count.AddLog(len(prod.logins))
|
||||
|
||||
if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins {
|
||||
cons.empty = false
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start))
|
||||
fmt.Printf("CONS: logins=%d in %v - active=%d\n", len(prod.logins), time.Since(start))
|
||||
}
|
||||
|
||||
// wg.Done()
|
||||
remove <- cons
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
|
|
36
dbs.go
36
dbs.go
|
@ -2,20 +2,20 @@
|
|||
package m2r
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
// rt "gopkg.in/dancannon/gorethink.v2"
|
||||
"gopkg.in/mgo.v2"
|
||||
// "gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
var (
|
||||
dbs = Dbs{
|
||||
MongoDB: "lastlogin",
|
||||
RethinkDB: "lastlogin",
|
||||
Mongo: &Mongo{},
|
||||
Rethink: &Rethink{},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -25,19 +25,8 @@ type Dbs struct {
|
|||
MongoDB string
|
||||
RethinkURI string
|
||||
RethinkDB string
|
||||
rtdb *Rethink
|
||||
mdb *mgo.Session
|
||||
ll *mgo.Collection
|
||||
}
|
||||
|
||||
// MongoLogin structure
|
||||
type MongoLogin struct {
|
||||
ID string `json:"_id" bson:"_id" gorethink:"id"`
|
||||
User string `json:"user" bson:"user" gorethink:"user"`
|
||||
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
|
||||
IP string `json:"ip" bson:"ip" gorethink:"ip"`
|
||||
Date time.Time `json:"date" bson:"date" gorethink:"date"`
|
||||
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
|
||||
Mongo *Mongo
|
||||
Rethink *Rethink
|
||||
}
|
||||
|
||||
func (db *Dbs) poolRethink() {
|
||||
|
@ -52,7 +41,14 @@ func (db *Dbs) poolRethink() {
|
|||
if opts.Debug {
|
||||
fmt.Printf("RT_URI: %s\n", uri)
|
||||
}
|
||||
db.rtdb, err = NewRethinkDB(uri)
|
||||
|
||||
database := strings.Split(dbs.RethinkDB, ":")
|
||||
if len(database) > 1 {
|
||||
db.Rethink, err = NewRethinkDB(uri, database[0], database[1])
|
||||
} else {
|
||||
err = errors.New("DB format error - DB:Table")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("RethinkDB connect Error: ", err.Error())
|
||||
os.Exit(-4)
|
||||
|
@ -67,11 +63,9 @@ func (db *Dbs) poolRethink() {
|
|||
|
||||
func (db *Dbs) connectMongo() {
|
||||
var err error
|
||||
db.mdb, err = mgo.Dial(db.MongoURI)
|
||||
db.Mongo, err = NewMongoDB(db.MongoURI, db.MongoDB)
|
||||
if err != nil {
|
||||
log.Println("Mongodb connect Error: ", err.Error())
|
||||
os.Exit(-3)
|
||||
}
|
||||
|
||||
db.ll = db.mdb.DB(db.MongoDB).C(fmt.Sprintf("lastlogin_%s", opts.Month))
|
||||
}
|
||||
|
|
7
main.go
7
main.go
|
@ -22,7 +22,6 @@ var (
|
|||
|
||||
done chan bool
|
||||
consume chan produced
|
||||
remove chan consumed
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
|
@ -70,10 +69,10 @@ func main() {
|
|||
opts.Month = start.Format("0601")
|
||||
|
||||
dbs.connectMongo()
|
||||
defer dbs.mdb.Close()
|
||||
defer dbs.Mongo.Close()
|
||||
|
||||
dbs.poolRethink()
|
||||
defer dbs.rtdb.Close()
|
||||
defer dbs.Rethink.Close()
|
||||
|
||||
if opts.Timeout > 0 {
|
||||
time.AfterFunc(opts.Timeout, exit)
|
||||
|
@ -82,7 +81,6 @@ func main() {
|
|||
count = NewCounter()
|
||||
|
||||
consume = make(chan produced)
|
||||
remove = make(chan consumed)
|
||||
loop = true
|
||||
done = make(chan bool)
|
||||
|
||||
|
@ -100,5 +98,4 @@ func main() {
|
|||
|
||||
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
|
||||
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
|
||||
|
||||
}
|
||||
|
|
57
mongo.go
57
mongo.go
|
@ -1,4 +1,59 @@
|
|||
// mongo
|
||||
package m2r
|
||||
|
||||
// "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2"
|
||||
// "gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
type Mongo struct {
|
||||
mdb *mgo.Session
|
||||
ll *mgo.Collection
|
||||
count int
|
||||
}
|
||||
|
||||
// MongoLogin structure
|
||||
type MongoLogin struct {
|
||||
ID string `json:"_id" bson:"_id" gorethink:"id"`
|
||||
User string `json:"user" bson:"user" gorethink:"user"`
|
||||
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
|
||||
IP string `json:"ip" bson:"ip" gorethink:"ip"`
|
||||
Date time.Time `json:"date" bson:"date" gorethink:"date"`
|
||||
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
|
||||
}
|
||||
|
||||
func NewMongoDB(uri string, db string) (*Mongo, error) {
|
||||
var err error
|
||||
mdb, err := mgo.Dial(uri)
|
||||
if err != nil {
|
||||
return &Mongo{}, err
|
||||
}
|
||||
|
||||
ll := mdb.DB(db).C(fmt.Sprintf("lastlogin_%s", opts.Month))
|
||||
|
||||
return &Mongo{
|
||||
mdb: mdb,
|
||||
ll: ll,
|
||||
count: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *Mongo) Find(limit int) []MongoLogin {
|
||||
q := m.ll.Find(nil).Sort("$natural").Skip(m.count).Limit(limit)
|
||||
iter := q.Iter()
|
||||
defer iter.Close()
|
||||
|
||||
var retval []MongoLogin
|
||||
iter.All(&retval)
|
||||
|
||||
m.count += limit
|
||||
|
||||
return retval
|
||||
}
|
||||
|
||||
func (m *Mongo) Close() {
|
||||
m.mdb.Close()
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ type Options struct {
|
|||
var (
|
||||
opts = Options{
|
||||
LogFile: "log/llmongo.log",
|
||||
MaxLogins: -1,
|
||||
MaxLogins: 100,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
@ -7,8 +7,7 @@ package m2r
|
|||
// "time"
|
||||
|
||||
type produced struct {
|
||||
user string
|
||||
logins []string
|
||||
logins []MongoLogin
|
||||
}
|
||||
|
||||
func producer() {
|
||||
|
@ -20,14 +19,13 @@ func producer() {
|
|||
|
||||
//start := time.Now()
|
||||
// estrae un userid dalla lista degli utenti che hanno fatto login
|
||||
logs := dbs.Mongo.Find(opts.MaxLogins)
|
||||
|
||||
count.AddUser()
|
||||
wg.Add(1)
|
||||
count.AddWG()
|
||||
|
||||
consume <- produced{
|
||||
user: "",
|
||||
logins: []string{"", ""},
|
||||
logins: logs,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
14
rethink.go
14
rethink.go
|
@ -9,10 +9,12 @@ import (
|
|||
)
|
||||
|
||||
type Rethink struct {
|
||||
rtdb *rt.Session
|
||||
DB string
|
||||
Table string
|
||||
rtdb *rt.Session
|
||||
}
|
||||
|
||||
func NewRethinkDB(cluster []string) (*Rethink, error) {
|
||||
func NewRethinkDB(cluster []string, db string, table string) (*Rethink, error) {
|
||||
var (
|
||||
err error
|
||||
session *rt.Session
|
||||
|
@ -38,12 +40,14 @@ func NewRethinkDB(cluster []string) (*Rethink, error) {
|
|||
}
|
||||
|
||||
return &Rethink{
|
||||
rtdb: session,
|
||||
rtdb: session,
|
||||
DB: db,
|
||||
Table: table,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
|
||||
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb)
|
||||
resp, err := rt.DB(r.DB).Table(r.Table).Insert(login).RunWrite(r.rtdb)
|
||||
if opts.Debug {
|
||||
fmt.Printf("RTinsert: %+v\n", resp)
|
||||
}
|
||||
|
@ -55,7 +59,7 @@ func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
|
|||
}
|
||||
|
||||
func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) {
|
||||
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb)
|
||||
resp, err := rt.DB(r.DB).Table(r.Table).Insert(logins).RunWrite(r.rtdb)
|
||||
if opts.Debug {
|
||||
fmt.Printf("RTMulti: %+v\n", resp)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue