Compare commits

..

No commits in common. "master" and "v2.0.0" have entirely different histories.

26 changed files with 180 additions and 1241 deletions

View file

@ -1,11 +0,0 @@
#FROM busybox:latest
FROM scratch
MAINTAINER Michele Fadda "<mikif70@gmail.com>"
ARG VER
ENV VER ${VER:-0.0.0}
COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb
ENTRYPOINT [ "/bin/lastlogin_mongodb" ]

View file

@ -1,3 +0,0 @@
#!/bin/bash
docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) .

View file

@ -1,16 +0,0 @@
#!/bin/bash
docker run \
--rm \
-v /opt/llmongo/log:/data \
--name llmongo_tiscali \
--log-opt max-size=2m \
--log-opt max-file=5 \
mikif70/llmongo:4.2.1 \
-l /data/llmongo.log \
-r redis-ll.mail.tiscali.sys:6379 \
-m 10.39.80.189:27017 \
-d lastlogin \
-i enginedb1@10.39.109.107:8086 \
-T 60s \
$@

View file

@ -1,80 +0,0 @@
#!/bin/bash
MONGODB="secondary.mongod.service.mail:27017"
DATAPATH="/mnt/mongobck/NEW_EXPORT"
NAME="mtools"
DB="lastlogin"
opt()
{
case "$1" in
-d)
shift
DATE=$1
shift
opt $*
;;
-n)
shift
NAME=$1
shift
opt $*
;;
-db)
shift
DB=$1
shift
opt $*
;;
-pwd)
shift
DATAPATH=${PWD}
shift
opt $*
;;
esac
}
echo "Running... $(date)"
if [ -n "$1" ]
then
opt $*
fi
echo "Opts: DB-$DB DATE-$DATE"
if [ -z "${DATE}" ]
then
DATE=$(date -d "yesterday" +%Y%m%d)
fi
YDAY=$(date -d "${DATE}" +%Y-%m-%d)
TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d)
QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T0
0:00:00Z\")}}"
FNAME=$(date -d "${DATE}" +%y%m%d)
CNAME=$(date -d "${DATE}" +%y%m)
echo "Query: ${QUERY}"
if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then
mkdir -p ${DATAPATH}/${DB}/${CNAME}
fi
/usr/bin/docker run \
--rm \
-h mtools \
--name ${NAME} \
-v ${DATAPATH}:/data \
mikif70/mongotools:3.4.5 mongoexport \
--host "${MONGODB}" \
--db $DB \
--collection "ll_${FNAME}" \
--type "csv" \
--fields "_id,user,protocol,ip,date,insert,country" \
--query "${QUERY}" \
--readPreference "secondary" \
--out "/data/${DB}/${CNAME}/ll_${FNAME}.csv"

View file

@ -1,5 +0,0 @@
#!/bin/bash
CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo .
mv lastlogin_mongodb-$(git describe --tags) Docker/

View file

@ -1,44 +0,0 @@
package main
import (
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
)
// Consul client structure
type Consul struct {
RedisTTL time.Duration
Timeout time.Duration
MaxError int
Influxdb string
Month string
Retention float64
MongoURI string
Database string
RedisURI string
Client *api.Client
}
// NewClient New consul client
func NewClient() *Consul {
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.Fatalf("Consul error: %+v\n", err)
if opts.Debug {
fmt.Printf("Consul error: %+v\n", err)
}
}
consul := &Consul{
Client: client,
RedisTTL: opts.RedisTTL,
Timeout: opts.Timeout,
MaxError: opts.MaxError,
Influxdb: opts.Influxdb,
}
return consul
}

View file

@ -2,187 +2,90 @@
package main package main
import ( import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"fmt" "fmt"
"github.com/garyburd/redigo/redis"
"log" "log"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
) )
type consumed struct {
user string
error bool
logins []string
}
func hash256(val []byte) string {
h := sha256.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func hash160(val []byte) string {
h := sha1.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func hash128(val []byte) string {
h := md5.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
// protocol:timestamp:ip:country
func consumer() { func consumer() {
var date int64
var lastval string
var conn = dbs.rdb.Get()
defer conn.Close()
for { for {
user := <-msgs
prod := <-consume // Estrae l'ultimo login dell'utente 'user'
val, err := redis.String(conn.Do("LINDEX", user, "-1"))
start := time.Now() if err != nil {
if opts.Debug {
status = _Consumer log.Printf("LINDEX error: %+v - %s\n\r", err, val)
fmt.Printf("LINDEX error: %+v - %s\n\r", err, val)
var bulk = make(map[string][]*mgo.Bulk) }
var allLogins = make(map[string]MongoLogin) // se ha trovato user e righe di login
if lastval != "" {
for i := range prod.logins { // reinserisce l'ultimo login e imposta il ttl su Redis
login := prod.logins[i] retval, _ := conn.Do("lpush", user, lastval)
// se la riga di login e' vuota ttl, _ := conn.Do("expire", user, opts.RedisTTL.Seconds())
if login == "" { if opts.Debug {
log.Println("Login empty: ", prod.user) log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
fmt.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
}
}
// break
continue continue
} }
sval := strings.Split(login, ":") // se la riga di login e' vuota
if val == "" {
log.Println("Login empty: ", user)
retval, _ := conn.Do("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue
}
sval := strings.Split(val, ":")
// se il formato della riga di login non e' corretto // se il formato della riga di login non e' corretto
if sval[1] == "" { if sval[1] == "" {
log.Println("Login format error: ", login, prod.user) log.Println("Login format error: ", val, user)
retval, _ := conn.Do("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue continue
} }
// se il timestamp della riga di login non e' corretto // se il timestamp della riga di login non e' corretto
date, err := strconv.ParseInt(sval[1], 10, 64) date, err = strconv.ParseInt(sval[1], 10, 64)
if err != nil { if err != nil {
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login) log.Printf("Date Error: %+v - %s\n", err, user)
continue continue
} }
// se la data e' piu vecchia di RETENTION (15552000 sec) la scarta
if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 {
log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
if opts.Debug {
fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
}
if !opts.Test {
continue
}
}
// verifica se esiste la country
if len(sval) <= 3 {
sval = append(sval, "NONE")
}
// genera l' _ID con user + timestamp + ip
mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
ml := MongoLogin{ ml := MongoLogin{
ID: mlID, User: user,
User: prod.user,
Protocol: sval[0], Protocol: sval[0],
IP: sval[2], Ip: sval[2],
Date: time.Unix(date, 0), Date: time.Unix(date, 0),
Insert: time.Now(),
Country: sval[3],
} }
ind := Index{
allLogins[mlID] = ml User: user,
Date: time.Unix(date, 0),
} }
// inserisce il login su Mongodb
for _, val := range allLogins { count++
dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay _, err = dbs.ll.Upsert(ind, ml)
if _, ok := bulk[dt]; !ok {
for j := range dbs.mdb {
b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk()
b.Unordered()
bulk[dt] = append(bulk[dt], b)
}
}
for _, bl := range bulk[dt] {
bl.Insert(val)
}
}
for _, val := range bulk {
for j, bl := range val {
result, err := bl.Run()
if j == 0 {
if err != nil { if err != nil {
log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
if !strings.Contains(err.Error(), "E11000") { if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err) errCount += 1
prod.err = true
counter <- Counterchan{
tipo: "err",
val: len(prod.logins),
}
if opts.Test || opts.Debug {
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
}
continue continue
} else {
counter <- Counterchan{
tipo: "dup",
val: strings.Count(err.Error(), "E11000"),
} }
}
// cancella da Redis la riga di login inserita
retval, err := conn.Do("lrem", user, "-1", val)
if opts.Debug { if opts.Debug {
log.Printf("DUP: %s - %+v\n", prod.user, prod.logins) log.Println("LREM retval: ", retval, user, val)
fmt.Println("LREM retval: ", retval, user, val)
} }
} lastval = val
} else {
if opts.Test {
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
log.Printf("BulkResult: %s - %+v\n", prod.user, result)
}
counter <- Counterchan{
tipo: "ins",
val: len(allLogins),
}
}
}
}
}
if opts.Debug {
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
}
if opts.Debug && len(prod.logins) > 10 {
fmt.Printf("LOGS: %+v\n", prod.logins)
}
if !prod.err {
counter <- Counterchan{
tipo: "rem",
val: len(prod.logins),
}
}
// wg.Done()
remove <- prod
} }
} }

View file

@ -1,180 +0,0 @@
// counter
package main
import (
"sync"
"time"
)
// Counterchan counter structure
type Counterchan struct {
tipo string
val int
}
// Counter structure
type Counter struct {
mu sync.Mutex
user int
log int
insert int
rem int
err int
dup int
time time.Duration
wg int
}
// NewCounter iniitialized Counter structure
func NewCounter() *Counter {
return &Counter{
user: 0,
log: 0,
insert: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: 0,
}
}
// Run open channel to receive counter
func (c *Counter) Run() {
for {
tocount := <-counter
switch tocount.tipo {
case "user":
c.addUser()
case "dup":
c.addDuplicate(tocount.val)
case "ins":
c.addInsert(tocount.val)
case "log":
c.addLog(tocount.val)
case "rem":
c.addRem(tocount.val)
case "wg":
if tocount.val > 0 {
c.addWG()
} else {
c.delWG()
}
case "err":
c.addErr(tocount.val)
}
}
}
// AddUser increment number of users managed
func (c *Counter) addUser() {
c.user++
}
// AddDuplicate increment number of duplicates log
func (c *Counter) addDuplicate(add int) {
c.dup += add
}
// AddInsert increment number of inserted rows
func (c *Counter) addInsert(add int) {
c.insert += add
}
// AddLog increment number of log's rows managed
func (c *Counter) addLog(add int) {
c.log += add
}
//AddRem increment removed logs row
func (c *Counter) addRem(add int) {
c.rem += add
}
// AddWG ...
func (c *Counter) addWG() {
c.wg++
}
// AddErr ...
func (c *Counter) addErr(add int) {
c.err += add
}
// DelWG ...
func (c *Counter) delWG() {
c.wg--
}
// GetUser return total users
func (c *Counter) GetUser() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.user
return
}
// GetDup return total duplicated logins
func (c *Counter) GetDup() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.dup
return
}
// GetLog return total log's rows
func (c *Counter) GetLog() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.log
return
}
// GetInsert return total inserted rows
func (c *Counter) GetInsert() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.insert
return
}
// GetErr return total errors
func (c *Counter) GetErr() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.err
return
}
// GetRem return total removed log's rows
func (c *Counter) GetRem() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.rem
return
}
// GetWG ...
func (c *Counter) GetWG() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.wg
return
}
// GetTime ...
func (c *Counter) GetTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.time.Seconds()
return
}
// SetTime ...
func (c *Counter) SetTime(t time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.time = t
}

View file

@ -1,15 +0,0 @@
// v1.2
var db = connect("192.168.0.1:27017/lastlogin");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
var db = connect("192.168.0.1:27017/katamail");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})

83
dbs.go
View file

@ -2,80 +2,55 @@
package main package main
import ( import (
"fmt" "github.com/garyburd/redigo/redis"
// "github.com/fzzy/radix/redis"
"gopkg.in/mgo.v2"
"log" "log"
"os" "os"
"strings"
"time" "time"
"github.com/garyburd/redigo/redis"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
) )
var ( var (
dbs = Dbs{ dbs = Dbs{
RedisURI: "", MongoUri: "mongodb://127.0.0.1:27018",
Database: "", RedisUri: "127.0.0.1:6379",
} }
) )
// Dbs structure
type Dbs struct { type Dbs struct {
MongoURI string MongoUri string
Database string RedisUri string
RedisURI string
rdb *redis.Pool //*redis.Client rdb *redis.Pool //*redis.Client
mdb []*mgo.Session mdb *mgo.Session
ll *mgo.Collection
// us *mgo.Collection
} }
// MongoLogin structure
type MongoLogin struct { type MongoLogin struct {
ID string `json:"_id" bson:"_id" gorethink:"id"` User string `json:"user"`
User string `json:"user" bson:"user" gorethink:"user"` Protocol string `json:"protocol"`
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"` Ip string `json:"ip"`
IP string `json:"ip" bson:"ip" gorethink:"ip"` Date time.Time `json:"date"`
Date time.Time `json:"date" bson:"date" gorethink:"date"`
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
Country string `json:"country" bson:"country" gorethink:"country"`
} }
// Ips structure
type Ips struct {
IP string `json:"ip"`
}
// UserLogin structure
type UserLogin struct { type UserLogin struct {
User string `json:"user"` User string `json:"user"`
Date time.Time `json:"date"` Date time.Time `json:"date"`
Lock bool `json:"lock"` Lock bool `json:"lock"`
} }
// Index structure
type Index struct { type Index struct {
User string `json:"user"` User string `json:"user"`
Date time.Time `json:"date"` Date time.Time `json:"date"`
} }
func (db *Dbs) isMongodb() bool {
if db.MongoURI != "" {
return true
}
return false
}
func (db *Dbs) poolRedis() { func (db *Dbs) poolRedis() {
dbs.rdb = &redis.Pool{ dbs.rdb = &redis.Pool{
MaxIdle: 128, MaxIdle: 3,
MaxActive: 1000, IdleTimeout: 240 * time.Second,
Wait: true,
IdleTimeout: 1 * time.Second,
Dial: func() (redis.Conn, error) { Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", db.RedisURI) c, err := redis.Dial("tcp", db.RedisUri)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -89,18 +64,24 @@ func (db *Dbs) poolRedis() {
} }
/*
func (db *Dbs) connectRedis() {
var err error
db.rdb, err = redis.Dial("tcp", db.RedisUri)
if err != nil {
log.Println("Redis connect Error: ", err.Error())
os.Exit(-1)
}
}
*/
func (db *Dbs) connectMongo() { func (db *Dbs) connectMongo() {
var err error
mongoList := strings.Split(db.MongoURI, "|") db.mdb, err = mgo.Dial(db.MongoUri)
for m := range mongoList {
nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m]))
if err != nil { if err != nil {
log.Println("Mongodb connect Error: ", err.Error()) log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3) os.Exit(-3)
} }
nm.SetSocketTimeout(5 * time.Second) db.ll = db.mdb.DB("dovecot").C("lastlogin")
nm.SetSyncTimeout(5 * time.Second) // db.us = db.mdb.DB("dovecot").C("userlogin")
db.mdb = append(db.mdb, nm)
}
} }

View file

@ -1,18 +0,0 @@
version: '2'
services:
mongodb:
image: mikif70/mongodb:3.6.9
ports:
- 27017:27017
container_name: ll_mongod
volumes:
- ./mongod:/data
depends_on:
- redis
redis:
image: "redis:alpine"
container_name: ll_redis
ports:
- 6379:6379
volumes:
- ./redis:/data

View file

@ -1,9 +0,0 @@
#!/bin/bash
if [ -z $1 ]; then
HOST=192.168.0.1:27017
else
HOST=${1}
fi
docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST}

View file

@ -1,16 +0,0 @@
#!/bin/bash
echo "Starting mikif70/llmongo:$(git -C .. describe --tags)"
docker run \
--rm \
-v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \
--name llmongo \
mikif70/llmongo:$(git -C .. describe --tags) \
-l /data/llmongo.log \
-r 192.168.0.1:6379 \
-m 192.168.0.1:27017 \
-d lastlogin \
-i test@10.39.253.206:8086 \
-T 80s \
$@

View file

@ -1,73 +0,0 @@
// influxdb
package main
import (
"fmt"
"log"
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
)
var (
infdb string
infhost string
)
func writeStats(start time.Time) {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
}
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: fmt.Sprintf("http://%s", infdb),
Timeout: 2 * time.Second,
})
if err != nil {
fmt.Printf("InfluxDB connect Error: %+v\n", err)
log.Printf("InfluxDB connect Error: %+v\n", err)
return
}
defer c.Close()
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: "dovecot",
Precision: "s",
})
if err != nil {
fmt.Printf("InfluxDB batch Error: %+v\n", err)
log.Printf("InfluxDB batch Error: %+v\n", err)
return
}
tags := map[string]string{"server": infhost, "domain": dbs.Database}
fields := map[string]interface{}{
"user": count.GetUser(),
"log": count.GetLog(),
"err": count.GetErr(),
"rem": count.GetRem(),
"dup": count.GetDup(),
"stop": count.GetTime(),
}
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
if err != nil {
fmt.Printf("InfluxDB point Error: %+v\n", err)
log.Printf("InfluxDB point Error: %+v\n", err)
return
}
if opts.Debug {
fmt.Printf("InfluxDB pt: %+v\n", pt)
}
bp.AddPoint(pt)
// Write the batch
err = c.Write(bp)
if err != nil {
fmt.Printf("InfluxDB write Error: %+v\n", err)
log.Printf("InfluxDB write Error: %+v\n", err)
return
}
}

View file

@ -1,28 +0,0 @@
/opt/mongodb/mongod/log/*.log
{
daily
rotate 10
maxsize 10M
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
postrotate
/usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true
endscript
}
/opt/llmongo/log/*.log
{
daily
maxsize 5M
rotate 8
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
}

146
main.go
View file

@ -6,67 +6,78 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"regexp" "path"
"sync" "path/filepath"
"time" "time"
_ "github.com/nats-io/go-nats"
) )
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
Timeout bool
Debug bool
Version bool
}
const ( const (
_Version = "v5.0.0b6" _VERSION = "v2.0.0"
_Producer = 0
_Consumer = 1
_Remover = 2
) )
var ( var (
loop bool opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
done chan bool loop = true
consume chan loginsList ttl = time.Second * 55
remove chan loginsList done = make(chan bool)
msgs = make(chan string)
counter chan Counterchan count = 0
errCount = 0
wg sync.WaitGroup
status int
count *Counter
) )
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <ttl> -l <logfile> -T -D -v\n")
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.BoolVar(&opts.Timeout, "T", false, "Timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
}
func stopLoop() {
loop = false
}
func main() { func main() {
flag.Usage = usage flag.Usage = usage
flag.Parse() flag.Parse()
if opts.Version { if opts.Version {
fmt.Println(os.Args[0], _Version) fmt.Println(os.Args[0], _VERSION)
os.Exit(0) os.Exit(0)
} }
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" {
flag.Usage()
os.Exit(-1)
}
if opts.Influxdb != "" {
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
if re.MatchString(opts.Influxdb) {
match := re.FindStringSubmatch(opts.Influxdb)
if opts.Debug {
fmt.Printf("Influxdb match: %+v\n", match)
}
infhost = match[1]
infdb = match[2] + ":" + match[3]
} else {
opts.Influxdb = ""
}
}
setTerm()
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil { if err != nil {
fmt.Println("Log file error: ", err.Error()) fmt.Println("Log file error: ", err.Error())
@ -76,58 +87,27 @@ func main() {
log.SetOutput(fs) log.SetOutput(fs)
// pid.PIDFile = opts.Pidfile pid.Write(true)
// pid.Write(true) defer pid.Remove()
// defer pid.Remove()
start := time.Now() start := time.Now()
fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs) fmt.Printf("Start: %+v\n", opts)
log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs) log.Printf("Start: %+v\n", opts)
opts.Month = start.Format("0601")
dbs.poolRedis() dbs.poolRedis()
defer dbs.rdb.Close() defer dbs.rdb.Close()
dbs.connectMongo() dbs.connectMongo()
for k := range dbs.mdb { defer dbs.mdb.Close()
defer dbs.mdb[k].Close()
if opts.Timeout {
time.AfterFunc(ttl, stopLoop)
} }
if opts.Timeout > 0 {
time.AfterFunc(opts.Timeout, exit)
}
count = NewCounter()
consume = make(chan loginsList, opts.Queue)
remove = make(chan loginsList, opts.Queue)
loop = true
done = make(chan bool)
counter = make(chan Counterchan)
go count.Run()
go producer() go producer()
for i := 0; i < opts.Queue; i++ {
for j := 0; j < len(dbs.mdb); j++ {
go consumer() go consumer()
}
go remover()
}
<-done <-done
fmt.Printf("Done\n")
close(done)
fmt.Println("Waiting WG") fmt.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount)
wg.Wait() log.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount)
count.SetTime(time.Since(start))
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
if opts.Influxdb != "" {
writeStats(start)
}
} }

View file

@ -1,15 +0,0 @@
// Testmain.go
package main
import (
"flag"
"fmt"
"testing"
)
func TestOptions(t *testing.T) {
flag.Usage = usage
flag.Parse()
fmt.Printf("Config: %+v\n", opts)
}

View file

@ -1,34 +0,0 @@
################
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var i=1; i<18; i++) {
var col = "ll_1806"+dd(i);
print(col);
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
}
###########
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var m=1; m<=12; m++) {
for (var d=1; d<=31; d++) {
var col = "ll_15"+dd(m)+dd(d);
if (db.getCollection(col).exists != null) {
print(col);
print(db.getCollection(col).totalSize());
}
}
}
####
var ar = db.getCollectionNames()
var len = ar.length
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
####
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})

View file

@ -1,85 +0,0 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"path"
"path/filepath"
"time"
)
// Options structure
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
ConfigFile string
Timeout time.Duration
Debug bool
Test bool
Version bool
MaxError int
Influxdb string
Month string
Queue int
Retention float64
Consul string
Port int
}
var (
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
)
func usage() {
fmt.Println(`Usage: llmongo
-m <mongodb [ip:port,ip:port]>
-d <mongodb database [dbname]>
-r <redisdb [ip:port]>
-t <redis keys ttl>
-l <logfile>
-T <running timeout>
-i <influxdb [localname@ip:port]>
-C <consul [ip:port]>
-P <check port> [port] ## used by consul to check services ##
-q <parallel consumer>
-R <retention>
-v <version>
-D <debug>
-DD <Test>`)
fmt.Println()
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
flag.IntVar(&opts.Port, "P", 10500, "service check port")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
flag.Float64Var(&opts.Retention, "R", 15552000, "retention")
}

10
pid.go
View file

@ -1,7 +1,6 @@
// pid // pid
package main package main
/*
import ( import (
"bytes" "bytes"
"fmt" "fmt"
@ -17,7 +16,6 @@ var (
pid = PID{} pid = PID{}
) )
// PID structure
type PID struct { type PID struct {
PID string PID string
PIDFile string PIDFile string
@ -44,8 +42,11 @@ func (p *PID) readCmd() bool {
return false return false
} }
cmd := bytes.Trim(bcmd, "\x00") cmd := bytes.Trim(bcmd, "\x00")
if !strings.Contains(string(cmd), opts.Exe) { if strings.Contains(string(cmd), opts.Exe) {
return true
} else {
fmt.Printf("PID %s used by %s\n", pid, cmd) fmt.Printf("PID %s used by %s\n", pid, cmd)
return true
} }
return true return true
} }
@ -77,11 +78,10 @@ func (p *PID) Write(l bool) {
fpid.Close() fpid.Close()
} }
// Remove cancella il PIDFile // Cancella il PIDFile
func (p *PID) Remove() { func (p *PID) Remove() {
err := os.Remove(p.PIDFile) err := os.Remove(p.PIDFile)
if err != nil { if err != nil {
fmt.Println("RM file error: ", err.Error()) fmt.Println("RM file error: ", err.Error())
} }
} }
*/

View file

@ -3,90 +3,30 @@ package main
import ( import (
"fmt" "fmt"
"log"
// "strconv"
"time"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"log"
) )
type produced struct {
user string
logins []string
}
type loginsList struct {
user string
logins []string
err bool
}
func producer() { func producer() {
conn := dbs.rdb.Get() conn := dbs.rdb.Get()
defer conn.Close() defer conn.Close()
for loop { for loop {
wg.Wait()
status = _Producer
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login // estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex")) user, err := redis.String(conn.Do("spop", "llindex"))
if opts.Debug {
log.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err)
fmt.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err)
}
// se non ci sono piu' userid esce // se non ci sono piu' userid esce
if err != nil { if err != nil {
if opts.Debug { if opts.Debug {
fmt.Printf("LLINDEX empty: %v\n", err) log.Printf("LLINDEX empty: %v\n\r", err)
fmt.Printf("LLINDEX empty: %v\n\r", err)
} }
log.Printf("LLINDEX empty: %v\n", err)
break break
} }
msgs <- user
// estrae tutti i logins dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
if err != nil {
if opts.Debug {
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
} }
log.Printf("LRANGE: %+v - %+v\n", err, logs)
}
if opts.Debug {
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
}
if len(logs) > 0 {
counter <- Counterchan{
tipo: "user",
val: 1,
}
if opts.Debug {
fmt.Printf("PROD: %+v\n", time.Since(start))
}
if opts.Test {
log.Printf("PROD: %s - %d\n", user, len(logs))
}
wg.Add(1)
counter <- Counterchan{
tipo: "wg",
val: 1,
}
counter <- Counterchan{
tipo: "log",
val: len(logs),
}
consume <- loginsList{
user: user,
logins: logs,
err: false,
}
}
}
done <- true done <- true
} }

View file

@ -1,51 +0,0 @@
// finalizer
package main
import (
"fmt"
"time"
)
func remover() {
var conn = dbs.rdb.Get()
defer conn.Close()
for {
rem := <-remove
status = _Remover
start := time.Now()
if !rem.err {
for i := range rem.logins {
// cancella da Redis la riga di login inserita partendo da 1
conn.Send("lrem", rem.user, "1", rem.logins[i])
}
}
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
if rem.err {
if opts.Debug {
fmt.Printf("SADD: %s\n", rem.user)
}
conn.Send("sadd", "llindex", rem.user)
if count.GetErr() >= opts.MaxError {
exit()
}
}
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush()
if opts.Debug {
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
}
counter <- Counterchan{
tipo: "wg",
val: -1,
}
wg.Done()
}
}

View file

@ -1,75 +0,0 @@
package main
/*
import (
"fmt"
"log"
// "strings"
rt "gopkg.in/dancannon/gorethink.v2"
)
type Rethink struct {
rtdb *rt.Session
}
func NewRethinkDB(cluster []string) (*Rethink, error) {
var (
err error
session *rt.Session
)
if len(cluster) > 1 {
session, err = rt.Connect(rt.ConnectOpts{
Addresses: cluster,
InitialCap: 10,
MaxOpen: 10,
})
} else {
session, err = rt.Connect(rt.ConnectOpts{
Address: cluster[0],
InitialCap: 10,
MaxOpen: 10,
})
}
if err != nil {
log.Printf("RethinkDB ERROR: %+v\n", err.Error())
return nil, err
}
return &Rethink{
rtdb: session,
}, nil
}
func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTinsert: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb)
//resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTMulti: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
func (r *Rethink) Close() {
r.rtdb.Close()
}
*/

View file

@ -1,39 +0,0 @@
// sigterm
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func kill() {
log.Printf("KILL %d\n", status)
fmt.Printf("KILL %d\n", status)
wg.Done()
counter <- Counterchan{
tipo: "wg",
val: -1,
}
done <- true
}
func exit() {
log.Printf("EXIT %d\n", status)
fmt.Printf("EXIT %d\n", status)
loop = false
time.AfterFunc(time.Second*20, kill)
}
func setTerm() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
exit()
}()
}

View file

@ -24,9 +24,4 @@
# #
# m h dom mon dow command # m h dom mon dow command
### lastlogin import */10 * * * * /opt/llmongo/start.sh > /dev/null
*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1
*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1
05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1
### lastlogin create indexes
00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1

View file

@ -1,63 +0,0 @@
// xymon.go
package main
/*
import (
"fmt"
"log"
"net"
"strings"
"time"
)
func sendStatus() {
var msg string
host := strings.Replace(opts.Hostname, ".", ",", -1)
if count.GetErr() > 0 {
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "yellow", time.Now(), "errors", count.GetErr())
} else {
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "green", time.Now(), "logins", count.GetLog())
}
if opts.Debug {
fmt.Println("Sending: ", msg)
}
xymonSend([]byte(msg))
}
func xymonSend(msg []byte) error {
var server string
if strings.Contains(opts.Xymon, ":") {
server = opts.Xymon
} else {
server = opts.Xymon + ":1984"
}
cl, err := net.Dial("tcp", server)
if err != nil {
fmt.Printf("xymon connect error: ", err)
log.Printf("xymon connect error: ", err)
return err
}
defer cl.Close()
// fmt.Printf("xymon: %s - localhost: %s\n", cl.RemoteAddr(), cl.LocalAddr())
_, err = cl.Write(msg)
if err != nil {
fmt.Printf("xymon write error: ", err)
log.Printf("xymon write error: ", err)
return err
}
// fmt.Printf("written: %d\n", tot)
return nil
}
*/