From c9d9a7c70fc818c8979e009b27e3a5a596a80429 Mon Sep 17 00:00:00 2001 From: Michele Date: Thu, 10 Nov 2016 17:26:17 +0100 Subject: [PATCH] first commit --- README.md | 0 consumer.go | 134 +++++++++++++++++++++++++++++++++++++++++++ counter.go | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++++ dbs.go | 77 +++++++++++++++++++++++++ main.go | 104 +++++++++++++++++++++++++++++++++ mongo.go | 4 ++ options.go | 66 +++++++++++++++++++++ pid.go | 85 +++++++++++++++++++++++++++ producer.go | 35 ++++++++++++ rethink.go | 71 +++++++++++++++++++++++ sigterm.go | 36 ++++++++++++ 11 files changed, 773 insertions(+) create mode 100644 README.md create mode 100644 consumer.go create mode 100644 counter.go create mode 100644 dbs.go create mode 100644 main.go create mode 100644 mongo.go create mode 100644 options.go create mode 100644 pid.go create mode 100644 producer.go create mode 100644 rethink.go create mode 100644 sigterm.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..313166d --- /dev/null +++ b/consumer.go @@ -0,0 +1,134 @@ +// consumer +package m2r + +import ( + "fmt" + // "github.com/garyburd/redigo/redis" + "log" + "strconv" + "strings" + "time" + + "gopkg.in/mgo.v2" +) + +type consumed struct { + user string + error bool + logins []string + empty bool +} + +func consumer() { + + for { + + prod := <-consume + + status = _Consumer + + var bulk = make(map[string]*mgo.Bulk) + var rtbulk []MongoLogin + var col = make(map[string]*mgo.Collection) + var slogin = make(map[string][]string) + + if opts.Bulk { + bulk[opts.Month] = dbs.ll.Bulk() + bulk[opts.Month].Unordered() + } else { + col[opts.Month] = dbs.ll + } + + cons := consumed{ + user: prod.user, + logins: make([]string, 0), + error: false, + empty: true, + } + + start := time.Now() + for i := range prod.logins { + login := prod.logins[i] + // se la riga di login e' vuota + if login == "" { + log.Println("Login empty: ", prod.user) + cons.logins = append(cons.logins, login) + continue + } + sval := strings.Split(login, ":") + // se il formato della riga di login non e' corretto + if sval[1] == "" { + log.Println("Login format error: ", login, prod.user) + cons.logins = append(cons.logins, login) + continue + } + // se il timestamp della riga di login non e' corretto + date, err := strconv.ParseInt(sval[1], 10, 64) + if err != nil { + log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) + cons.logins = append(cons.logins, login) + continue + } + ml := MongoLogin{ + // genera l' _ID con user e timestamp + ID: fmt.Sprintf("%s_%s", prod.user, time.Unix(date, 0).Format("20060102T150405")), + User: prod.user, + Protocol: sval[0], + IP: sval[2], + Date: time.Unix(date, 0), + Insert: time.Now(), + } + + // inserisce il login su Mongodb + if opts.Bulk { + rtbulk = append(rtbulk, ml) + slogin["rt"] = append(slogin["rt"], login) + } else { + resp, err := dbs.rtdb.Insert(ml) + count.AddInsert(resp.Inserted) + if err != nil { + if !strings.Contains(err.Error(), "Duplicate primary key") { + fmt.Printf("RT Insert Err: %+v\n", err) + cons.error = true + count.AddErr(1) + continue + } else { + count.AddDuplicate(1) + } + } + + cons.logins = append(cons.logins, login) + } + + } + + if opts.Bulk { + resp, err := dbs.rtdb.MultiInsert(rtbulk) + count.AddInsert(resp.Inserted) + if err != nil { + if !strings.Contains(err.Error(), "Duplicate primary key") { + cons.error = true + count.AddErr(resp.Errors) + continue + } else { + count.AddDuplicate(resp.Errors) + } + } + cons.logins = append(cons.logins, slogin["rt"]...) + + } + + count.AddLog(len(prod.logins)) + + if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins { + cons.empty = false + } + + if opts.Debug { + fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start)) + } + + // wg.Done() + remove <- cons + } +} diff --git a/counter.go b/counter.go new file mode 100644 index 0000000..2ab6fda --- /dev/null +++ b/counter.go @@ -0,0 +1,161 @@ +// counter +package m2r + +import ( + "sync" + "time" +) + +// Counter structure +type Counter struct { + mu sync.Mutex + user int + log int + insert int + rem int + err int + dup int + time time.Duration + wg int +} + +// NewCounter iniitialized Counter structure +func NewCounter() *Counter { + return &Counter{ + user: 0, + log: 0, + insert: 0, + err: 0, + rem: 0, + dup: 0, + time: 0, + wg: 0, + } +} + +// AddUser increment number of users managed +func (c *Counter) AddUser() { + c.mu.Lock() + defer c.mu.Unlock() + c.user++ +} + +// AddDuplicate increment number of duplicates log +func (c *Counter) AddDuplicate(add int) { + c.mu.Lock() + defer c.mu.Unlock() + c.dup += add +} + +// AddInsert increment number of inserted rows +func (c *Counter) AddInsert(add int) { + c.mu.Lock() + defer c.mu.Unlock() + c.insert += add +} + +// AddLog increment number of log's rows managed +func (c *Counter) AddLog(add int) { + c.mu.Lock() + defer c.mu.Unlock() + c.log += add +} + +//AddRem increment removed logs row +func (c *Counter) AddRem(add int) { + c.mu.Lock() + defer c.mu.Unlock() + c.rem += add +} + +// AddWG ... +func (c *Counter) AddWG() { + c.mu.Lock() + defer c.mu.Unlock() + c.wg++ +} + +// AddErr ... +func (c *Counter) AddErr(add int) { + c.mu.Lock() + defer c.mu.Unlock() + c.err += add +} + +// DelWG ... +func (c *Counter) DelWG() { + c.mu.Lock() + defer c.mu.Unlock() + c.wg-- +} + +// GetUser return total users +func (c *Counter) GetUser() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.user + return +} + +// GetDup return total duplicated logins +func (c *Counter) GetDup() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.dup + return +} + +// GetLog return total log's rows +func (c *Counter) GetLog() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.log + return +} + +// GetInsert return total inserted rows +func (c *Counter) GetInsert() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.insert + return +} + +// GetErr return total errors +func (c *Counter) GetErr() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.err + return +} + +// GetRem return total removed log's rows +func (c *Counter) GetRem() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.rem + return +} + +// GetWG ... +func (c *Counter) GetWG() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.wg + return +} + +// GetTime ... +func (c *Counter) GetTime() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.time.Seconds() + return +} + +// SetTime ... +func (c *Counter) SetTime(t time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.time = t +} diff --git a/dbs.go b/dbs.go new file mode 100644 index 0000000..b87abdb --- /dev/null +++ b/dbs.go @@ -0,0 +1,77 @@ +// dbs +package m2r + +import ( + "fmt" + "log" + "os" + "strings" + "time" + + // rt "gopkg.in/dancannon/gorethink.v2" + "gopkg.in/mgo.v2" +) + +var ( + dbs = Dbs{ + MongoDB: "lastlogin", + RethinkDB: "lastlogin", + } +) + +// Dbs structure +type Dbs struct { + MongoURI string + MongoDB string + RethinkURI string + RethinkDB string + rtdb *Rethink + mdb *mgo.Session + ll *mgo.Collection +} + +// MongoLogin structure +type MongoLogin struct { + ID string `json:"_id" bson:"_id" gorethink:"id"` + User string `json:"user" bson:"user" gorethink:"user"` + Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"` + IP string `json:"ip" bson:"ip" gorethink:"ip"` + Date time.Time `json:"date" bson:"date" gorethink:"date"` + Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"` +} + +func (db *Dbs) poolRethink() { + var err error + + if opts.Debug { + fmt.Printf("DBS: %+v\n", db) + } + + if db.RethinkURI != "" { + uri := strings.Split(dbs.RethinkURI, ",") + if opts.Debug { + fmt.Printf("RT_URI: %s\n", uri) + } + db.rtdb, err = NewRethinkDB(uri) + if err != nil { + fmt.Println("RethinkDB connect Error: ", err.Error()) + os.Exit(-4) + } + } + + if opts.Debug { + fmt.Printf("DBS: %+v\n", db) + } + +} + +func (db *Dbs) connectMongo() { + var err error + db.mdb, err = mgo.Dial(db.MongoURI) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + + db.ll = db.mdb.DB(db.MongoDB).C(fmt.Sprintf("lastlogin_%s", opts.Month)) +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..88d035c --- /dev/null +++ b/main.go @@ -0,0 +1,104 @@ +// llmongo.go +package m2r + +import ( + "flag" + "fmt" + "log" + "os" + "sync" + "time" +) + +const ( + _Version = "v1.0.0" + _Producer = 0 + _Consumer = 1 + _Remover = 2 +) + +var ( + loop bool + + done chan bool + consume chan produced + remove chan consumed + + wg sync.WaitGroup + + status int + + count *Counter +) + +func main() { + flag.Usage = usage + flag.Parse() + + if opts.Version { + fmt.Println(os.Args[0], _Version) + os.Exit(0) + } + + if opts.Hostname == "" { + var err error + opts.Hostname, err = os.Hostname() + if err != nil { + fmt.Println("Hostname error: ", err.Error()) + } + } + + setTerm() + + fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + fmt.Println("Log file error: ", err.Error()) + os.Exit(-4) + } + defer fs.Close() + + log.SetOutput(fs) + + pid.PIDFile = opts.Pidfile + pid.Write(true) + defer pid.Remove() + + start := time.Now() + fmt.Printf("Start: %+v\n", opts) + log.Printf("Start: %+v\n", opts) + + opts.Month = start.Format("0601") + + dbs.connectMongo() + defer dbs.mdb.Close() + + dbs.poolRethink() + defer dbs.rtdb.Close() + + if opts.Timeout > 0 { + time.AfterFunc(opts.Timeout, exit) + } + + count = NewCounter() + + consume = make(chan produced) + remove = make(chan consumed) + loop = true + done = make(chan bool) + + go producer() + go consumer() + + <-done + fmt.Printf("Done\n") + close(done) + + fmt.Println("Waiting WG") + wg.Wait() + + count.SetTime(time.Since(start)) + + fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) + log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) + +} diff --git a/mongo.go b/mongo.go new file mode 100644 index 0000000..c8a1fa9 --- /dev/null +++ b/mongo.go @@ -0,0 +1,4 @@ +// mongo +package m2r + +// "fmt" diff --git a/options.go b/options.go new file mode 100644 index 0000000..3dad4aa --- /dev/null +++ b/options.go @@ -0,0 +1,66 @@ +// options +package m2r + +import ( + "flag" + "fmt" + "log" + "os" + "path" + "path/filepath" + "time" +) + +// Options structure +type Options struct { + CurrentPath string + Exe string + LogFile string + ConfigFile string + Timeout time.Duration + MaxLogins int + Debug bool + Version bool + Bulk bool + Month string + Hostname string + Pidfile string +} + +var ( + opts = Options{ + LogFile: "log/llmongo.log", + MaxLogins: -1, + } +) + +func usage() { + fmt.Println("Usage: llmongo -m -r -l -T -H -v") + fmt.Println() + os.Exit(0) +} + +func init() { + var err error + opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) + opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") + opts.Exe = path.Base(os.Args[0]) + + flag.StringVar(&opts.Hostname, "H", "", "hostname") + flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb URI") + flag.StringVar(&dbs.MongoDB, "md", dbs.MongoDB, "Mongodb Database") + flag.StringVar(&dbs.RethinkURI, "r", "", "RethinkDB URI") + flag.StringVar(&dbs.RethinkDB, "rd", dbs.RethinkDB, "Rethinkdb Database") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.IntVar(&opts.MaxLogins, "L", opts.MaxLogins, "Max lastlogins") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") + flag.BoolVar(&opts.Debug, "D", false, "Debug") + flag.BoolVar(&opts.Bulk, "B", false, "Bulk") + flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file") +} diff --git a/pid.go b/pid.go new file mode 100644 index 0000000..858b0a5 --- /dev/null +++ b/pid.go @@ -0,0 +1,85 @@ +// pid +package m2r + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "os" + "path" + "strconv" + "strings" +) + +var ( + pid = PID{} +) + +// PID structure +type PID struct { + PID string + PIDFile string +} + +// verifica se esiste il PIDFile; +// se esiste legge il PID e controlla se e' running il processo associato +func (p *PID) check() bool { + bpid, err := ioutil.ReadFile(p.PIDFile) + p.PID = strings.TrimRight(string(bpid), "\n") + if err == nil && p.readCmd() { + return true + } + return false +} + +// controlla se esiste il processo associato al PID, +// se il cmd e' lo stesso e se e' in esecuzione. +func (p *PID) readCmd() bool { + bcmd, err := ioutil.ReadFile(path.Join("/proc", p.PID, "cmdline")) + // non esiste la dir relativa al PID su /proc + if err != nil { + fmt.Println("cmdline error: ", err) + return false + } + cmd := bytes.Trim(bcmd, "\x00") + if !strings.Contains(string(cmd), opts.Exe) { + fmt.Printf("PID %s used by %s\n", pid, cmd) + } + return true +} + +// scrive il PID nel PIDFile +func (p *PID) Write(l bool) { + + if p.check() { + if l { + log.Println("Running: ", p.PID) + } else { + fmt.Println("Running: ", p.PID) + } + + os.Exit(-6) + } + + fpid, err := os.OpenFile(p.PIDFile, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + if l { + log.Println("PID file error: ", err.Error()) + } else { + fmt.Println("PID file error: ", err.Error()) + } + + os.Exit(-5) + } + fpid.WriteString(strconv.Itoa(os.Getpid())) + fpid.Close() +} + +// Remove cancella il PIDFile +func (p *PID) Remove() { + err := os.Remove(p.PIDFile) + if err != nil { + fmt.Println("RM file error: ", err.Error()) + } +} diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..5d887f7 --- /dev/null +++ b/producer.go @@ -0,0 +1,35 @@ +// iterator +package m2r + +// "fmt" +// "log" +// "strconv" +// "time" + +type produced struct { + user string + logins []string +} + +func producer() { + + for loop { + + wg.Wait() + status = _Producer + + //start := time.Now() + // estrae un userid dalla lista degli utenti che hanno fatto login + + count.AddUser() + wg.Add(1) + count.AddWG() + + consume <- produced{ + user: "", + logins: []string{"", ""}, + } + } + + done <- true +} diff --git a/rethink.go b/rethink.go new file mode 100644 index 0000000..c6d43be --- /dev/null +++ b/rethink.go @@ -0,0 +1,71 @@ +package m2r + +import ( + "fmt" + "log" + // "strings" + + rt "gopkg.in/dancannon/gorethink.v2" +) + +type Rethink struct { + rtdb *rt.Session +} + +func NewRethinkDB(cluster []string) (*Rethink, error) { + var ( + err error + session *rt.Session + ) + + if len(cluster) > 1 { + session, err = rt.Connect(rt.ConnectOpts{ + Addresses: cluster, + InitialCap: 10, + MaxOpen: 10, + }) + } else { + session, err = rt.Connect(rt.ConnectOpts{ + Address: cluster[0], + InitialCap: 10, + MaxOpen: 10, + }) + } + + if err != nil { + log.Printf("RethinkDB ERROR: %+v\n", err.Error()) + return nil, err + } + + return &Rethink{ + rtdb: session, + }, nil +} + +func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) { + resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb) + if opts.Debug { + fmt.Printf("RTinsert: %+v\n", resp) + } + if err != nil { + return resp, err + } + + return resp, nil +} + +func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) { + resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb) + if opts.Debug { + fmt.Printf("RTMulti: %+v\n", resp) + } + if err != nil { + return resp, err + } + + return resp, nil +} + +func (r *Rethink) Close() { + r.rtdb.Close() +} diff --git a/sigterm.go b/sigterm.go new file mode 100644 index 0000000..3e36a7d --- /dev/null +++ b/sigterm.go @@ -0,0 +1,36 @@ +// sigterm +package m2r + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func kill() { + log.Printf("KILL %d\n", status) + fmt.Printf("KILL %d\n", status) + wg.Done() + count.DelWG() + done <- true +} + +func exit() { + log.Printf("EXIT %d\n", status) + fmt.Printf("EXIT %d\n", status) + loop = false + time.AfterFunc(time.Second*20, kill) +} + +func setTerm() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGTERM) + go func() { + <-c + exit() + }() +}