diff --git a/Docker/Dockerfile b/Docker/Dockerfile deleted file mode 100644 index 061eb89..0000000 --- a/Docker/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -#FROM busybox:latest -FROM scratch - -MAINTAINER Michele Fadda "" - -ARG VER -ENV VER ${VER:-0.0.0} - -COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb - -ENTRYPOINT [ "/bin/lastlogin_mongodb" ] diff --git a/Docker/build.sh b/Docker/build.sh deleted file mode 100755 index f4beae2..0000000 --- a/Docker/build.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) . diff --git a/Docker/run.sh b/Docker/run.sh deleted file mode 100644 index 7131db0..0000000 --- a/Docker/run.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash - -docker run \ - --rm \ - -v /opt/llmongo/log:/data \ - --name llmongo_tiscali \ - --log-opt max-size=2m \ - --log-opt max-file=5 \ - mikif70/llmongo:4.2.1 \ - -l /data/llmongo.log \ - -r redis-ll.mail.tiscali.sys:6379 \ - -m 10.39.80.189:27017 \ - -d lastlogin \ - -i enginedb1@10.39.109.107:8086 \ - -T 60s \ - $@ diff --git a/backup.sh b/backup.sh deleted file mode 100644 index a1b4ea7..0000000 --- a/backup.sh +++ /dev/null @@ -1,80 +0,0 @@ -#!/bin/bash - -MONGODB="secondary.mongod.service.mail:27017" -DATAPATH="/mnt/mongobck/NEW_EXPORT" - -NAME="mtools" -DB="lastlogin" - -opt() -{ - case "$1" in - -d) - shift - DATE=$1 - shift - opt $* - ;; - -n) - shift - NAME=$1 - shift - opt $* - ;; - -db) - shift - DB=$1 - shift - opt $* - ;; - -pwd) - shift - DATAPATH=${PWD} - shift - opt $* - ;; - esac -} - -echo "Running... $(date)" - -if [ -n "$1" ] -then - opt $* -fi - -echo "Opts: DB-$DB DATE-$DATE" - -if [ -z "${DATE}" ] -then - DATE=$(date -d "yesterday" +%Y%m%d) -fi - -YDAY=$(date -d "${DATE}" +%Y-%m-%d) -TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d) -QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T0 -0:00:00Z\")}}" - -FNAME=$(date -d "${DATE}" +%y%m%d) -CNAME=$(date -d "${DATE}" +%y%m) - -echo "Query: ${QUERY}" - -if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then - mkdir -p ${DATAPATH}/${DB}/${CNAME} -fi - -/usr/bin/docker run \ - --rm \ - -h mtools \ - --name ${NAME} \ - -v ${DATAPATH}:/data \ - mikif70/mongotools:3.4.5 mongoexport \ - --host "${MONGODB}" \ - --db $DB \ - --collection "ll_${FNAME}" \ - --type "csv" \ - --fields "_id,user,protocol,ip,date,insert,country" \ - --query "${QUERY}" \ - --readPreference "secondary" \ - --out "/data/${DB}/${CNAME}/ll_${FNAME}.csv" \ No newline at end of file diff --git a/build.sh b/build.sh deleted file mode 100755 index 9cf0267..0000000 --- a/build.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo . - -mv lastlogin_mongodb-$(git describe --tags) Docker/ diff --git a/consul.go b/consul.go deleted file mode 100644 index a959a24..0000000 --- a/consul.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" - - "github.com/hashicorp/consul/api" -) - -// Consul client structure -type Consul struct { - RedisTTL time.Duration - Timeout time.Duration - MaxError int - Influxdb string - Month string - Retention float64 - MongoURI string - Database string - RedisURI string - Client *api.Client -} - -// NewClient New consul client -func NewClient() *Consul { - client, err := api.NewClient(api.DefaultConfig()) - if err != nil { - log.Fatalf("Consul error: %+v\n", err) - if opts.Debug { - fmt.Printf("Consul error: %+v\n", err) - } - } - - consul := &Consul{ - Client: client, - RedisTTL: opts.RedisTTL, - Timeout: opts.Timeout, - MaxError: opts.MaxError, - Influxdb: opts.Influxdb, - } - - return consul -} diff --git a/consumer.go b/consumer.go index 0c4b7f8..8dca1ac 100644 --- a/consumer.go +++ b/consumer.go @@ -2,187 +2,90 @@ package main import ( - "crypto/md5" - "crypto/sha1" - "crypto/sha256" - "encoding/hex" "fmt" + "github.com/garyburd/redigo/redis" "log" "strconv" "strings" "time" - - "github.com/globalsign/mgo" - // "gopkg.in/mgo.v2" ) -type consumed struct { - user string - error bool - logins []string -} - -func hash256(val []byte) string { - - h := sha256.New() - h.Write(val) - - return hex.EncodeToString(h.Sum(nil)) -} - -func hash160(val []byte) string { - - h := sha1.New() - h.Write(val) - - return hex.EncodeToString(h.Sum(nil)) -} - -func hash128(val []byte) string { - - h := md5.New() - h.Write(val) - - return hex.EncodeToString(h.Sum(nil)) -} - -// protocol:timestamp:ip:country - func consumer() { + var date int64 + var lastval string + var conn = dbs.rdb.Get() + defer conn.Close() for { - - prod := <-consume - - start := time.Now() - - status = _Consumer - - var bulk = make(map[string][]*mgo.Bulk) - var allLogins = make(map[string]MongoLogin) - - for i := range prod.logins { - login := prod.logins[i] - // se la riga di login e' vuota - if login == "" { - log.Println("Login empty: ", prod.user) - continue + user := <-msgs + // Estrae l'ultimo login dell'utente 'user' + val, err := redis.String(conn.Do("LINDEX", user, "-1")) + if err != nil { + if opts.Debug { + log.Printf("LINDEX error: %+v - %s\n\r", err, val) + fmt.Printf("LINDEX error: %+v - %s\n\r", err, val) } - sval := strings.Split(login, ":") - // se il formato della riga di login non e' corretto - if sval[1] == "" { - log.Println("Login format error: ", login, prod.user) - continue - } - // se il timestamp della riga di login non e' corretto - date, err := strconv.ParseInt(sval[1], 10, 64) - if err != nil { - log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) - continue - } - - // se la data e' piu vecchia di RETENTION (15552000 sec) la scarta - if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 { - log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login) + // se ha trovato user e righe di login + if lastval != "" { + // reinserisce l'ultimo login e imposta il ttl su Redis + retval, _ := conn.Do("lpush", user, lastval) + ttl, _ := conn.Do("expire", user, opts.RedisTTL.Seconds()) if opts.Debug { - fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login) - } - - if !opts.Test { - continue + log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds()) + fmt.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds()) } } - - // verifica se esiste la country - if len(sval) <= 3 { - sval = append(sval, "NONE") - } - - // genera l' _ID con user + timestamp + ip - mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405") - ml := MongoLogin{ - ID: mlID, - User: prod.user, - Protocol: sval[0], - IP: sval[2], - Date: time.Unix(date, 0), - Insert: time.Now(), - Country: sval[3], - } - - allLogins[mlID] = ml - + // break + continue } - - for _, val := range allLogins { - dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay - if _, ok := bulk[dt]; !ok { - for j := range dbs.mdb { - b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk() - b.Unordered() - bulk[dt] = append(bulk[dt], b) - } - } - for _, bl := range bulk[dt] { - bl.Insert(val) + // se la riga di login e' vuota + if val == "" { + log.Println("Login empty: ", user) + retval, _ := conn.Do("lrem", user, "-1", val) + log.Println("LREM retval: ", user, val, retval) + continue + } + sval := strings.Split(val, ":") + // se il formato della riga di login non e' corretto + if sval[1] == "" { + log.Println("Login format error: ", val, user) + retval, _ := conn.Do("lrem", user, "-1", val) + log.Println("LREM retval: ", user, val, retval) + continue + } + // se il timestamp della riga di login non e' corretto + date, err = strconv.ParseInt(sval[1], 10, 64) + if err != nil { + log.Printf("Date Error: %+v - %s\n", err, user) + continue + } + ml := MongoLogin{ + User: user, + Protocol: sval[0], + Ip: sval[2], + Date: time.Unix(date, 0), + } + ind := Index{ + User: user, + Date: time.Unix(date, 0), + } + // inserisce il login su Mongodb + count++ + _, err = dbs.ll.Upsert(ind, ml) + if err != nil { + log.Printf("Insert error: %+v\n", err) + // se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente + if !strings.Contains(err.Error(), "E11000") { + errCount += 1 + continue } } - - for _, val := range bulk { - for j, bl := range val { - result, err := bl.Run() - if j == 0 { - if err != nil { - if !strings.Contains(err.Error(), "E11000") { - fmt.Printf("Err: %+v\n", err) - prod.err = true - counter <- Counterchan{ - tipo: "err", - val: len(prod.logins), - } - if opts.Test || opts.Debug { - log.Printf("ERR: %s - %+v\n", prod.user, prod.logins) - } - continue - } else { - counter <- Counterchan{ - tipo: "dup", - val: strings.Count(err.Error(), "E11000"), - } - if opts.Debug { - log.Printf("DUP: %s - %+v\n", prod.user, prod.logins) - } - } - } else { - if opts.Test { - log.Printf("OK: %s - %+v\n", prod.user, prod.logins) - log.Printf("BulkResult: %s - %+v\n", prod.user, result) - } - counter <- Counterchan{ - tipo: "ins", - val: len(allLogins), - } - } - } - } - } - + // cancella da Redis la riga di login inserita + retval, err := conn.Do("lrem", user, "-1", val) if opts.Debug { - fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount()) + log.Println("LREM retval: ", retval, user, val) + fmt.Println("LREM retval: ", retval, user, val) } - - if opts.Debug && len(prod.logins) > 10 { - fmt.Printf("LOGS: %+v\n", prod.logins) - } - - if !prod.err { - counter <- Counterchan{ - tipo: "rem", - val: len(prod.logins), - } - } - - // wg.Done() - remove <- prod + lastval = val } } diff --git a/counter.go b/counter.go deleted file mode 100644 index 1b616a0..0000000 --- a/counter.go +++ /dev/null @@ -1,180 +0,0 @@ -// counter -package main - -import ( - "sync" - "time" -) - -// Counterchan counter structure -type Counterchan struct { - tipo string - val int -} - -// Counter structure -type Counter struct { - mu sync.Mutex - user int - log int - insert int - rem int - err int - dup int - time time.Duration - wg int -} - -// NewCounter iniitialized Counter structure -func NewCounter() *Counter { - return &Counter{ - user: 0, - log: 0, - insert: 0, - err: 0, - rem: 0, - dup: 0, - time: 0, - wg: 0, - } -} - -// Run open channel to receive counter -func (c *Counter) Run() { - for { - tocount := <-counter - - switch tocount.tipo { - case "user": - c.addUser() - case "dup": - c.addDuplicate(tocount.val) - case "ins": - c.addInsert(tocount.val) - case "log": - c.addLog(tocount.val) - case "rem": - c.addRem(tocount.val) - case "wg": - if tocount.val > 0 { - c.addWG() - } else { - c.delWG() - } - case "err": - c.addErr(tocount.val) - - } - } -} - -// AddUser increment number of users managed -func (c *Counter) addUser() { - c.user++ -} - -// AddDuplicate increment number of duplicates log -func (c *Counter) addDuplicate(add int) { - c.dup += add -} - -// AddInsert increment number of inserted rows -func (c *Counter) addInsert(add int) { - c.insert += add -} - -// AddLog increment number of log's rows managed -func (c *Counter) addLog(add int) { - c.log += add -} - -//AddRem increment removed logs row -func (c *Counter) addRem(add int) { - c.rem += add -} - -// AddWG ... -func (c *Counter) addWG() { - c.wg++ -} - -// AddErr ... -func (c *Counter) addErr(add int) { - c.err += add -} - -// DelWG ... -func (c *Counter) delWG() { - c.wg-- -} - -// GetUser return total users -func (c *Counter) GetUser() (ret int) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.user - return -} - -// GetDup return total duplicated logins -func (c *Counter) GetDup() (ret int) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.dup - return -} - -// GetLog return total log's rows -func (c *Counter) GetLog() (ret int) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.log - return -} - -// GetInsert return total inserted rows -func (c *Counter) GetInsert() (ret int) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.insert - return -} - -// GetErr return total errors -func (c *Counter) GetErr() (ret int) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.err - return -} - -// GetRem return total removed log's rows -func (c *Counter) GetRem() (ret int) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.rem - return -} - -// GetWG ... -func (c *Counter) GetWG() (ret int) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.wg - return -} - -// GetTime ... -func (c *Counter) GetTime() (ret float64) { - c.mu.Lock() - defer c.mu.Unlock() - ret = c.time.Seconds() - return -} - -// SetTime ... -func (c *Counter) SetTime(t time.Duration) { - c.mu.Lock() - defer c.mu.Unlock() - c.time = t -} diff --git a/day_init.js b/day_init.js deleted file mode 100644 index 8bef682..0000000 --- a/day_init.js +++ /dev/null @@ -1,15 +0,0 @@ -// v1.2 -var db = connect("192.168.0.1:27017/lastlogin"); -var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} -var now = new Date(); -var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate()) -print("creating index: ll_",col.slice(2)); -db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}]) -db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"}) -var db = connect("192.168.0.1:27017/katamail"); -var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} -var now = new Date(); -var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate()) -print("creating index: ll_",col.slice(2)); -db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}]) -db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"}) \ No newline at end of file diff --git a/dbs.go b/dbs.go index 4e28cf5..235de8d 100644 --- a/dbs.go +++ b/dbs.go @@ -2,80 +2,55 @@ package main import ( - "fmt" + "github.com/garyburd/redigo/redis" + // "github.com/fzzy/radix/redis" + "gopkg.in/mgo.v2" "log" "os" - "strings" "time" - - "github.com/garyburd/redigo/redis" - - "github.com/globalsign/mgo" - // "gopkg.in/mgo.v2" ) var ( dbs = Dbs{ - RedisURI: "", - Database: "", + MongoUri: "mongodb://127.0.0.1:27018", + RedisUri: "127.0.0.1:6379", } ) -// Dbs structure type Dbs struct { - MongoURI string - Database string - RedisURI string + MongoUri string + RedisUri string rdb *redis.Pool //*redis.Client - mdb []*mgo.Session + mdb *mgo.Session + ll *mgo.Collection + // us *mgo.Collection } -// MongoLogin structure type MongoLogin struct { - ID string `json:"_id" bson:"_id" gorethink:"id"` - User string `json:"user" bson:"user" gorethink:"user"` - Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"` - IP string `json:"ip" bson:"ip" gorethink:"ip"` - Date time.Time `json:"date" bson:"date" gorethink:"date"` - Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"` - Country string `json:"country" bson:"country" gorethink:"country"` + User string `json:"user"` + Protocol string `json:"protocol"` + Ip string `json:"ip"` + Date time.Time `json:"date"` } -// Ips structure -type Ips struct { - IP string `json:"ip"` -} - -// UserLogin structure type UserLogin struct { User string `json:"user"` Date time.Time `json:"date"` Lock bool `json:"lock"` } -// Index structure type Index struct { User string `json:"user"` Date time.Time `json:"date"` } -func (db *Dbs) isMongodb() bool { - if db.MongoURI != "" { - return true - } - - return false -} - func (db *Dbs) poolRedis() { dbs.rdb = &redis.Pool{ - MaxIdle: 128, - MaxActive: 1000, - Wait: true, - IdleTimeout: 1 * time.Second, + MaxIdle: 3, + IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", db.RedisURI) + c, err := redis.Dial("tcp", db.RedisUri) if err != nil { return nil, err } @@ -89,18 +64,24 @@ func (db *Dbs) poolRedis() { } -func (db *Dbs) connectMongo() { - - mongoList := strings.Split(db.MongoURI, "|") - - for m := range mongoList { - nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m])) - if err != nil { - log.Println("Mongodb connect Error: ", err.Error()) - os.Exit(-3) - } - nm.SetSocketTimeout(5 * time.Second) - nm.SetSyncTimeout(5 * time.Second) - db.mdb = append(db.mdb, nm) +/* +func (db *Dbs) connectRedis() { + var err error + db.rdb, err = redis.Dial("tcp", db.RedisUri) + if err != nil { + log.Println("Redis connect Error: ", err.Error()) + os.Exit(-1) } } +*/ + +func (db *Dbs) connectMongo() { + var err error + db.mdb, err = mgo.Dial(db.MongoUri) + if err != nil { + log.Println("Mongodb connect Error: ", err.Error()) + os.Exit(-3) + } + db.ll = db.mdb.DB("dovecot").C("lastlogin") + // db.us = db.mdb.DB("dovecot").C("userlogin") +} diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml deleted file mode 100644 index cf96102..0000000 --- a/docker-compose/docker-compose.yml +++ /dev/null @@ -1,18 +0,0 @@ -version: '2' -services: - mongodb: - image: mikif70/mongodb:3.6.9 - ports: - - 27017:27017 - container_name: ll_mongod - volumes: - - ./mongod:/data - depends_on: - - redis - redis: - image: "redis:alpine" - container_name: ll_redis - ports: - - 6379:6379 - volumes: - - ./redis:/data diff --git a/docker-compose/mongo.sh b/docker-compose/mongo.sh deleted file mode 100755 index e7fadef..0000000 --- a/docker-compose/mongo.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -if [ -z $1 ]; then - HOST=192.168.0.1:27017 -else - HOST=${1} -fi - -docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST} diff --git a/docker-compose/test.sh b/docker-compose/test.sh deleted file mode 100755 index 233ebbf..0000000 --- a/docker-compose/test.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash - -echo "Starting mikif70/llmongo:$(git -C .. describe --tags)" - -docker run \ - --rm \ - -v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \ - --name llmongo \ - mikif70/llmongo:$(git -C .. describe --tags) \ - -l /data/llmongo.log \ - -r 192.168.0.1:6379 \ - -m 192.168.0.1:27017 \ - -d lastlogin \ - -i test@10.39.253.206:8086 \ - -T 80s \ - $@ diff --git a/influxdb.go b/influxdb.go deleted file mode 100644 index be821d2..0000000 --- a/influxdb.go +++ /dev/null @@ -1,73 +0,0 @@ -// influxdb -package main - -import ( - "fmt" - "log" - "time" - - influxdb "github.com/influxdata/influxdb/client/v2" -) - -var ( - infdb string - infhost string -) - -func writeStats(start time.Time) { - if opts.Debug { - fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb) - fmt.Printf("host: %s -- addr: %s\n", infhost, infdb) - } - - c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ - Addr: fmt.Sprintf("http://%s", infdb), - Timeout: 2 * time.Second, - }) - if err != nil { - fmt.Printf("InfluxDB connect Error: %+v\n", err) - log.Printf("InfluxDB connect Error: %+v\n", err) - return - } - defer c.Close() - - bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{ - Database: "dovecot", - Precision: "s", - }) - if err != nil { - fmt.Printf("InfluxDB batch Error: %+v\n", err) - log.Printf("InfluxDB batch Error: %+v\n", err) - return - } - - tags := map[string]string{"server": infhost, "domain": dbs.Database} - fields := map[string]interface{}{ - "user": count.GetUser(), - "log": count.GetLog(), - "err": count.GetErr(), - "rem": count.GetRem(), - "dup": count.GetDup(), - "stop": count.GetTime(), - } - pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start) - if err != nil { - fmt.Printf("InfluxDB point Error: %+v\n", err) - log.Printf("InfluxDB point Error: %+v\n", err) - return - } - - if opts.Debug { - fmt.Printf("InfluxDB pt: %+v\n", pt) - } - - bp.AddPoint(pt) - - // Write the batch - err = c.Write(bp) - if err != nil { - fmt.Printf("InfluxDB write Error: %+v\n", err) - log.Printf("InfluxDB write Error: %+v\n", err) - return - } -} diff --git a/logrotate.cfg b/logrotate.cfg deleted file mode 100644 index 4f500de..0000000 --- a/logrotate.cfg +++ /dev/null @@ -1,28 +0,0 @@ -/opt/mongodb/mongod/log/*.log -{ - daily - rotate 10 - maxsize 10M - compress - delaycompress - missingok - notifempty - sharedscripts - copytruncate - postrotate - /usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true - endscript -} - -/opt/llmongo/log/*.log -{ - daily - maxsize 5M - rotate 8 - compress - delaycompress - missingok - notifempty - sharedscripts - copytruncate -} diff --git a/main.go b/main.go index 304e716..3346835 100644 --- a/main.go +++ b/main.go @@ -6,67 +6,78 @@ import ( "fmt" "log" "os" - "regexp" - "sync" + "path" + "path/filepath" "time" - - _ "github.com/nats-io/go-nats" ) +type Options struct { + RedisTTL time.Duration + CurrentPath string + Exe string + LogFile string + Timeout bool + Debug bool + Version bool +} + const ( - _Version = "v5.0.0b6" - _Producer = 0 - _Consumer = 1 - _Remover = 2 + _VERSION = "v2.0.0" ) var ( - loop bool + opts = Options{ + RedisTTL: time.Hour * 11688, // 16 mesi + LogFile: "log/llmongo.log", + } - done chan bool - consume chan loginsList - remove chan loginsList + loop = true + ttl = time.Second * 55 + done = make(chan bool) + msgs = make(chan string) - counter chan Counterchan - - wg sync.WaitGroup - - status int - - count *Counter + count = 0 + errCount = 0 ) +func usage() { + fmt.Println("Usage: llmongo -m -r -t -l -T -D -v\n") + os.Exit(0) +} + +func init() { + var err error + opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) + if err != nil { + log.Fatal(err) + } + + opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) + pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid") + opts.Exe = path.Base(os.Args[0]) + + flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb") + flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis") + flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL") + flag.BoolVar(&opts.Version, "v", false, "Version") + flag.BoolVar(&opts.Timeout, "T", false, "Timeout") + flag.BoolVar(&opts.Debug, "D", false, "Debug") +} + +func stopLoop() { + loop = false +} + func main() { flag.Usage = usage flag.Parse() if opts.Version { - fmt.Println(os.Args[0], _Version) + fmt.Println(os.Args[0], _VERSION) os.Exit(0) } - if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" { - flag.Usage() - os.Exit(-1) - } - - if opts.Influxdb != "" { - var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`) - // re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`) - if re.MatchString(opts.Influxdb) { - match := re.FindStringSubmatch(opts.Influxdb) - if opts.Debug { - fmt.Printf("Influxdb match: %+v\n", match) - } - infhost = match[1] - infdb = match[2] + ":" + match[3] - } else { - opts.Influxdb = "" - } - } - - setTerm() - fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { fmt.Println("Log file error: ", err.Error()) @@ -76,58 +87,27 @@ func main() { log.SetOutput(fs) - // pid.PIDFile = opts.Pidfile - // pid.Write(true) - // defer pid.Remove() + pid.Write(true) + defer pid.Remove() start := time.Now() - fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs) - log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs) - - opts.Month = start.Format("0601") + fmt.Printf("Start: %+v\n", opts) + log.Printf("Start: %+v\n", opts) dbs.poolRedis() defer dbs.rdb.Close() dbs.connectMongo() - for k := range dbs.mdb { - defer dbs.mdb[k].Close() + defer dbs.mdb.Close() + + if opts.Timeout { + time.AfterFunc(ttl, stopLoop) } - if opts.Timeout > 0 { - time.AfterFunc(opts.Timeout, exit) - } - - count = NewCounter() - - consume = make(chan loginsList, opts.Queue) - remove = make(chan loginsList, opts.Queue) - loop = true - done = make(chan bool) - counter = make(chan Counterchan) - - go count.Run() go producer() - for i := 0; i < opts.Queue; i++ { - for j := 0; j < len(dbs.mdb); j++ { - go consumer() - } - go remover() - } - + go consumer() <-done - fmt.Printf("Done\n") - close(done) - fmt.Println("Waiting WG") - wg.Wait() - - count.SetTime(time.Since(start)) - - fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) - log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) - - if opts.Influxdb != "" { - writeStats(start) - } + fmt.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount) + log.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount) } diff --git a/main_test.go b/main_test.go deleted file mode 100644 index 4bcb79e..0000000 --- a/main_test.go +++ /dev/null @@ -1,15 +0,0 @@ -// Testmain.go -package main - -import ( - "flag" - "fmt" - "testing" -) - -func TestOptions(t *testing.T) { - flag.Usage = usage - flag.Parse() - - fmt.Printf("Config: %+v\n", opts) -} diff --git a/mongo_scripts.txt b/mongo_scripts.txt deleted file mode 100644 index 6dc7834..0000000 --- a/mongo_scripts.txt +++ /dev/null @@ -1,34 +0,0 @@ - -################ -var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} - -for (var i=1; i<18; i++) { - var col = "ll_1806"+dd(i); - print(col); - db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true}); -} - -########### -var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} - -for (var m=1; m<=12; m++) { - for (var d=1; d<=31; d++) { - var col = "ll_15"+dd(m)+dd(d); - if (db.getCollection(col).exists != null) { - print(col); - print(db.getCollection(col).totalSize()); - } - } -} - - -#### -var ar = db.getCollectionNames() -var len = ar.length - -for (var i=0; i - -d - -r - -t - -l - -T - -i - -C - -P [port] ## used by consul to check services ## - -q - -R - -v - -D - -DD `) - fmt.Println() - os.Exit(0) -} - -func init() { - var err error - opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0])) - if err != nil { - log.Fatal(err) - } - - opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile) - opts.Exe = path.Base(os.Args[0]) - - flag.StringVar(&opts.Influxdb, "i", "", "influxdb server") - flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb") - flag.StringVar(&dbs.Database, "d", "", "Mongodb Database") - flag.StringVar(&dbs.RedisURI, "r", "", "Redis") - flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") - flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client") - flag.IntVar(&opts.Port, "P", 10500, "service check port") - flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL") - flag.BoolVar(&opts.Version, "v", false, "Version") - flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout") - flag.BoolVar(&opts.Debug, "D", false, "Debug") - flag.BoolVar(&opts.Test, "DD", false, "Test") - flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error") - flag.IntVar(&opts.Queue, "q", 2, "parallel consumer") - flag.Float64Var(&opts.Retention, "R", 15552000, "retention") -} diff --git a/pid.go b/pid.go index da94004..2533af2 100644 --- a/pid.go +++ b/pid.go @@ -1,7 +1,6 @@ // pid package main -/* import ( "bytes" "fmt" @@ -17,7 +16,6 @@ var ( pid = PID{} ) -// PID structure type PID struct { PID string PIDFile string @@ -44,8 +42,11 @@ func (p *PID) readCmd() bool { return false } cmd := bytes.Trim(bcmd, "\x00") - if !strings.Contains(string(cmd), opts.Exe) { + if strings.Contains(string(cmd), opts.Exe) { + return true + } else { fmt.Printf("PID %s used by %s\n", pid, cmd) + return true } return true } @@ -77,11 +78,10 @@ func (p *PID) Write(l bool) { fpid.Close() } -// Remove cancella il PIDFile +// Cancella il PIDFile func (p *PID) Remove() { err := os.Remove(p.PIDFile) if err != nil { fmt.Println("RM file error: ", err.Error()) } } -*/ diff --git a/producer.go b/producer.go index 1d39be0..4185a85 100644 --- a/producer.go +++ b/producer.go @@ -3,90 +3,30 @@ package main import ( "fmt" - "log" - // "strconv" - "time" - "github.com/garyburd/redigo/redis" + "log" ) -type produced struct { - user string - logins []string -} - -type loginsList struct { - user string - logins []string - err bool -} - func producer() { conn := dbs.rdb.Get() defer conn.Close() for loop { - - wg.Wait() - status = _Producer - - start := time.Now() // estrae un userid dalla lista degli utenti che hanno fatto login user, err := redis.String(conn.Do("spop", "llindex")) + if opts.Debug { + log.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err) + fmt.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err) + } // se non ci sono piu' userid esce if err != nil { if opts.Debug { - fmt.Printf("LLINDEX empty: %v\n", err) + log.Printf("LLINDEX empty: %v\n\r", err) + fmt.Printf("LLINDEX empty: %v\n\r", err) } - log.Printf("LLINDEX empty: %v\n", err) break } - - // estrae tutti i logins dell'utente "user" - logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1")) - if err != nil { - if opts.Debug { - fmt.Printf("LRANGE: %+v - %+v\n", err, logs) - } - log.Printf("LRANGE: %+v - %+v\n", err, logs) - } - - if opts.Debug { - fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount()) - } - - if len(logs) > 0 { - counter <- Counterchan{ - tipo: "user", - val: 1, - } - - if opts.Debug { - fmt.Printf("PROD: %+v\n", time.Since(start)) - } - - if opts.Test { - log.Printf("PROD: %s - %d\n", user, len(logs)) - } - - wg.Add(1) - counter <- Counterchan{ - tipo: "wg", - val: 1, - } - - counter <- Counterchan{ - tipo: "log", - val: len(logs), - } - - consume <- loginsList{ - user: user, - logins: logs, - err: false, - } - } + msgs <- user } - done <- true } diff --git a/remover.go b/remover.go deleted file mode 100644 index 9dc2679..0000000 --- a/remover.go +++ /dev/null @@ -1,51 +0,0 @@ -// finalizer -package main - -import ( - "fmt" - - "time" -) - -func remover() { - - var conn = dbs.rdb.Get() - defer conn.Close() - - for { - rem := <-remove - - status = _Remover - - start := time.Now() - if !rem.err { - for i := range rem.logins { - // cancella da Redis la riga di login inserita partendo da 1 - conn.Send("lrem", rem.user, "1", rem.logins[i]) - } - } - - // se ci sono errori o non e' vuota la lista di logins reinserisce lo user - if rem.err { - if opts.Debug { - fmt.Printf("SADD: %s\n", rem.user) - } - conn.Send("sadd", "llindex", rem.user) - if count.GetErr() >= opts.MaxError { - exit() - } - } - conn.Send("expire", rem.user, opts.RedisTTL.Seconds()) - conn.Flush() - - if opts.Debug { - fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start)) - } - - counter <- Counterchan{ - tipo: "wg", - val: -1, - } - wg.Done() - } -} diff --git a/rethinkdb.go b/rethinkdb.go deleted file mode 100644 index 13c715d..0000000 --- a/rethinkdb.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -/* - -import ( - "fmt" - "log" - // "strings" - - rt "gopkg.in/dancannon/gorethink.v2" -) - -type Rethink struct { - rtdb *rt.Session -} - -func NewRethinkDB(cluster []string) (*Rethink, error) { - var ( - err error - session *rt.Session - ) - - if len(cluster) > 1 { - session, err = rt.Connect(rt.ConnectOpts{ - Addresses: cluster, - InitialCap: 10, - MaxOpen: 10, - }) - } else { - session, err = rt.Connect(rt.ConnectOpts{ - Address: cluster[0], - InitialCap: 10, - MaxOpen: 10, - }) - } - - if err != nil { - log.Printf("RethinkDB ERROR: %+v\n", err.Error()) - return nil, err - } - - return &Rethink{ - rtdb: session, - }, nil -} - -func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) { - resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb) - if opts.Debug { - fmt.Printf("RTinsert: %+v\n", resp) - } - if err != nil { - return resp, err - } - - return resp, nil -} - -func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) { - resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb) - //resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb) - if opts.Debug { - fmt.Printf("RTMulti: %+v\n", resp) - } - if err != nil { - return resp, err - } - - return resp, nil -} - -func (r *Rethink) Close() { - r.rtdb.Close() -} -*/ diff --git a/sigterm.go b/sigterm.go deleted file mode 100644 index 79fc906..0000000 --- a/sigterm.go +++ /dev/null @@ -1,39 +0,0 @@ -// sigterm -package main - -import ( - "fmt" - "log" - "os" - "os/signal" - "syscall" - "time" -) - -func kill() { - log.Printf("KILL %d\n", status) - fmt.Printf("KILL %d\n", status) - wg.Done() - counter <- Counterchan{ - tipo: "wg", - val: -1, - } - done <- true -} - -func exit() { - log.Printf("EXIT %d\n", status) - fmt.Printf("EXIT %d\n", status) - loop = false - time.AfterFunc(time.Second*20, kill) -} - -func setTerm() { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - signal.Notify(c, syscall.SIGTERM) - go func() { - <-c - exit() - }() -} diff --git a/sys/llmongo.cron b/sys/llmongo.cron index 0066f34..d83b17f 100644 --- a/sys/llmongo.cron +++ b/sys/llmongo.cron @@ -24,9 +24,4 @@ # # m h dom mon dow command -### lastlogin import -*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1 -*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1 -05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1 -### lastlogin create indexes -00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1 +*/10 * * * * /opt/llmongo/start.sh > /dev/null diff --git a/xymon.go b/xymon.go deleted file mode 100644 index 24c450f..0000000 --- a/xymon.go +++ /dev/null @@ -1,63 +0,0 @@ -// xymon.go -package main - -/* -import ( - "fmt" - "log" - "net" - "strings" - "time" -) - -func sendStatus() { - - var msg string - - host := strings.Replace(opts.Hostname, ".", ",", -1) - - if count.GetErr() > 0 { - msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "yellow", time.Now(), "errors", count.GetErr()) - } else { - msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "green", time.Now(), "logins", count.GetLog()) - } - - if opts.Debug { - fmt.Println("Sending: ", msg) - } - - xymonSend([]byte(msg)) -} - -func xymonSend(msg []byte) error { - - var server string - - if strings.Contains(opts.Xymon, ":") { - server = opts.Xymon - } else { - server = opts.Xymon + ":1984" - } - - cl, err := net.Dial("tcp", server) - if err != nil { - fmt.Printf("xymon connect error: ", err) - log.Printf("xymon connect error: ", err) - return err - } - defer cl.Close() - - // fmt.Printf("xymon: %s - localhost: %s\n", cl.RemoteAddr(), cl.LocalAddr()) - - _, err = cl.Write(msg) - if err != nil { - fmt.Printf("xymon write error: ", err) - log.Printf("xymon write error: ", err) - return err - } - - // fmt.Printf("written: %d\n", tot) - - return nil -} -*/