first commit

This commit is contained in:
Michele 2017-03-10 13:40:26 +01:00
commit 0c9c9710ce
8 changed files with 711 additions and 0 deletions

79
consumer.go Normal file
View file

@ -0,0 +1,79 @@
// consumer
package main
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
// "strconv"
// "strings"
"time"
// "gopkg.in/mgo.v2"
)
type consumed struct {
user string
error bool
logins []string
}
func hash(val []byte) string {
h := sha256.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func consumer() {
for {
prod := <-consume
start := time.Now()
status = _Consumer
if opts.Test {
fmt.Printf("CONS: users - %d\n", len(prod))
}
// for j := range dbs.mdb {
bulk := dbs.mdb.DB("quota").C("tiscali").Bulk()
bulk.Unordered()
for p := range prod {
mquota := MongoQuota{
User: prod[p].user,
Messages: prod[p].messages,
Storage: prod[p].storage,
Insert: time.Now(),
}
bulk.Insert(mquota)
if opts.Test {
log.Printf("OK: %s\n", prod[p].user)
}
}
_, err := bulk.Run()
if err != nil {
fmt.Printf("Err: %+v\n", err)
counter <- Counterchan{
tipo: "err",
val: 1,
}
continue
}
if opts.Debug {
fmt.Printf("CONS: users=%d in %v - active=%d\n", len(prod), time.Since(start), dbs.rdb.ActiveCount())
}
//wg.Done()
}
}

144
counter.go Normal file
View file

@ -0,0 +1,144 @@
// counter
package main
import (
"sync"
"time"
)
type Counterchan struct {
tipo string
val int
}
// Counter structure
type Counter struct {
mu sync.Mutex
user int
insert int
err int
dup int
time time.Duration
wg int
}
// NewCounter iniitialized Counter structure
func NewCounter() *Counter {
return &Counter{
user: 0,
insert: 0,
err: 0,
dup: 0,
time: 0,
wg: 0,
}
}
func (c *Counter) Run() {
for {
tocount := <-counter
switch tocount.tipo {
case "user":
c.addUser()
case "dup":
c.addDuplicate(tocount.val)
case "ins":
c.addInsert(tocount.val)
case "wg":
if tocount.val > 0 {
c.addWG()
} else {
c.delWG()
}
case "err":
c.addErr(tocount.val)
}
}
}
// AddUser increment number of users managed
func (c *Counter) addUser() {
c.user++
}
// AddDuplicate increment number of duplicates log
func (c *Counter) addDuplicate(add int) {
c.dup += add
}
// AddInsert increment number of inserted rows
func (c *Counter) addInsert(add int) {
c.insert += add
}
// AddWG ...
func (c *Counter) addWG() {
c.wg++
}
// AddErr ...
func (c *Counter) addErr(add int) {
c.err += add
}
// DelWG ...
func (c *Counter) delWG() {
c.wg--
}
// GetUser return total users
func (c *Counter) GetUser() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.user
return
}
// GetDup return total duplicated logins
func (c *Counter) GetDup() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.dup
return
}
// GetInsert return total inserted rows
func (c *Counter) GetInsert() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.insert
return
}
// GetErr return total errors
func (c *Counter) GetErr() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.err
return
}
// GetWG ...
func (c *Counter) GetWG() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.wg
return
}
// GetTime ...
func (c *Counter) GetTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.time.Seconds()
return
}
// SetTime ...
func (c *Counter) SetTime(t time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.time = t
}

85
dbs.go Normal file
View file

@ -0,0 +1,85 @@
// dbs
package main
import (
"fmt"
"log"
"os"
// "strings"
"time"
"github.com/garyburd/redigo/redis"
"gopkg.in/mgo.v2"
)
var (
dbs = Dbs{
RedisURI: "127.0.0.1:6379",
Database: "lastlogin",
}
)
// Dbs structure
type Dbs struct {
MongoURI string
Database string
RedisURI string
rdb *redis.Pool //*redis.Client
mdb *mgo.Session
}
// MongoQuota structure
type MongoQuota struct {
// ID string `json:"_id" bson:"_id"`
User string `json:"user" bson:"user"`
Messages int `json:"messages" bson:"messages"`
Storage int `json:"storage" bson:"storage"`
Insert time.Time `json:"insert" bson:"insert"`
}
func (db *Dbs) isMongodb() bool {
if db.MongoURI != "" {
return true
}
return false
}
func (db *Dbs) poolRedis() {
dbs.rdb = &redis.Pool{
MaxIdle: 128,
MaxActive: 1000,
Wait: true,
IdleTimeout: 1 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", db.RedisURI)
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func (db *Dbs) connectMongo() {
// mongoList := strings.Split(db.MongoURI, ",")
// for m := range mongoList {
nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", db.MongoURI))
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
nm.SetSocketTimeout(5 * time.Second)
nm.SetSyncTimeout(5 * time.Second)
db.mdb = nm
// }
}

98
main.go Normal file
View file

@ -0,0 +1,98 @@
// llmongo.go
package main
import (
"flag"
"fmt"
"log"
"os"
"sync"
"time"
)
const (
_Version = "v1.0.0"
_Producer = 0
_Consumer = 1
)
var (
loop bool
done chan bool
consume chan []userQuota
counter chan Counterchan
wg sync.WaitGroup
status int
count *Counter
)
func main() {
flag.Usage = usage
flag.Parse()
if opts.Version {
fmt.Println(os.Args[0], _Version)
os.Exit(0)
}
setTerm()
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Log file error: ", err.Error())
os.Exit(-4)
}
defer fs.Close()
log.SetOutput(fs)
pid.PIDFile = opts.Pidfile
pid.Write(true)
defer pid.Remove()
start := time.Now()
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
opts.Month = start.Format("0601")
dbs.poolRedis()
defer dbs.rdb.Close()
dbs.connectMongo()
defer dbs.mdb.Close()
if opts.Timeout > 0 {
time.AfterFunc(opts.Timeout, exit)
}
count = NewCounter()
consume = make(chan []userQuota, opts.Queue)
loop = true
done = make(chan bool)
counter = make(chan Counterchan)
go count.Run()
go producer()
for i := 0; i < opts.Queue; i++ {
go consumer()
}
<-done
fmt.Printf("Done\n")
close(done)
fmt.Println("Waiting WG")
wg.Wait()
count.SetTime(time.Since(start))
fmt.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser())
log.Printf("Stop %v - user: %d\n\r", count.GetTime(), count.GetUser())
}

64
options.go Normal file
View file

@ -0,0 +1,64 @@
// options
package main
import (
"flag"
"fmt"
"log"
"os"
"path"
"path/filepath"
"time"
)
// Options structure
type Options struct {
CurrentPath string
Exe string
LogFile string
ConfigFile string
Timeout time.Duration
Debug bool
Test bool
Version bool
MaxBulk int
Month string
Pidfile string
Queue int
}
var (
opts = Options{
LogFile: "log/llmongo.log",
}
)
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <redis keys ttl> -l <logfile> -T <running ttl> -x <xymon server> -H <hostname> -i <influxdb uri> -v")
fmt.Println()
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", dbs.RedisURI, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxBulk, "M", 100, "Max Mongodb bulk")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file")
}

85
pid.go Normal file
View file

@ -0,0 +1,85 @@
// pid
package main
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strconv"
"strings"
)
var (
pid = PID{}
)
// PID structure
type PID struct {
PID string
PIDFile string
}
// verifica se esiste il PIDFile;
// se esiste legge il PID e controlla se e' running il processo associato
func (p *PID) check() bool {
bpid, err := ioutil.ReadFile(p.PIDFile)
p.PID = strings.TrimRight(string(bpid), "\n")
if err == nil && p.readCmd() {
return true
}
return false
}
// controlla se esiste il processo associato al PID,
// se il cmd e' lo stesso e se e' in esecuzione.
func (p *PID) readCmd() bool {
bcmd, err := ioutil.ReadFile(path.Join("/proc", p.PID, "cmdline"))
// non esiste la dir relativa al PID su /proc
if err != nil {
fmt.Println("cmdline error: ", err)
return false
}
cmd := bytes.Trim(bcmd, "\x00")
if !strings.Contains(string(cmd), opts.Exe) {
fmt.Printf("PID %s used by %s\n", pid, cmd)
}
return true
}
// scrive il PID nel PIDFile
func (p *PID) Write(l bool) {
if p.check() {
if l {
log.Println("Running: ", p.PID)
} else {
fmt.Println("Running: ", p.PID)
}
os.Exit(-6)
}
fpid, err := os.OpenFile(p.PIDFile, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
if l {
log.Println("PID file error: ", err.Error())
} else {
fmt.Println("PID file error: ", err.Error())
}
os.Exit(-5)
}
fpid.WriteString(strconv.Itoa(os.Getpid()))
fpid.Close()
}
// Remove cancella il PIDFile
func (p *PID) Remove() {
err := os.Remove(p.PIDFile)
if err != nil {
fmt.Println("RM file error: ", err.Error())
}
}

117
producer.go Normal file
View file

@ -0,0 +1,117 @@
// iterator
package main
import (
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/garyburd/redigo/redis"
)
type userQuota struct {
user string
storage int
messages int
}
type produced struct {
user string
logins []string
}
func producer() {
conn := dbs.rdb.Get()
defer conn.Close()
keys, err := redis.Strings(conn.Do("KEYS", "*"))
if err != nil {
if opts.Debug {
fmt.Printf("Keys error: %v\n", err)
}
log.Printf("Keys error: %v\n", err)
exit()
}
users := make(map[string]bool)
for _, key := range keys {
user := key[:strings.Index(key, "@")]
users[user] = true
}
uq := make([]userQuota, 0)
max := 0
for key, _ := range users {
if !loop {
break
}
// wg.Wait()
status = _Producer
start := time.Now()
if opts.Test {
fmt.Printf("MGET: %s (%d)\n", key, max)
}
// estrae un userid dalla lista degli utenti che hanno fatto login
quota, err := redis.Strings(conn.Do("mget", fmt.Sprintf("%s@tiscali.it/quota/messages", key), fmt.Sprintf("%s@tiscali.it/quota/storage", key)))
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("MGET err: %v\n", err)
}
log.Printf("MGET err: %v\n", err)
continue
}
counter <- Counterchan{
tipo: "user",
val: 1,
}
msg, err := strconv.Atoi(quota[0])
if err != nil {
msg = 0
}
store, err := strconv.Atoi(quota[1])
if err != nil {
store = 0
}
uq = append(uq, userQuota{
user: key,
messages: msg,
storage: store,
})
if opts.Test {
fmt.Printf("User: %s - %+v\n", key, quota)
}
if max >= opts.MaxBulk {
if opts.Debug {
fmt.Printf("\nPROD: %+v\n", time.Since(start))
}
// wg.Add(1)
// counter <- Counterchan{
// tipo: "wg",
// val: 1,
// }
consume <- uq
uq = make([]userQuota, 0)
max = 0
}
max += 1
}
done <- true
}

39
sigterm.go Normal file
View file

@ -0,0 +1,39 @@
// sigterm
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func kill() {
log.Printf("KILL %d\n", status)
fmt.Printf("KILL %d\n", status)
wg.Done()
counter <- Counterchan{
tipo: "wg",
val: -1,
}
done <- true
}
func exit() {
log.Printf("EXIT %d\n", status)
fmt.Printf("EXIT %d\n", status)
loop = false
time.AfterFunc(time.Second*20, kill)
}
func setTerm() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
exit()
}()
}