commit 0c9c9710ceeaefbd09fde6362cb8f009ed47936b Author: Michele Date: Fri Mar 10 13:40:26 2017 +0100 first commit diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..76289bf --- /dev/null +++ b/consumer.go @@ -0,0 +1,79 @@ +// consumer +package main + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "log" + // "strconv" + // "strings" + "time" + + // "gopkg.in/mgo.v2" +) + +type consumed struct { + user string + error bool + logins []string +} + +func hash(val []byte) string { + + h := sha256.New() + h.Write(val) + + return hex.EncodeToString(h.Sum(nil)) +} + +func consumer() { + + for { + + prod := <-consume + + start := time.Now() + + status = _Consumer + + if opts.Test { + fmt.Printf("CONS: users - %d\n", len(prod)) + } + + // for j := range dbs.mdb { + bulk := dbs.mdb.DB("quota").C("tiscali").Bulk() + bulk.Unordered() + + for p := range prod { + mquota := MongoQuota{ + User: prod[p].user, + Messages: prod[p].messages, + Storage: prod[p].storage, + Insert: time.Now(), + } + + bulk.Insert(mquota) + + if opts.Test { + log.Printf("OK: %s\n", prod[p].user) + } + } + + _, err := bulk.Run() + if err != nil { + fmt.Printf("Err: %+v\n", err) + counter <- Counterchan{ + tipo: "err", + val: 1, + } + continue + } + + if opts.Debug { + fmt.Printf("CONS: users=%d in %v - active=%d\n", len(prod), time.Since(start), dbs.rdb.ActiveCount()) + } + + //wg.Done() + } +} diff --git a/counter.go b/counter.go new file mode 100644 index 0000000..0c9a9cc --- /dev/null +++ b/counter.go @@ -0,0 +1,144 @@ +// counter +package main + +import ( + "sync" + "time" +) + +type Counterchan struct { + tipo string + val int +} + +// Counter structure +type Counter struct { + mu sync.Mutex + user int + insert int + err int + dup int + time time.Duration + wg int +} + +// NewCounter iniitialized Counter structure +func NewCounter() *Counter { + return &Counter{ + user: 0, + insert: 0, + err: 0, + dup: 0, + time: 0, + wg: 0, + } +} + +func (c *Counter) Run() { + for { + tocount := <-counter + + switch tocount.tipo { + case "user": + c.addUser() + case "dup": + c.addDuplicate(tocount.val) + case "ins": + c.addInsert(tocount.val) + case "wg": + if tocount.val > 0 { + c.addWG() + } else { + c.delWG() + } + case "err": + c.addErr(tocount.val) + + } + } +} + +// AddUser increment number of users managed +func (c *Counter) addUser() { + c.user++ +} + +// AddDuplicate increment number of duplicates log +func (c *Counter) addDuplicate(add int) { + c.dup += add +} + +// AddInsert increment number of inserted rows +func (c *Counter) addInsert(add int) { + c.insert += add +} + +// AddWG ... +func (c *Counter) addWG() { + c.wg++ +} + +// AddErr ... +func (c *Counter) addErr(add int) { + c.err += add +} + +// DelWG ... +func (c *Counter) delWG() { + c.wg-- +} + +// GetUser return total users +func (c *Counter) GetUser() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.user + return +} + +// GetDup return total duplicated logins +func (c *Counter) GetDup() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.dup + return +} + +// GetInsert return total inserted rows +func (c *Counter) GetInsert() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.insert + return +} + +// GetErr return total errors +func (c *Counter) GetErr() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.err + return +} + +// GetWG ... +func (c *Counter) GetWG() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.wg + return +} + +// GetTime ... +func (c *Counter) GetTime() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.time.Seconds() + return +} + +// SetTime ... +func (c *Counter) SetTime(t time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.time = t +} diff --git a/dbs.go b/dbs.go new file mode 100644 index 0000000..4de0902 --- /dev/null +++ b/dbs.go @@ -0,0 +1,85 @@ +// dbs +package main + +import ( + "fmt" + "log" + "os" + // "strings" + "time" + + "github.com/garyburd/redigo/redis" + + "gopkg.in/mgo.v2" +) + +var ( + dbs = Dbs{ + RedisURI: "127.0.0.1:6379", + Database: "lastlogin", + } +) + +// Dbs structure +type Dbs struct { + MongoURI string + Database string + RedisURI string + rdb *redis.Pool //*redis.Client + mdb *mgo.Session +} + +// MongoQuota structure +type MongoQuota struct { + // ID string `json:"_id" bson:"_id"` + User string `json:"user" bson:"user"` + Messages int `json:"messages" bson:"messages"` + Storage int `json:"storage" bson:"storage"` + Insert time.Time `json:"insert" bson:"insert"` +} + +func (db *Dbs) isMongodb() bool { + if db.MongoURI != "" { + return true + } + + return false +} + +func (db *Dbs) poolRedis() { + + dbs.rdb = &redis.Pool{ + MaxIdle: 128, + MaxActive: 1000, + Wait: true, + IdleTimeout: 1 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", db.RedisURI) + if err != nil { + return nil, err + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } + +} + +func (db *Dbs) connectMongo() { + + // mongoList := strings.Split(db.MongoURI, ",") + + // for m := range mongoList { + nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", db.MongoURI)) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + nm.SetSocketTimeout(5 * time.Second) + nm.SetSyncTimeout(5 * time.Second) + db.mdb = nm + // } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..5d89e1c --- /dev/null +++ b/main.go @@ -0,0 +1,98 @@ +// llmongo.go +package main + +import ( + "flag" + "fmt" + "log" + "os" + "sync" + "time" +) + +const ( + _Version = "v1.0.0" + _Producer = 0 + _Consumer = 1 +) + +var ( + loop bool + + done chan bool + consume chan []userQuota + + counter chan Counterchan + + wg sync.WaitGroup + + status int + + count *Counter +) + +func main() { + flag.Usage = usage + flag.Parse() + + if opts.Version { + fmt.Println(os.Args[0], _Version) + os.Exit(0) + } + + setTerm() + + fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + fmt.Println("Log file error: ", err.Error()) + os.Exit(-4) + } + defer fs.Close() + + log.SetOutput(fs) + + pid.PIDFile = opts.Pidfile + pid.Write(true) + defer pid.Remove() + + start := time.Now() + fmt.Printf("Start: %+v\n", opts) + log.Printf("Start: %+v\n", opts) + + opts.Month = start.Format("0601") + + dbs.poolRedis() + defer dbs.rdb.Close() + + dbs.connectMongo() + defer dbs.mdb.Close() + + if opts.Timeout > 0 { + time.AfterFunc(opts.Timeout, exit) + } + + count = NewCounter() + + consume = make(chan []userQuota, opts.Queue) + loop = true + done = make(chan bool) + counter = make(chan Counterchan) + + go count.Run() + go producer() + for i := 0; i < opts.Queue; i++ { + go consumer() + } + + <-done + fmt.Printf("Done\n") + close(done) + + fmt.Println("Waiting WG") + wg.Wait() + + count.SetTime(time.Since(start)) + + fmt.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) + log.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser()) +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..b394e0a --- /dev/null +++ b/options.go @@ -0,0 +1,64 @@ +// options +package main + +import ( + "flag" + "fmt" + "log" + "os" + "path" + "path/filepath" + "time" +) + +// Options structure +type Options struct { + CurrentPath string + Exe string + LogFile string + ConfigFile string + Timeout time.Duration + Debug bool + Test bool + Version bool + MaxBulk int + Month string + Pidfile string + Queue int +} + +var ( + opts = Options{ + LogFile: "log/llmongo.log", + } +) + +func usage() { + fmt.Println("Usage: llmongo -m -r -t -l -T -x -H -i -v") + fmt.Println() + os.Exit(0) +} + +func init() { + var err error + opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) + opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") + opts.Exe = path.Base(os.Args[0]) + + flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb") + flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database") + flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") + flag.BoolVar(&opts.Debug, "D", false, "Debug") + flag.BoolVar(&opts.Test, "DD", false, "Test") + flag.IntVar(&opts.MaxBulk, "M", 100, "Max Mongodb bulk") + flag.IntVar(&opts.Queue, "q", 2, "parallel consumer") + flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file") +} diff --git a/pid.go b/pid.go new file mode 100644 index 0000000..5ccc2e5 --- /dev/null +++ b/pid.go @@ -0,0 +1,85 @@ +// pid +package main + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "os" + "path" + "strconv" + "strings" +) + +var ( + pid = PID{} +) + +// PID structure +type PID struct { + PID string + PIDFile string +} + +// verifica se esiste il PIDFile; +// se esiste legge il PID e controlla se e' running il processo associato +func (p *PID) check() bool { + bpid, err := ioutil.ReadFile(p.PIDFile) + p.PID = strings.TrimRight(string(bpid), "\n") + if err == nil && p.readCmd() { + return true + } + return false +} + +// controlla se esiste il processo associato al PID, +// se il cmd e' lo stesso e se e' in esecuzione. +func (p *PID) readCmd() bool { + bcmd, err := ioutil.ReadFile(path.Join("/proc", p.PID, "cmdline")) + // non esiste la dir relativa al PID su /proc + if err != nil { + fmt.Println("cmdline error: ", err) + return false + } + cmd := bytes.Trim(bcmd, "\x00") + if !strings.Contains(string(cmd), opts.Exe) { + fmt.Printf("PID %s used by %s\n", pid, cmd) + } + return true +} + +// scrive il PID nel PIDFile +func (p *PID) Write(l bool) { + + if p.check() { + if l { + log.Println("Running: ", p.PID) + } else { + fmt.Println("Running: ", p.PID) + } + + os.Exit(-6) + } + + fpid, err := os.OpenFile(p.PIDFile, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + if l { + log.Println("PID file error: ", err.Error()) + } else { + fmt.Println("PID file error: ", err.Error()) + } + + os.Exit(-5) + } + fpid.WriteString(strconv.Itoa(os.Getpid())) + fpid.Close() +} + +// Remove cancella il PIDFile +func (p *PID) Remove() { + err := os.Remove(p.PIDFile) + if err != nil { + fmt.Println("RM file error: ", err.Error()) + } +} diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..246dd2b --- /dev/null +++ b/producer.go @@ -0,0 +1,117 @@ +// iterator +package main + +import ( + "fmt" + "log" + "strconv" + "strings" + "time" + + "github.com/garyburd/redigo/redis" +) + +type userQuota struct { + user string + storage int + messages int +} + +type produced struct { + user string + logins []string +} + +func producer() { + conn := dbs.rdb.Get() + defer conn.Close() + + keys, err := redis.Strings(conn.Do("KEYS", "*")) + + if err != nil { + if opts.Debug { + fmt.Printf("Keys error: %v\n", err) + } + log.Printf("Keys error: %v\n", err) + exit() + } + + users := make(map[string]bool) + + for _, key := range keys { + user := key[:strings.Index(key, "@")] + users[user] = true + } + + uq := make([]userQuota, 0) + max := 0 + + for key, _ := range users { + + if !loop { + break + } + + // wg.Wait() + status = _Producer + + start := time.Now() + if opts.Test { + fmt.Printf("MGET: %s (%d)\n", key, max) + } + // estrae un userid dalla lista degli utenti che hanno fatto login + quota, err := redis.Strings(conn.Do("mget", fmt.Sprintf("%s@tiscali.it/quota/messages", key), fmt.Sprintf("%s@tiscali.it/quota/storage", key))) + // se non ci sono piu' userid esce + if err != nil { + if opts.Debug { + fmt.Printf("MGET err: %v\n", err) + } + log.Printf("MGET err: %v\n", err) + continue + } + + counter <- Counterchan{ + tipo: "user", + val: 1, + } + + msg, err := strconv.Atoi(quota[0]) + if err != nil { + msg = 0 + } + store, err := strconv.Atoi(quota[1]) + if err != nil { + store = 0 + } + + uq = append(uq, userQuota{ + user: key, + messages: msg, + storage: store, + }) + + if opts.Test { + fmt.Printf("User: %s - %+v\n", key, quota) + } + + if max >= opts.MaxBulk { + if opts.Debug { + fmt.Printf("\nPROD: %+v\n", time.Since(start)) + } + + // wg.Add(1) + // counter <- Counterchan{ + // tipo: "wg", + // val: 1, + // } + + consume <- uq + uq = make([]userQuota, 0) + max = 0 + } + + max += 1 + } + + done <- true +} diff --git a/sigterm.go b/sigterm.go new file mode 100644 index 0000000..79fc906 --- /dev/null +++ b/sigterm.go @@ -0,0 +1,39 @@ +// sigterm +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func kill() { + log.Printf("KILL %d\n", status) + fmt.Printf("KILL %d\n", status) + wg.Done() + counter <- Counterchan{ + tipo: "wg", + val: -1, + } + done <- true +} + +func exit() { + log.Printf("EXIT %d\n", status) + fmt.Printf("EXIT %d\n", status) + loop = false + time.AfterFunc(time.Second*20, kill) +} + +func setTerm() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGTERM) + go func() { + <-c + exit() + }() +}