diff --git a/Docker/Dockerfile b/Docker/Dockerfile new file mode 100644 index 0000000..061eb89 --- /dev/null +++ b/Docker/Dockerfile @@ -0,0 +1,11 @@ +#FROM busybox:latest +FROM scratch + +MAINTAINER Michele Fadda "" + +ARG VER +ENV VER ${VER:-0.0.0} + +COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb + +ENTRYPOINT [ "/bin/lastlogin_mongodb" ] diff --git a/Docker/build.sh b/Docker/build.sh new file mode 100755 index 0000000..f4beae2 --- /dev/null +++ b/Docker/build.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) . diff --git a/Docker/run.sh b/Docker/run.sh new file mode 100644 index 0000000..7131db0 --- /dev/null +++ b/Docker/run.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +docker run \ + --rm \ + -v /opt/llmongo/log:/data \ + --name llmongo_tiscali \ + --log-opt max-size=2m \ + --log-opt max-file=5 \ + mikif70/llmongo:4.2.1 \ + -l /data/llmongo.log \ + -r redis-ll.mail.tiscali.sys:6379 \ + -m 10.39.80.189:27017 \ + -d lastlogin \ + -i enginedb1@10.39.109.107:8086 \ + -T 60s \ + $@ diff --git a/backup.sh b/backup.sh new file mode 100644 index 0000000..a1b4ea7 --- /dev/null +++ b/backup.sh @@ -0,0 +1,80 @@ +#!/bin/bash + +MONGODB="secondary.mongod.service.mail:27017" +DATAPATH="/mnt/mongobck/NEW_EXPORT" + +NAME="mtools" +DB="lastlogin" + +opt() +{ + case "$1" in + -d) + shift + DATE=$1 + shift + opt $* + ;; + -n) + shift + NAME=$1 + shift + opt $* + ;; + -db) + shift + DB=$1 + shift + opt $* + ;; + -pwd) + shift + DATAPATH=${PWD} + shift + opt $* + ;; + esac +} + +echo "Running... $(date)" + +if [ -n "$1" ] +then + opt $* +fi + +echo "Opts: DB-$DB DATE-$DATE" + +if [ -z "${DATE}" ] +then + DATE=$(date -d "yesterday" +%Y%m%d) +fi + +YDAY=$(date -d "${DATE}" +%Y-%m-%d) +TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d) +QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T0 +0:00:00Z\")}}" + +FNAME=$(date -d "${DATE}" +%y%m%d) +CNAME=$(date -d "${DATE}" +%y%m) + +echo "Query: ${QUERY}" + +if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then + mkdir -p ${DATAPATH}/${DB}/${CNAME} +fi + +/usr/bin/docker run \ + --rm \ + -h mtools \ + --name ${NAME} \ + -v ${DATAPATH}:/data \ + mikif70/mongotools:3.4.5 mongoexport \ + --host "${MONGODB}" \ + --db $DB \ + --collection "ll_${FNAME}" \ + --type "csv" \ + --fields "_id,user,protocol,ip,date,insert,country" \ + --query "${QUERY}" \ + --readPreference "secondary" \ + --out "/data/${DB}/${CNAME}/ll_${FNAME}.csv" \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..9cf0267 --- /dev/null +++ b/build.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo . + +mv lastlogin_mongodb-$(git describe --tags) Docker/ diff --git a/consul.go b/consul.go new file mode 100644 index 0000000..a959a24 --- /dev/null +++ b/consul.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/hashicorp/consul/api" +) + +// Consul client structure +type Consul struct { + RedisTTL time.Duration + Timeout time.Duration + MaxError int + Influxdb string + Month string + Retention float64 + MongoURI string + Database string + RedisURI string + Client *api.Client +} + +// NewClient New consul client +func NewClient() *Consul { + client, err := api.NewClient(api.DefaultConfig()) + if err != nil { + log.Fatalf("Consul error: %+v\n", err) + if opts.Debug { + fmt.Printf("Consul error: %+v\n", err) + } + } + + consul := &Consul{ + Client: client, + RedisTTL: opts.RedisTTL, + Timeout: opts.Timeout, + MaxError: opts.MaxError, + Influxdb: opts.Influxdb, + } + + return consul +} diff --git a/consumer.go b/consumer.go index 8dca1ac..0c4b7f8 100644 --- a/consumer.go +++ b/consumer.go @@ -2,90 +2,187 @@ package main import ( + "crypto/md5" + "crypto/sha1" + "crypto/sha256" + "encoding/hex" "fmt" - "github.com/garyburd/redigo/redis" "log" "strconv" "strings" "time" + + "github.com/globalsign/mgo" + // "gopkg.in/mgo.v2" ) +type consumed struct { + user string + error bool + logins []string +} + +func hash256(val []byte) string { + + h := sha256.New() + h.Write(val) + + return hex.EncodeToString(h.Sum(nil)) +} + +func hash160(val []byte) string { + + h := sha1.New() + h.Write(val) + + return hex.EncodeToString(h.Sum(nil)) +} + +func hash128(val []byte) string { + + h := md5.New() + h.Write(val) + + return hex.EncodeToString(h.Sum(nil)) +} + +// protocol:timestamp:ip:country + func consumer() { - var date int64 - var lastval string - var conn = dbs.rdb.Get() - defer conn.Close() for { - user := <-msgs - // Estrae l'ultimo login dell'utente 'user' - val, err := redis.String(conn.Do("LINDEX", user, "-1")) - if err != nil { - if opts.Debug { - log.Printf("LINDEX error: %+v - %s\n\r", err, val) - fmt.Printf("LINDEX error: %+v - %s\n\r", err, val) - } - // se ha trovato user e righe di login - if lastval != "" { - // reinserisce l'ultimo login e imposta il ttl su Redis - retval, _ := conn.Do("lpush", user, lastval) - ttl, _ := conn.Do("expire", user, opts.RedisTTL.Seconds()) - if opts.Debug { - log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds()) - fmt.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds()) - } - } - // break - continue - } - // se la riga di login e' vuota - if val == "" { - log.Println("Login empty: ", user) - retval, _ := conn.Do("lrem", user, "-1", val) - log.Println("LREM retval: ", user, val, retval) - continue - } - sval := strings.Split(val, ":") - // se il formato della riga di login non e' corretto - if sval[1] == "" { - log.Println("Login format error: ", val, user) - retval, _ := conn.Do("lrem", user, "-1", val) - log.Println("LREM retval: ", user, val, retval) - continue - } - // se il timestamp della riga di login non e' corretto - date, err = strconv.ParseInt(sval[1], 10, 64) - if err != nil { - log.Printf("Date Error: %+v - %s\n", err, user) - continue - } - ml := MongoLogin{ - User: user, - Protocol: sval[0], - Ip: sval[2], - Date: time.Unix(date, 0), - } - ind := Index{ - User: user, - Date: time.Unix(date, 0), - } - // inserisce il login su Mongodb - count++ - _, err = dbs.ll.Upsert(ind, ml) - if err != nil { - log.Printf("Insert error: %+v\n", err) - // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente - if !strings.Contains(err.Error(), "E11000") { - errCount += 1 + + prod := <-consume + + start := time.Now() + + status = _Consumer + + var bulk = make(map[string][]*mgo.Bulk) + var allLogins = make(map[string]MongoLogin) + + for i := range prod.logins { + login := prod.logins[i] + // se la riga di login e' vuota + if login == "" { + log.Println("Login empty: ", prod.user) continue } + sval := strings.Split(login, ":") + // se il formato della riga di login non e' corretto + if sval[1] == "" { + log.Println("Login format error: ", login, prod.user) + continue + } + // se il timestamp della riga di login non e' corretto + date, err := strconv.ParseInt(sval[1], 10, 64) + if err != nil { + log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) + continue + } + + // se la data e' piu vecchia di RETENTION (15552000 sec) la scarta + if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 { + log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login) + if opts.Debug { + fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login) + } + + if !opts.Test { + continue + } + } + + // verifica se esiste la country + if len(sval) <= 3 { + sval = append(sval, "NONE") + } + + // genera l' _ID con user + timestamp + ip + mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405") + ml := MongoLogin{ + ID: mlID, + User: prod.user, + Protocol: sval[0], + IP: sval[2], + Date: time.Unix(date, 0), + Insert: time.Now(), + Country: sval[3], + } + + allLogins[mlID] = ml + } - // cancella da Redis la riga di login inserita - retval, err := conn.Do("lrem", user, "-1", val) + + for _, val := range allLogins { + dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay + if _, ok := bulk[dt]; !ok { + for j := range dbs.mdb { + b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk() + b.Unordered() + bulk[dt] = append(bulk[dt], b) + } + } + for _, bl := range bulk[dt] { + bl.Insert(val) + } + } + + for _, val := range bulk { + for j, bl := range val { + result, err := bl.Run() + if j == 0 { + if err != nil { + if !strings.Contains(err.Error(), "E11000") { + fmt.Printf("Err: %+v\n", err) + prod.err = true + counter <- Counterchan{ + tipo: "err", + val: len(prod.logins), + } + if opts.Test || opts.Debug { + log.Printf("ERR: %s - %+v\n", prod.user, prod.logins) + } + continue + } else { + counter <- Counterchan{ + tipo: "dup", + val: strings.Count(err.Error(), "E11000"), + } + if opts.Debug { + log.Printf("DUP: %s - %+v\n", prod.user, prod.logins) + } + } + } else { + if opts.Test { + log.Printf("OK: %s - %+v\n", prod.user, prod.logins) + log.Printf("BulkResult: %s - %+v\n", prod.user, result) + } + counter <- Counterchan{ + tipo: "ins", + val: len(allLogins), + } + } + } + } + } + if opts.Debug { - log.Println("LREM retval: ", retval, user, val) - fmt.Println("LREM retval: ", retval, user, val) + fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) } - lastval = val + + if opts.Debug && len(prod.logins) > 10 { + fmt.Printf("LOGS: %+v\n", prod.logins) + } + + if !prod.err { + counter <- Counterchan{ + tipo: "rem", + val: len(prod.logins), + } + } + + // wg.Done() + remove <- prod } } diff --git a/counter.go b/counter.go new file mode 100644 index 0000000..1b616a0 --- /dev/null +++ b/counter.go @@ -0,0 +1,180 @@ +// counter +package main + +import ( + "sync" + "time" +) + +// Counterchan counter structure +type Counterchan struct { + tipo string + val int +} + +// Counter structure +type Counter struct { + mu sync.Mutex + user int + log int + insert int + rem int + err int + dup int + time time.Duration + wg int +} + +// NewCounter iniitialized Counter structure +func NewCounter() *Counter { + return &Counter{ + user: 0, + log: 0, + insert: 0, + err: 0, + rem: 0, + dup: 0, + time: 0, + wg: 0, + } +} + +// Run open channel to receive counter +func (c *Counter) Run() { + for { + tocount := <-counter + + switch tocount.tipo { + case "user": + c.addUser() + case "dup": + c.addDuplicate(tocount.val) + case "ins": + c.addInsert(tocount.val) + case "log": + c.addLog(tocount.val) + case "rem": + c.addRem(tocount.val) + case "wg": + if tocount.val > 0 { + c.addWG() + } else { + c.delWG() + } + case "err": + c.addErr(tocount.val) + + } + } +} + +// AddUser increment number of users managed +func (c *Counter) addUser() { + c.user++ +} + +// AddDuplicate increment number of duplicates log +func (c *Counter) addDuplicate(add int) { + c.dup += add +} + +// AddInsert increment number of inserted rows +func (c *Counter) addInsert(add int) { + c.insert += add +} + +// AddLog increment number of log's rows managed +func (c *Counter) addLog(add int) { + c.log += add +} + +//AddRem increment removed logs row +func (c *Counter) addRem(add int) { + c.rem += add +} + +// AddWG ... +func (c *Counter) addWG() { + c.wg++ +} + +// AddErr ... +func (c *Counter) addErr(add int) { + c.err += add +} + +// DelWG ... +func (c *Counter) delWG() { + c.wg-- +} + +// GetUser return total users +func (c *Counter) GetUser() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.user + return +} + +// GetDup return total duplicated logins +func (c *Counter) GetDup() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.dup + return +} + +// GetLog return total log's rows +func (c *Counter) GetLog() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.log + return +} + +// GetInsert return total inserted rows +func (c *Counter) GetInsert() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.insert + return +} + +// GetErr return total errors +func (c *Counter) GetErr() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.err + return +} + +// GetRem return total removed log's rows +func (c *Counter) GetRem() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.rem + return +} + +// GetWG ... +func (c *Counter) GetWG() (ret int) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.wg + return +} + +// GetTime ... +func (c *Counter) GetTime() (ret float64) { + c.mu.Lock() + defer c.mu.Unlock() + ret = c.time.Seconds() + return +} + +// SetTime ... +func (c *Counter) SetTime(t time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.time = t +} diff --git a/day_init.js b/day_init.js new file mode 100644 index 0000000..8bef682 --- /dev/null +++ b/day_init.js @@ -0,0 +1,15 @@ +// v1.2 +var db = connect("192.168.0.1:27017/lastlogin"); +var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} +var now = new Date(); +var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate()) +print("creating index: ll_",col.slice(2)); +db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}]) +db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"}) +var db = connect("192.168.0.1:27017/katamail"); +var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} +var now = new Date(); +var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate()) +print("creating index: ll_",col.slice(2)); +db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}]) +db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"}) \ No newline at end of file diff --git a/dbs.go b/dbs.go index 235de8d..4e28cf5 100644 --- a/dbs.go +++ b/dbs.go @@ -2,55 +2,80 @@ package main import ( - "github.com/garyburd/redigo/redis" - // "github.com/fzzy/radix/redis" - "gopkg.in/mgo.v2" + "fmt" "log" "os" + "strings" "time" + + "github.com/garyburd/redigo/redis" + + "github.com/globalsign/mgo" + // "gopkg.in/mgo.v2" ) var ( dbs = Dbs{ - MongoUri: "mongodb://127.0.0.1:27018", - RedisUri: "127.0.0.1:6379", + RedisURI: "", + Database: "", } ) +// Dbs structure type Dbs struct { - MongoUri string - RedisUri string + MongoURI string + Database string + RedisURI string rdb *redis.Pool //*redis.Client - mdb *mgo.Session - ll *mgo.Collection - // us *mgo.Collection + mdb []*mgo.Session } +// MongoLogin structure type MongoLogin struct { - User string `json:"user"` - Protocol string `json:"protocol"` - Ip string `json:"ip"` - Date time.Time `json:"date"` + ID string `json:"_id" bson:"_id" gorethink:"id"` + User string `json:"user" bson:"user" gorethink:"user"` + Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"` + IP string `json:"ip" bson:"ip" gorethink:"ip"` + Date time.Time `json:"date" bson:"date" gorethink:"date"` + Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"` + Country string `json:"country" bson:"country" gorethink:"country"` } +// Ips structure +type Ips struct { + IP string `json:"ip"` +} + +// UserLogin structure type UserLogin struct { User string `json:"user"` Date time.Time `json:"date"` Lock bool `json:"lock"` } +// Index structure type Index struct { User string `json:"user"` Date time.Time `json:"date"` } +func (db *Dbs) isMongodb() bool { + if db.MongoURI != "" { + return true + } + + return false +} + func (db *Dbs) poolRedis() { dbs.rdb = &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 240 * time.Second, + MaxIdle: 128, + MaxActive: 1000, + Wait: true, + IdleTimeout: 1 * time.Second, Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", db.RedisUri) + c, err := redis.Dial("tcp", db.RedisURI) if err != nil { return nil, err } @@ -64,24 +89,18 @@ func (db *Dbs) poolRedis() { } -/* -func (db *Dbs) connectRedis() { - var err error - db.rdb, err = redis.Dial("tcp", db.RedisUri) - if err != nil { - log.Println("Redis connect Error: ", err.Error()) - os.Exit(-1) - } -} -*/ - func (db *Dbs) connectMongo() { - var err error - db.mdb, err = mgo.Dial(db.MongoUri) - if err != nil { - log.Println("Mongodb connect Error: ", err.Error()) - os.Exit(-3) + + mongoList := strings.Split(db.MongoURI, "|") + + for m := range mongoList { + nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m])) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + nm.SetSocketTimeout(5 * time.Second) + nm.SetSyncTimeout(5 * time.Second) + db.mdb = append(db.mdb, nm) } - db.ll = db.mdb.DB("dovecot").C("lastlogin") - // db.us = db.mdb.DB("dovecot").C("userlogin") } diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml new file mode 100644 index 0000000..cf96102 --- /dev/null +++ b/docker-compose/docker-compose.yml @@ -0,0 +1,18 @@ +version: '2' +services: + mongodb: + image: mikif70/mongodb:3.6.9 + ports: + - 27017:27017 + container_name: ll_mongod + volumes: + - ./mongod:/data + depends_on: + - redis + redis: + image: "redis:alpine" + container_name: ll_redis + ports: + - 6379:6379 + volumes: + - ./redis:/data diff --git a/docker-compose/mongo.sh b/docker-compose/mongo.sh new file mode 100755 index 0000000..e7fadef --- /dev/null +++ b/docker-compose/mongo.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +if [ -z $1 ]; then + HOST=192.168.0.1:27017 +else + HOST=${1} +fi + +docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST} diff --git a/docker-compose/test.sh b/docker-compose/test.sh new file mode 100755 index 0000000..233ebbf --- /dev/null +++ b/docker-compose/test.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +echo "Starting mikif70/llmongo:$(git -C .. describe --tags)" + +docker run \ + --rm \ + -v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \ + --name llmongo \ + mikif70/llmongo:$(git -C .. describe --tags) \ + -l /data/llmongo.log \ + -r 192.168.0.1:6379 \ + -m 192.168.0.1:27017 \ + -d lastlogin \ + -i test@10.39.253.206:8086 \ + -T 80s \ + $@ diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..be821d2 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,73 @@ +// influxdb +package main + +import ( + "fmt" + "log" + "time" + + influxdb "github.com/influxdata/influxdb/client/v2" +) + +var ( + infdb string + infhost string +) + +func writeStats(start time.Time) { + if opts.Debug { + fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb) + fmt.Printf("host: %s -- addr: %s\n", infhost, infdb) + } + + c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ + Addr: fmt.Sprintf("http://%s", infdb), + Timeout: 2 * time.Second, + }) + if err != nil { + fmt.Printf("InfluxDB connect Error: %+v\n", err) + log.Printf("InfluxDB connect Error: %+v\n", err) + return + } + defer c.Close() + + bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{ + Database: "dovecot", + Precision: "s", + }) + if err != nil { + fmt.Printf("InfluxDB batch Error: %+v\n", err) + log.Printf("InfluxDB batch Error: %+v\n", err) + return + } + + tags := map[string]string{"server": infhost, "domain": dbs.Database} + fields := map[string]interface{}{ + "user": count.GetUser(), + "log": count.GetLog(), + "err": count.GetErr(), + "rem": count.GetRem(), + "dup": count.GetDup(), + "stop": count.GetTime(), + } + pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start) + if err != nil { + fmt.Printf("InfluxDB point Error: %+v\n", err) + log.Printf("InfluxDB point Error: %+v\n", err) + return + } + + if opts.Debug { + fmt.Printf("InfluxDB pt: %+v\n", pt) + } + + bp.AddPoint(pt) + + // Write the batch + err = c.Write(bp) + if err != nil { + fmt.Printf("InfluxDB write Error: %+v\n", err) + log.Printf("InfluxDB write Error: %+v\n", err) + return + } +} diff --git a/logrotate.cfg b/logrotate.cfg new file mode 100644 index 0000000..4f500de --- /dev/null +++ b/logrotate.cfg @@ -0,0 +1,28 @@ +/opt/mongodb/mongod/log/*.log +{ + daily + rotate 10 + maxsize 10M + compress + delaycompress + missingok + notifempty + sharedscripts + copytruncate + postrotate + /usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true + endscript +} + +/opt/llmongo/log/*.log +{ + daily + maxsize 5M + rotate 8 + compress + delaycompress + missingok + notifempty + sharedscripts + copytruncate +} diff --git a/main.go b/main.go index 3346835..304e716 100644 --- a/main.go +++ b/main.go @@ -6,78 +6,67 @@ import ( "fmt" "log" "os" - "path" - "path/filepath" + "regexp" + "sync" "time" + + _ "github.com/nats-io/go-nats" ) -type Options struct { - RedisTTL time.Duration - CurrentPath string - Exe string - LogFile string - Timeout bool - Debug bool - Version bool -} - const ( - _VERSION = "v2.0.0" + _Version = "v5.0.0b6" + _Producer = 0 + _Consumer = 1 + _Remover = 2 ) var ( - opts = Options{ - RedisTTL: time.Hour * 11688, // 16 mesi - LogFile: "log/llmongo.log", - } + loop bool - loop = true - ttl = time.Second * 55 - done = make(chan bool) - msgs = make(chan string) + done chan bool + consume chan loginsList + remove chan loginsList - count = 0 - errCount = 0 + counter chan Counterchan + + wg sync.WaitGroup + + status int + + count *Counter ) -func usage() { - fmt.Println("Usage: llmongo -m -r -t -l -T -D -v\n") - os.Exit(0) -} - -func init() { - var err error - opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) - if err != nil { - log.Fatal(err) - } - - opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) - pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") - opts.Exe = path.Base(os.Args[0]) - - flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb") - flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis") - flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") - flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL") - flag.BoolVar(&opts.Version, "v", false, "Version") - flag.BoolVar(&opts.Timeout, "T", false, "Timeout") - flag.BoolVar(&opts.Debug, "D", false, "Debug") -} - -func stopLoop() { - loop = false -} - func main() { flag.Usage = usage flag.Parse() if opts.Version { - fmt.Println(os.Args[0], _VERSION) + fmt.Println(os.Args[0], _Version) os.Exit(0) } + if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" { + flag.Usage() + os.Exit(-1) + } + + if opts.Influxdb != "" { + var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`) + // re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`) + if re.MatchString(opts.Influxdb) { + match := re.FindStringSubmatch(opts.Influxdb) + if opts.Debug { + fmt.Printf("Influxdb match: %+v\n", match) + } + infhost = match[1] + infdb = match[2] + ":" + match[3] + } else { + opts.Influxdb = "" + } + } + + setTerm() + fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { fmt.Println("Log file error: ", err.Error()) @@ -87,27 +76,58 @@ func main() { log.SetOutput(fs) - pid.Write(true) - defer pid.Remove() + // pid.PIDFile = opts.Pidfile + // pid.Write(true) + // defer pid.Remove() start := time.Now() - fmt.Printf("Start: %+v\n", opts) - log.Printf("Start: %+v\n", opts) + fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs) + log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs) + + opts.Month = start.Format("0601") dbs.poolRedis() defer dbs.rdb.Close() dbs.connectMongo() - defer dbs.mdb.Close() - - if opts.Timeout { - time.AfterFunc(ttl, stopLoop) + for k := range dbs.mdb { + defer dbs.mdb[k].Close() } - go producer() - go consumer() - <-done + if opts.Timeout > 0 { + time.AfterFunc(opts.Timeout, exit) + } - fmt.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount) - log.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount) + count = NewCounter() + + consume = make(chan loginsList, opts.Queue) + remove = make(chan loginsList, opts.Queue) + loop = true + done = make(chan bool) + counter = make(chan Counterchan) + + go count.Run() + go producer() + for i := 0; i < opts.Queue; i++ { + for j := 0; j < len(dbs.mdb); j++ { + go consumer() + } + go remover() + } + + <-done + fmt.Printf("Done\n") + close(done) + + fmt.Println("Waiting WG") + wg.Wait() + + count.SetTime(time.Since(start)) + + fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) + log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) + + if opts.Influxdb != "" { + writeStats(start) + } } diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..4bcb79e --- /dev/null +++ b/main_test.go @@ -0,0 +1,15 @@ +// Testmain.go +package main + +import ( + "flag" + "fmt" + "testing" +) + +func TestOptions(t *testing.T) { + flag.Usage = usage + flag.Parse() + + fmt.Printf("Config: %+v\n", opts) +} diff --git a/mongo_scripts.txt b/mongo_scripts.txt new file mode 100644 index 0000000..6dc7834 --- /dev/null +++ b/mongo_scripts.txt @@ -0,0 +1,34 @@ + +################ +var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} + +for (var i=1; i<18; i++) { + var col = "ll_1806"+dd(i); + print(col); + db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true}); +} + +########### +var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} + +for (var m=1; m<=12; m++) { + for (var d=1; d<=31; d++) { + var col = "ll_15"+dd(m)+dd(d); + if (db.getCollection(col).exists != null) { + print(col); + print(db.getCollection(col).totalSize()); + } + } +} + + +#### +var ar = db.getCollectionNames() +var len = ar.length + +for (var i=0; i + -d + -r + -t + -l + -T + -i + -C + -P [port] ## used by consul to check services ## + -q + -R + -v + -D + -DD `) + fmt.Println() + os.Exit(0) +} + +func init() { + var err error + opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) + opts.Exe = path.Base(os.Args[0]) + + flag.StringVar(&opts.Influxdb, "i", "", "influxdb server") + flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb") + flag.StringVar(&dbs.Database, "d", "", "Mongodb Database") + flag.StringVar(&dbs.RedisURI, "r", "", "Redis") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client") + flag.IntVar(&opts.Port, "P", 10500, "service check port") + flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") + flag.BoolVar(&opts.Debug, "D", false, "Debug") + flag.BoolVar(&opts.Test, "DD", false, "Test") + flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error") + flag.IntVar(&opts.Queue, "q", 2, "parallel consumer") + flag.Float64Var(&opts.Retention, "R", 15552000, "retention") +} diff --git a/pid.go b/pid.go index 2533af2..da94004 100644 --- a/pid.go +++ b/pid.go @@ -1,6 +1,7 @@ // pid package main +/* import ( "bytes" "fmt" @@ -16,6 +17,7 @@ var ( pid = PID{} ) +// PID structure type PID struct { PID string PIDFile string @@ -42,11 +44,8 @@ func (p *PID) readCmd() bool { return false } cmd := bytes.Trim(bcmd, "\x00") - if strings.Contains(string(cmd), opts.Exe) { - return true - } else { + if !strings.Contains(string(cmd), opts.Exe) { fmt.Printf("PID %s used by %s\n", pid, cmd) - return true } return true } @@ -78,10 +77,11 @@ func (p *PID) Write(l bool) { fpid.Close() } -// Cancella il PIDFile +// Remove cancella il PIDFile func (p *PID) Remove() { err := os.Remove(p.PIDFile) if err != nil { fmt.Println("RM file error: ", err.Error()) } } +*/ diff --git a/producer.go b/producer.go index 4185a85..1d39be0 100644 --- a/producer.go +++ b/producer.go @@ -3,30 +3,90 @@ package main import ( "fmt" - "github.com/garyburd/redigo/redis" "log" + // "strconv" + "time" + + "github.com/garyburd/redigo/redis" ) +type produced struct { + user string + logins []string +} + +type loginsList struct { + user string + logins []string + err bool +} + func producer() { conn := dbs.rdb.Get() defer conn.Close() for loop { + + wg.Wait() + status = _Producer + + start := time.Now() // estrae un userid dalla lista degli utenti che hanno fatto login user, err := redis.String(conn.Do("spop", "llindex")) - if opts.Debug { - log.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err) - fmt.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err) - } // se non ci sono piu' userid esce if err != nil { if opts.Debug { - log.Printf("LLINDEX empty: %v\n\r", err) - fmt.Printf("LLINDEX empty: %v\n\r", err) + fmt.Printf("LLINDEX empty: %v\n", err) } + log.Printf("LLINDEX empty: %v\n", err) break } - msgs <- user + + // estrae tutti i logins dell'utente "user" + logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1")) + if err != nil { + if opts.Debug { + fmt.Printf("LRANGE: %+v - %+v\n", err, logs) + } + log.Printf("LRANGE: %+v - %+v\n", err, logs) + } + + if opts.Debug { + fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) + } + + if len(logs) > 0 { + counter <- Counterchan{ + tipo: "user", + val: 1, + } + + if opts.Debug { + fmt.Printf("PROD: %+v\n", time.Since(start)) + } + + if opts.Test { + log.Printf("PROD: %s - %d\n", user, len(logs)) + } + + wg.Add(1) + counter <- Counterchan{ + tipo: "wg", + val: 1, + } + + counter <- Counterchan{ + tipo: "log", + val: len(logs), + } + + consume <- loginsList{ + user: user, + logins: logs, + err: false, + } + } } + done <- true } diff --git a/remover.go b/remover.go new file mode 100644 index 0000000..9dc2679 --- /dev/null +++ b/remover.go @@ -0,0 +1,51 @@ +// finalizer +package main + +import ( + "fmt" + + "time" +) + +func remover() { + + var conn = dbs.rdb.Get() + defer conn.Close() + + for { + rem := <-remove + + status = _Remover + + start := time.Now() + if !rem.err { + for i := range rem.logins { + // cancella da Redis la riga di login inserita partendo da 1 + conn.Send("lrem", rem.user, "1", rem.logins[i]) + } + } + + // se ci sono errori o non e' vuota la lista di logins reinserisce lo user + if rem.err { + if opts.Debug { + fmt.Printf("SADD: %s\n", rem.user) + } + conn.Send("sadd", "llindex", rem.user) + if count.GetErr() >= opts.MaxError { + exit() + } + } + conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) + conn.Flush() + + if opts.Debug { + fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start)) + } + + counter <- Counterchan{ + tipo: "wg", + val: -1, + } + wg.Done() + } +} diff --git a/rethinkdb.go b/rethinkdb.go new file mode 100644 index 0000000..13c715d --- /dev/null +++ b/rethinkdb.go @@ -0,0 +1,75 @@ +package main + +/* + +import ( + "fmt" + "log" + // "strings" + + rt "gopkg.in/dancannon/gorethink.v2" +) + +type Rethink struct { + rtdb *rt.Session +} + +func NewRethinkDB(cluster []string) (*Rethink, error) { + var ( + err error + session *rt.Session + ) + + if len(cluster) > 1 { + session, err = rt.Connect(rt.ConnectOpts{ + Addresses: cluster, + InitialCap: 10, + MaxOpen: 10, + }) + } else { + session, err = rt.Connect(rt.ConnectOpts{ + Address: cluster[0], + InitialCap: 10, + MaxOpen: 10, + }) + } + + if err != nil { + log.Printf("RethinkDB ERROR: %+v\n", err.Error()) + return nil, err + } + + return &Rethink{ + rtdb: session, + }, nil +} + +func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) { + resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb) + if opts.Debug { + fmt.Printf("RTinsert: %+v\n", resp) + } + if err != nil { + return resp, err + } + + return resp, nil +} + +func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) { + resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb) + //resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb) + if opts.Debug { + fmt.Printf("RTMulti: %+v\n", resp) + } + if err != nil { + return resp, err + } + + return resp, nil +} + +func (r *Rethink) Close() { + r.rtdb.Close() +} +*/ diff --git a/sigterm.go b/sigterm.go new file mode 100644 index 0000000..79fc906 --- /dev/null +++ b/sigterm.go @@ -0,0 +1,39 @@ +// sigterm +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func kill() { + log.Printf("KILL %d\n", status) + fmt.Printf("KILL %d\n", status) + wg.Done() + counter <- Counterchan{ + tipo: "wg", + val: -1, + } + done <- true +} + +func exit() { + log.Printf("EXIT %d\n", status) + fmt.Printf("EXIT %d\n", status) + loop = false + time.AfterFunc(time.Second*20, kill) +} + +func setTerm() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGTERM) + go func() { + <-c + exit() + }() +} diff --git a/sys/llmongo.cron b/sys/llmongo.cron index d83b17f..0066f34 100644 --- a/sys/llmongo.cron +++ b/sys/llmongo.cron @@ -24,4 +24,9 @@ # # m h dom mon dow command -*/10 * * * * /opt/llmongo/start.sh > /dev/null +### lastlogin import +*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1 +*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1 +05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1 +### lastlogin create indexes +00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1 diff --git a/xymon.go b/xymon.go new file mode 100644 index 0000000..24c450f --- /dev/null +++ b/xymon.go @@ -0,0 +1,63 @@ +// xymon.go +package main + +/* +import ( + "fmt" + "log" + "net" + "strings" + "time" +) + +func sendStatus() { + + var msg string + + host := strings.Replace(opts.Hostname, ".", ",", -1) + + if count.GetErr() > 0 { + msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "yellow", time.Now(), "errors", count.GetErr()) + } else { + msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "green", time.Now(), "logins", count.GetLog()) + } + + if opts.Debug { + fmt.Println("Sending: ", msg) + } + + xymonSend([]byte(msg)) +} + +func xymonSend(msg []byte) error { + + var server string + + if strings.Contains(opts.Xymon, ":") { + server = opts.Xymon + } else { + server = opts.Xymon + ":1984" + } + + cl, err := net.Dial("tcp", server) + if err != nil { + fmt.Printf("xymon connect error: ", err) + log.Printf("xymon connect error: ", err) + return err + } + defer cl.Close() + + // fmt.Printf("xymon: %s - localhost: %s\n", cl.RemoteAddr(), cl.LocalAddr()) + + _, err = cl.Write(msg) + if err != nil { + fmt.Printf("xymon write error: ", err) + log.Printf("xymon write error: ", err) + return err + } + + // fmt.Printf("written: %d\n", tot) + + return nil +} +*/