From 0c9c9710ceeaefbd09fde6362cb8f009ed47936b Mon Sep 17 00:00:00 2001 From: Michele Date: Fri, 10 Mar 2017 13:40:26 +0100 Subject: [PATCH] first commit --- consumer.go | 79 ++++++++++++++++++++++++++++ counter.go | 144 ++++++++++++++++++++++++++++++++++++++++++++++++++++ dbs.go | 85 +++++++++++++++++++++++++++++++ main.go | 98 +++++++++++++++++++++++++++++++++++ options.go | 64 +++++++++++++++++++++++ pid.go | 85 +++++++++++++++++++++++++++++++ producer.go | 117 ++++++++++++++++++++++++++++++++++++++++++ sigterm.go | 39 ++++++++++++++ 8 files changed, 711 insertions(+) create mode 100644 consumer.go create mode 100644 counter.go create mode 100644 dbs.go create mode 100644 main.go create mode 100644 options.go create mode 100644 pid.go create mode 100644 producer.go create mode 100644 sigterm.go diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..76289bf --- /dev/null +++ b/consumer.go @@ -0,0 +1,79 @@ +// consumer +package main + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "log" + // "strconv" + // "strings" + "time" + + // "gopkg.in/mgo.v2" +) + +type consumed struct { + user string + error bool + logins []string +} + +func hash(val []byte) string { + + h := sha256.New() + h.Write(val) + + return hex.EncodeToString(h.Sum(nil)) +} + +func consumer() { + + for { + + prod := <-consume + + start := time.Now() + + status = _Consumer + + if opts.Test { + fmt.Printf("CONS: users - %d\n", len(prod)) + } + + // for j := range dbs.mdb { + bulk := dbs.mdb.DB("quota").C("tiscali").Bulk() + bulk.Unordered() + + for p := range prod { + mquota := MongoQuota{ + User: prod[p].user, + Messages: prod[p].messages, + Storage: prod[p].storage, + Insert: time.Now(), + } + + bulk.Insert(mquota) + + if opts.Test { + log.Printf("OK: %s\n", prod[p].user) + } + } + + _, err := bulk.Run() + if err != nil { + fmt.Printf("Err: %+v\n", err) + counter <- Counterchan{ + tipo: "err", + val: 1, + } + continue + } + + if opts.Debug { + fmt.Printf("CONS: users=%d in %v - active=%d\n", len(prod), time.Since(start), dbs.rdb.ActiveCount()) + } + + //wg.Done() + } +} diff --git a/counter.go b/counter.go new file mode 100644 index 0000000..0c9a9cc --- /dev/null +++ b/counter.go @@ -0,0 +1,144 @@ +// counter +package main + +import ( + "sync" + "time" +) + +type Counterchan struct { + tipo string + val int +} + +// Counter structure +type Counter struct { + mu sync.Mutex + user int + insert int + err int + dup int + time time.Duration + wg int +} + +// NewCounter iniitialized Counter structure +func NewCounter() *Counter { + return &Counter{ + user: 0, + insert: 0, + err: 0, + dup: 0, + time: 0, + wg: 0, + } +} + +func (c *Counter) Run() { + for { + tocount := <-counter + + switch tocount.tipo { + case "user": + c.addUser() + case "dup": + c.addDuplicate(tocount.val) + case "ins": + c.addInsert(tocount.val) + case "wg": + if tocount.val > 0 { + c.addWG() + } else { + c.delWG() + } + case "err": + c.addErr(tocount.val) + + } + } +} + +// AddUser increment number of users managed +func (c *Counter) addUser() { + c.user++ +} + +// AddDuplicate increment number of duplicates log +func (c *Counter) addDuplicate(add int) { + c.dup += add +} + +// AddInsert increment number of inserted rows +func (c *Counter) addInsert(add int) { + c.insert += add +} + +// AddWG ... +func (c *Counter) addWG() { + c.wg++ +} + +// AddErr ... +func (c *Counter) addErr(add int) { + c.err += add +} + +// DelWG ... +func (c *Counter) delWG() { + c.wg-- +} + +// GetUser return total users +func (c *Counter) GetUser() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.user + return +} + +// GetDup return total duplicated logins +func (c *Counter) GetDup() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.dup + return +} + +// GetInsert return total inserted rows +func (c *Counter) GetInsert() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.insert + return +} + +// GetErr return total errors +func (c *Counter) GetErr() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.err + return +} + +// GetWG ... +func (c *Counter) GetWG() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.wg + return +} + +// GetTime ... +func (c *Counter) GetTime() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.time.Seconds() + return +} + +// SetTime ... +func (c *Counter) SetTime(t time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.time = t +} diff --git a/dbs.go b/dbs.go new file mode 100644 index 0000000..4de0902 --- /dev/null +++ b/dbs.go @@ -0,0 +1,85 @@ +// dbs +package main + +import ( + "fmt" + "log" + "os" + // "strings" + "time" + + "github.com/garyburd/redigo/redis" + + "gopkg.in/mgo.v2" +) + +var ( + dbs = Dbs{ + RedisURI: "127.0.0.1:6379", + Database: "lastlogin", + } +) + +// Dbs structure +type Dbs struct { + MongoURI string + Database string + RedisURI string + rdb *redis.Pool //*redis.Client + mdb *mgo.Session +} + +// MongoQuota structure +type MongoQuota struct { + // ID string `json:"_id" bson:"_id"` + User string `json:"user" bson:"user"` + Messages int `json:"messages" bson:"messages"` + Storage int `json:"storage" bson:"storage"` + Insert time.Time `json:"insert" bson:"insert"` +} + +func (db *Dbs) isMongodb() bool { + if db.MongoURI != "" { + return true + } + + return false +} + +func (db *Dbs) poolRedis() { + + dbs.rdb = &redis.Pool{ + MaxIdle: 128, + MaxActive: 1000, + Wait: true, + IdleTimeout: 1 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", db.RedisURI) + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } + +} + +func (db *Dbs) connectMongo() { + + // mongoList := strings.Split(db.MongoURI, ",") + + // for m := range mongoList { + nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", db.MongoURI)) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + nm.SetSocketTimeout(5 * time.Second) + nm.SetSyncTimeout(5 * time.Second) + db.mdb = nm + // } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..5d89e1c --- /dev/null +++ b/main.go @@ -0,0 +1,98 @@ +// llmongo.go +package main + +import ( + "flag" + "fmt" + "log" + "os" + "sync" + "time" +) + +const ( + _Version = "v1.0.0" + _Producer = 0 + _Consumer = 1 +) + +var ( + loop bool + + done chan bool + consume chan []userQuota + + counter chan Counterchan + + wg sync.WaitGroup + + status int + + count *Counter +) + +func main() { + flag.Usage = usage + flag.Parse() + + if opts.Version { + fmt.Println(os.Args[0], _Version) + os.Exit(0) + } + + setTerm() + + fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + fmt.Println("Log file error: ", err.Error()) + os.Exit(-4) + } + defer fs.Close() + + log.SetOutput(fs) + + pid.PIDFile = opts.Pidfile + pid.Write(true) + defer pid.Remove() + + start := time.Now() + fmt.Printf("Start: %+v\n", opts) + log.Printf("Start: %+v\n", opts) + + opts.Month = start.Format("0601") + + dbs.poolRedis() + defer dbs.rdb.Close() + + dbs.connectMongo() + defer dbs.mdb.Close() + + if opts.Timeout > 0 { + time.AfterFunc(opts.Timeout, exit) + } + + count = NewCounter() + + consume = make(chan []userQuota, opts.Queue) + loop = true + done = make(chan bool) + counter = make(chan Counterchan) + + go count.Run() + go producer() + for i := 0; i < opts.Queue; i++ { + go consumer() + } + + <-done + fmt.Printf("Done\n") + close(done) + + fmt.Println("Waiting WG") + wg.Wait() + + count.SetTime(time.Since(start)) + + fmt.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) + log.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..b394e0a --- /dev/null +++ b/options.go @@ -0,0 +1,64 @@ +// options +package main + +import ( + "flag" + "fmt" + "log" + "os" + "path" + "path/filepath" + "time" +) + +// Options structure +type Options struct { + CurrentPath string + Exe string + LogFile string + ConfigFile string + Timeout time.Duration + Debug bool + Test bool + Version bool + MaxBulk int + Month string + Pidfile string + Queue int +} + +var ( + opts = Options{ + LogFile: "log/llmongo.log", + } +) + +func usage() { + fmt.Println("Usage: llmongo -m -r -t -l -T -x -H -i -v") + fmt.Println() + os.Exit(0) +} + +func init() { + var err error + opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) + opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") + opts.Exe = path.Base(os.Args[0]) + + flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb") + flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database") + flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") + flag.BoolVar(&opts.Debug, "D", false, "Debug") + flag.BoolVar(&opts.Test, "DD", false, "Test") + flag.IntVar(&opts.MaxBulk, "M", 100, "Max Mongodb bulk") + flag.IntVar(&opts.Queue, "q", 2, "parallel consumer") + flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file") +} diff --git a/pid.go b/pid.go new file mode 100644 index 0000000..5ccc2e5 --- /dev/null +++ b/pid.go @@ -0,0 +1,85 @@ +// pid +package main + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "os" + "path" + "strconv" + "strings" +) + +var ( + pid = PID{} +) + +// PID structure +type PID struct { + PID string + PIDFile string +} + +// verifica se esiste il PIDFile; +// se esiste legge il PID e controlla se e' running il processo associato +func (p *PID) check() bool { + bpid, err := ioutil.ReadFile(p.PIDFile) + p.PID = strings.TrimRight(string(bpid), "\n") + if err == nil && p.readCmd() { + return true + } + return false +} + +// controlla se esiste il processo associato al PID, +// se il cmd e' lo stesso e se e' in esecuzione. +func (p *PID) readCmd() bool { + bcmd, err := ioutil.ReadFile(path.Join("/proc", p.PID, "cmdline")) + // non esiste la dir relativa al PID su /proc + if err != nil { + fmt.Println("cmdline error: ", err) + return false + } + cmd := bytes.Trim(bcmd, "\x00") + if !strings.Contains(string(cmd), opts.Exe) { + fmt.Printf("PID %s used by %s\n", pid, cmd) + } + return true +} + +// scrive il PID nel PIDFile +func (p *PID) Write(l bool) { + + if p.check() { + if l { + log.Println("Running: ", p.PID) + } else { + fmt.Println("Running: ", p.PID) + } + + os.Exit(-6) + } + + fpid, err := os.OpenFile(p.PIDFile, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + if l { + log.Println("PID file error: ", err.Error()) + } else { + fmt.Println("PID file error: ", err.Error()) + } + + os.Exit(-5) + } + fpid.WriteString(strconv.Itoa(os.Getpid())) + fpid.Close() +} + +// Remove cancella il PIDFile +func (p *PID) Remove() { + err := os.Remove(p.PIDFile) + if err != nil { + fmt.Println("RM file error: ", err.Error()) + } +} diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..246dd2b --- /dev/null +++ b/producer.go @@ -0,0 +1,117 @@ +// iterator +package main + +import ( + "fmt" + "log" + "strconv" + "strings" + "time" + + "github.com/garyburd/redigo/redis" +) + +type userQuota struct { + user string + storage int + messages int +} + +type produced struct { + user string + logins []string +} + +func producer() { + conn := dbs.rdb.Get() + defer conn.Close() + + keys, err := redis.Strings(conn.Do("KEYS", "*")) + + if err != nil { + if opts.Debug { + fmt.Printf("Keys error: %v\n", err) + } + log.Printf("Keys error: %v\n", err) + exit() + } + + users := make(map[string]bool) + + for _, key := range keys { + user := key[:strings.Index(key, "@")] + users[user] = true + } + + uq := make([]userQuota, 0) + max := 0 + + for key, _ := range users { + + if !loop { + break + } + + // wg.Wait() + status = _Producer + + start := time.Now() + if opts.Test { + fmt.Printf("MGET: %s (%d)\n", key, max) + } + // estrae un userid dalla lista degli utenti che hanno fatto login + quota, err := redis.Strings(conn.Do("mget", fmt.Sprintf("%s@tiscali.it/quota/messages", key), fmt.Sprintf("%s@tiscali.it/quota/storage", key))) + // se non ci sono piu' userid esce + if err != nil { + if opts.Debug { + fmt.Printf("MGET err: %v\n", err) + } + log.Printf("MGET err: %v\n", err) + continue + } + + counter <- Counterchan{ + tipo: "user", + val: 1, + } + + msg, err := strconv.Atoi(quota[0]) + if err != nil { + msg = 0 + } + store, err := strconv.Atoi(quota[1]) + if err != nil { + store = 0 + } + + uq = append(uq, userQuota{ + user: key, + messages: msg, + storage: store, + }) + + if opts.Test { + fmt.Printf("User: %s - %+v\n", key, quota) + } + + if max >= opts.MaxBulk { + if opts.Debug { + fmt.Printf("\nPROD: %+v\n", time.Since(start)) + } + + // wg.Add(1) + // counter <- Counterchan{ + // tipo: "wg", + // val: 1, + // } + + consume <- uq + uq = make([]userQuota, 0) + max = 0 + } + + max += 1 + } + + done <- true +} diff --git a/sigterm.go b/sigterm.go new file mode 100644 index 0000000..79fc906 --- /dev/null +++ b/sigterm.go @@ -0,0 +1,39 @@ +// sigterm +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func kill() { + log.Printf("KILL %d\n", status) + fmt.Printf("KILL %d\n", status) + wg.Done() + counter <- Counterchan{ + tipo: "wg", + val: -1, + } + done <- true +} + +func exit() { + log.Printf("EXIT %d\n", status) + fmt.Printf("EXIT %d\n", status) + loop = false + time.AfterFunc(time.Second*20, kill) +} + +func setTerm() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGTERM) + go func() { + <-c + exit() + }() +}