Compare commits

..

No commits in common. "master" and "v2.0.0" have entirely different histories.

26 changed files with 180 additions and 1241 deletions

View file

@ -1,11 +0,0 @@
#FROM busybox:latest
FROM scratch
MAINTAINER Michele Fadda "<mikif70@gmail.com>"
ARG VER
ENV VER ${VER:-0.0.0}
COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb
ENTRYPOINT [ "/bin/lastlogin_mongodb" ]

View file

@ -1,3 +0,0 @@
#!/bin/bash
docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) .

View file

@ -1,16 +0,0 @@
#!/bin/bash
docker run \
--rm \
-v /opt/llmongo/log:/data \
--name llmongo_tiscali \
--log-opt max-size=2m \
--log-opt max-file=5 \
mikif70/llmongo:4.2.1 \
-l /data/llmongo.log \
-r redis-ll.mail.tiscali.sys:6379 \
-m 10.39.80.189:27017 \
-d lastlogin \
-i enginedb1@10.39.109.107:8086 \
-T 60s \
$@

View file

@ -1,80 +0,0 @@
#!/bin/bash
MONGODB="secondary.mongod.service.mail:27017"
DATAPATH="/mnt/mongobck/NEW_EXPORT"
NAME="mtools"
DB="lastlogin"
opt()
{
case "$1" in
-d)
shift
DATE=$1
shift
opt $*
;;
-n)
shift
NAME=$1
shift
opt $*
;;
-db)
shift
DB=$1
shift
opt $*
;;
-pwd)
shift
DATAPATH=${PWD}
shift
opt $*
;;
esac
}
echo "Running... $(date)"
if [ -n "$1" ]
then
opt $*
fi
echo "Opts: DB-$DB DATE-$DATE"
if [ -z "${DATE}" ]
then
DATE=$(date -d "yesterday" +%Y%m%d)
fi
YDAY=$(date -d "${DATE}" +%Y-%m-%d)
TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d)
QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T0
0:00:00Z\")}}"
FNAME=$(date -d "${DATE}" +%y%m%d)
CNAME=$(date -d "${DATE}" +%y%m)
echo "Query: ${QUERY}"
if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then
mkdir -p ${DATAPATH}/${DB}/${CNAME}
fi
/usr/bin/docker run \
--rm \
-h mtools \
--name ${NAME} \
-v ${DATAPATH}:/data \
mikif70/mongotools:3.4.5 mongoexport \
--host "${MONGODB}" \
--db $DB \
--collection "ll_${FNAME}" \
--type "csv" \
--fields "_id,user,protocol,ip,date,insert,country" \
--query "${QUERY}" \
--readPreference "secondary" \
--out "/data/${DB}/${CNAME}/ll_${FNAME}.csv"

View file

@ -1,5 +0,0 @@
#!/bin/bash
CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo .
mv lastlogin_mongodb-$(git describe --tags) Docker/

View file

@ -1,44 +0,0 @@
package main
import (
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
)
// Consul client structure
type Consul struct {
RedisTTL time.Duration
Timeout time.Duration
MaxError int
Influxdb string
Month string
Retention float64
MongoURI string
Database string
RedisURI string
Client *api.Client
}
// NewClient New consul client
func NewClient() *Consul {
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.Fatalf("Consul error: %+v\n", err)
if opts.Debug {
fmt.Printf("Consul error: %+v\n", err)
}
}
consul := &Consul{
Client: client,
RedisTTL: opts.RedisTTL,
Timeout: opts.Timeout,
MaxError: opts.MaxError,
Influxdb: opts.Influxdb,
}
return consul
}

View file

@ -2,187 +2,90 @@
package main
import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/garyburd/redigo/redis"
"log"
"strconv"
"strings"
"time"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
type consumed struct {
user string
error bool
logins []string
}
func hash256(val []byte) string {
h := sha256.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func hash160(val []byte) string {
h := sha1.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func hash128(val []byte) string {
h := md5.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
// protocol:timestamp:ip:country
func consumer() {
var date int64
var lastval string
var conn = dbs.rdb.Get()
defer conn.Close()
for {
prod := <-consume
start := time.Now()
status = _Consumer
var bulk = make(map[string][]*mgo.Bulk)
var allLogins = make(map[string]MongoLogin)
for i := range prod.logins {
login := prod.logins[i]
// se la riga di login e' vuota
if login == "" {
log.Println("Login empty: ", prod.user)
user := <-msgs
// Estrae l'ultimo login dell'utente 'user'
val, err := redis.String(conn.Do("LINDEX", user, "-1"))
if err != nil {
if opts.Debug {
log.Printf("LINDEX error: %+v - %s\n\r", err, val)
fmt.Printf("LINDEX error: %+v - %s\n\r", err, val)
}
// se ha trovato user e righe di login
if lastval != "" {
// reinserisce l'ultimo login e imposta il ttl su Redis
retval, _ := conn.Do("lpush", user, lastval)
ttl, _ := conn.Do("expire", user, opts.RedisTTL.Seconds())
if opts.Debug {
log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
fmt.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
}
}
// break
continue
}
sval := strings.Split(login, ":")
// se la riga di login e' vuota
if val == "" {
log.Println("Login empty: ", user)
retval, _ := conn.Do("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue
}
sval := strings.Split(val, ":")
// se il formato della riga di login non e' corretto
if sval[1] == "" {
log.Println("Login format error: ", login, prod.user)
log.Println("Login format error: ", val, user)
retval, _ := conn.Do("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue
}
// se il timestamp della riga di login non e' corretto
date, err := strconv.ParseInt(sval[1], 10, 64)
date, err = strconv.ParseInt(sval[1], 10, 64)
if err != nil {
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
log.Printf("Date Error: %+v - %s\n", err, user)
continue
}
// se la data e' piu vecchia di RETENTION (15552000 sec) la scarta
if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 {
log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
if opts.Debug {
fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
}
if !opts.Test {
continue
}
}
// verifica se esiste la country
if len(sval) <= 3 {
sval = append(sval, "NONE")
}
// genera l' _ID con user + timestamp + ip
mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
ml := MongoLogin{
ID: mlID,
User: prod.user,
User: user,
Protocol: sval[0],
IP: sval[2],
Ip: sval[2],
Date: time.Unix(date, 0),
Insert: time.Now(),
Country: sval[3],
}
allLogins[mlID] = ml
ind := Index{
User: user,
Date: time.Unix(date, 0),
}
for _, val := range allLogins {
dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay
if _, ok := bulk[dt]; !ok {
for j := range dbs.mdb {
b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk()
b.Unordered()
bulk[dt] = append(bulk[dt], b)
}
}
for _, bl := range bulk[dt] {
bl.Insert(val)
}
}
for _, val := range bulk {
for j, bl := range val {
result, err := bl.Run()
if j == 0 {
// inserisce il login su Mongodb
count++
_, err = dbs.ll.Upsert(ind, ml)
if err != nil {
log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
prod.err = true
counter <- Counterchan{
tipo: "err",
val: len(prod.logins),
}
if opts.Test || opts.Debug {
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
}
errCount += 1
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: strings.Count(err.Error(), "E11000"),
}
}
// cancella da Redis la riga di login inserita
retval, err := conn.Do("lrem", user, "-1", val)
if opts.Debug {
log.Printf("DUP: %s - %+v\n", prod.user, prod.logins)
}
}
} else {
if opts.Test {
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
log.Printf("BulkResult: %s - %+v\n", prod.user, result)
}
counter <- Counterchan{
tipo: "ins",
val: len(allLogins),
}
}
}
}
}
if opts.Debug {
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
}
if opts.Debug && len(prod.logins) > 10 {
fmt.Printf("LOGS: %+v\n", prod.logins)
}
if !prod.err {
counter <- Counterchan{
tipo: "rem",
val: len(prod.logins),
}
}
// wg.Done()
remove <- prod
log.Println("LREM retval: ", retval, user, val)
fmt.Println("LREM retval: ", retval, user, val)
}
lastval = val
}
}

View file

@ -1,180 +0,0 @@
// counter
package main
import (
"sync"
"time"
)
// Counterchan counter structure
type Counterchan struct {
tipo string
val int
}
// Counter structure
type Counter struct {
mu sync.Mutex
user int
log int
insert int
rem int
err int
dup int
time time.Duration
wg int
}
// NewCounter iniitialized Counter structure
func NewCounter() *Counter {
return &Counter{
user: 0,
log: 0,
insert: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: 0,
}
}
// Run open channel to receive counter
func (c *Counter) Run() {
for {
tocount := <-counter
switch tocount.tipo {
case "user":
c.addUser()
case "dup":
c.addDuplicate(tocount.val)
case "ins":
c.addInsert(tocount.val)
case "log":
c.addLog(tocount.val)
case "rem":
c.addRem(tocount.val)
case "wg":
if tocount.val > 0 {
c.addWG()
} else {
c.delWG()
}
case "err":
c.addErr(tocount.val)
}
}
}
// AddUser increment number of users managed
func (c *Counter) addUser() {
c.user++
}
// AddDuplicate increment number of duplicates log
func (c *Counter) addDuplicate(add int) {
c.dup += add
}
// AddInsert increment number of inserted rows
func (c *Counter) addInsert(add int) {
c.insert += add
}
// AddLog increment number of log's rows managed
func (c *Counter) addLog(add int) {
c.log += add
}
//AddRem increment removed logs row
func (c *Counter) addRem(add int) {
c.rem += add
}
// AddWG ...
func (c *Counter) addWG() {
c.wg++
}
// AddErr ...
func (c *Counter) addErr(add int) {
c.err += add
}
// DelWG ...
func (c *Counter) delWG() {
c.wg--
}
// GetUser return total users
func (c *Counter) GetUser() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.user
return
}
// GetDup return total duplicated logins
func (c *Counter) GetDup() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.dup
return
}
// GetLog return total log's rows
func (c *Counter) GetLog() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.log
return
}
// GetInsert return total inserted rows
func (c *Counter) GetInsert() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.insert
return
}
// GetErr return total errors
func (c *Counter) GetErr() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.err
return
}
// GetRem return total removed log's rows
func (c *Counter) GetRem() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.rem
return
}
// GetWG ...
func (c *Counter) GetWG() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.wg
return
}
// GetTime ...
func (c *Counter) GetTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.time.Seconds()
return
}
// SetTime ...
func (c *Counter) SetTime(t time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.time = t
}

View file

@ -1,15 +0,0 @@
// v1.2
var db = connect("192.168.0.1:27017/lastlogin");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
var db = connect("192.168.0.1:27017/katamail");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})

83
dbs.go
View file

@ -2,80 +2,55 @@
package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
// "github.com/fzzy/radix/redis"
"gopkg.in/mgo.v2"
"log"
"os"
"strings"
"time"
"github.com/garyburd/redigo/redis"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
var (
dbs = Dbs{
RedisURI: "",
Database: "",
MongoUri: "mongodb://127.0.0.1:27018",
RedisUri: "127.0.0.1:6379",
}
)
// Dbs structure
type Dbs struct {
MongoURI string
Database string
RedisURI string
MongoUri string
RedisUri string
rdb *redis.Pool //*redis.Client
mdb []*mgo.Session
mdb *mgo.Session
ll *mgo.Collection
// us *mgo.Collection
}
// MongoLogin structure
type MongoLogin struct {
ID string `json:"_id" bson:"_id" gorethink:"id"`
User string `json:"user" bson:"user" gorethink:"user"`
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
IP string `json:"ip" bson:"ip" gorethink:"ip"`
Date time.Time `json:"date" bson:"date" gorethink:"date"`
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
Country string `json:"country" bson:"country" gorethink:"country"`
User string `json:"user"`
Protocol string `json:"protocol"`
Ip string `json:"ip"`
Date time.Time `json:"date"`
}
// Ips structure
type Ips struct {
IP string `json:"ip"`
}
// UserLogin structure
type UserLogin struct {
User string `json:"user"`
Date time.Time `json:"date"`
Lock bool `json:"lock"`
}
// Index structure
type Index struct {
User string `json:"user"`
Date time.Time `json:"date"`
}
func (db *Dbs) isMongodb() bool {
if db.MongoURI != "" {
return true
}
return false
}
func (db *Dbs) poolRedis() {
dbs.rdb = &redis.Pool{
MaxIdle: 128,
MaxActive: 1000,
Wait: true,
IdleTimeout: 1 * time.Second,
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", db.RedisURI)
c, err := redis.Dial("tcp", db.RedisUri)
if err != nil {
return nil, err
}
@ -89,18 +64,24 @@ func (db *Dbs) poolRedis() {
}
/*
func (db *Dbs) connectRedis() {
var err error
db.rdb, err = redis.Dial("tcp", db.RedisUri)
if err != nil {
log.Println("Redis connect Error: ", err.Error())
os.Exit(-1)
}
}
*/
func (db *Dbs) connectMongo() {
mongoList := strings.Split(db.MongoURI, "|")
for m := range mongoList {
nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m]))
var err error
db.mdb, err = mgo.Dial(db.MongoUri)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
nm.SetSocketTimeout(5 * time.Second)
nm.SetSyncTimeout(5 * time.Second)
db.mdb = append(db.mdb, nm)
}
db.ll = db.mdb.DB("dovecot").C("lastlogin")
// db.us = db.mdb.DB("dovecot").C("userlogin")
}

View file

@ -1,18 +0,0 @@
version: '2'
services:
mongodb:
image: mikif70/mongodb:3.6.9
ports:
- 27017:27017
container_name: ll_mongod
volumes:
- ./mongod:/data
depends_on:
- redis
redis:
image: "redis:alpine"
container_name: ll_redis
ports:
- 6379:6379
volumes:
- ./redis:/data

View file

@ -1,9 +0,0 @@
#!/bin/bash
if [ -z $1 ]; then
HOST=192.168.0.1:27017
else
HOST=${1}
fi
docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST}

View file

@ -1,16 +0,0 @@
#!/bin/bash
echo "Starting mikif70/llmongo:$(git -C .. describe --tags)"
docker run \
--rm \
-v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \
--name llmongo \
mikif70/llmongo:$(git -C .. describe --tags) \
-l /data/llmongo.log \
-r 192.168.0.1:6379 \
-m 192.168.0.1:27017 \
-d lastlogin \
-i test@10.39.253.206:8086 \
-T 80s \
$@

View file

@ -1,73 +0,0 @@
// influxdb
package main
import (
"fmt"
"log"
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
)
var (
infdb string
infhost string
)
func writeStats(start time.Time) {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
}
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: fmt.Sprintf("http://%s", infdb),
Timeout: 2 * time.Second,
})
if err != nil {
fmt.Printf("InfluxDB connect Error: %+v\n", err)
log.Printf("InfluxDB connect Error: %+v\n", err)
return
}
defer c.Close()
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: "dovecot",
Precision: "s",
})
if err != nil {
fmt.Printf("InfluxDB batch Error: %+v\n", err)
log.Printf("InfluxDB batch Error: %+v\n", err)
return
}
tags := map[string]string{"server": infhost, "domain": dbs.Database}
fields := map[string]interface{}{
"user": count.GetUser(),
"log": count.GetLog(),
"err": count.GetErr(),
"rem": count.GetRem(),
"dup": count.GetDup(),
"stop": count.GetTime(),
}
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
if err != nil {
fmt.Printf("InfluxDB point Error: %+v\n", err)
log.Printf("InfluxDB point Error: %+v\n", err)
return
}
if opts.Debug {
fmt.Printf("InfluxDB pt: %+v\n", pt)
}
bp.AddPoint(pt)
// Write the batch
err = c.Write(bp)
if err != nil {
fmt.Printf("InfluxDB write Error: %+v\n", err)
log.Printf("InfluxDB write Error: %+v\n", err)
return
}
}

View file

@ -1,28 +0,0 @@
/opt/mongodb/mongod/log/*.log
{
daily
rotate 10
maxsize 10M
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
postrotate
/usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true
endscript
}
/opt/llmongo/log/*.log
{
daily
maxsize 5M
rotate 8
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
}

146
main.go
View file

@ -6,67 +6,78 @@ import (
"fmt"
"log"
"os"
"regexp"
"sync"
"path"
"path/filepath"
"time"
_ "github.com/nats-io/go-nats"
)
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
Timeout bool
Debug bool
Version bool
}
const (
_Version = "v5.0.0b6"
_Producer = 0
_Consumer = 1
_Remover = 2
_VERSION = "v2.0.0"
)
var (
loop bool
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
done chan bool
consume chan loginsList
remove chan loginsList
loop = true
ttl = time.Second * 55
done = make(chan bool)
msgs = make(chan string)
counter chan Counterchan
wg sync.WaitGroup
status int
count *Counter
count = 0
errCount = 0
)
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <ttl> -l <logfile> -T -D -v\n")
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.BoolVar(&opts.Timeout, "T", false, "Timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
}
func stopLoop() {
loop = false
}
func main() {
flag.Usage = usage
flag.Parse()
if opts.Version {
fmt.Println(os.Args[0], _Version)
fmt.Println(os.Args[0], _VERSION)
os.Exit(0)
}
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" {
flag.Usage()
os.Exit(-1)
}
if opts.Influxdb != "" {
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
if re.MatchString(opts.Influxdb) {
match := re.FindStringSubmatch(opts.Influxdb)
if opts.Debug {
fmt.Printf("Influxdb match: %+v\n", match)
}
infhost = match[1]
infdb = match[2] + ":" + match[3]
} else {
opts.Influxdb = ""
}
}
setTerm()
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Log file error: ", err.Error())
@ -76,58 +87,27 @@ func main() {
log.SetOutput(fs)
// pid.PIDFile = opts.Pidfile
// pid.Write(true)
// defer pid.Remove()
pid.Write(true)
defer pid.Remove()
start := time.Now()
fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
opts.Month = start.Format("0601")
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
dbs.poolRedis()
defer dbs.rdb.Close()
dbs.connectMongo()
for k := range dbs.mdb {
defer dbs.mdb[k].Close()
defer dbs.mdb.Close()
if opts.Timeout {
time.AfterFunc(ttl, stopLoop)
}
if opts.Timeout > 0 {
time.AfterFunc(opts.Timeout, exit)
}
count = NewCounter()
consume = make(chan loginsList, opts.Queue)
remove = make(chan loginsList, opts.Queue)
loop = true
done = make(chan bool)
counter = make(chan Counterchan)
go count.Run()
go producer()
for i := 0; i < opts.Queue; i++ {
for j := 0; j < len(dbs.mdb); j++ {
go consumer()
}
go remover()
}
<-done
fmt.Printf("Done\n")
close(done)
fmt.Println("Waiting WG")
wg.Wait()
count.SetTime(time.Since(start))
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
if opts.Influxdb != "" {
writeStats(start)
}
fmt.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount)
log.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount)
}

View file

@ -1,15 +0,0 @@
// Testmain.go
package main
import (
"flag"
"fmt"
"testing"
)
func TestOptions(t *testing.T) {
flag.Usage = usage
flag.Parse()
fmt.Printf("Config: %+v\n", opts)
}

View file

@ -1,34 +0,0 @@
################
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var i=1; i<18; i++) {
var col = "ll_1806"+dd(i);
print(col);
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
}
###########
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var m=1; m<=12; m++) {
for (var d=1; d<=31; d++) {
var col = "ll_15"+dd(m)+dd(d);
if (db.getCollection(col).exists != null) {
print(col);
print(db.getCollection(col).totalSize());
}
}
}
####
var ar = db.getCollectionNames()
var len = ar.length
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
####
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})

View file

@ -1,85 +0,0 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"path"
"path/filepath"
"time"
)
// Options structure
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
ConfigFile string
Timeout time.Duration
Debug bool
Test bool
Version bool
MaxError int
Influxdb string
Month string
Queue int
Retention float64
Consul string
Port int
}
var (
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
)
func usage() {
fmt.Println(`Usage: llmongo
-m <mongodb [ip:port,ip:port]>
-d <mongodb database [dbname]>
-r <redisdb [ip:port]>
-t <redis keys ttl>
-l <logfile>
-T <running timeout>
-i <influxdb [localname@ip:port]>
-C <consul [ip:port]>
-P <check port> [port] ## used by consul to check services ##
-q <parallel consumer>
-R <retention>
-v <version>
-D <debug>
-DD <Test>`)
fmt.Println()
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
flag.IntVar(&opts.Port, "P", 10500, "service check port")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
flag.Float64Var(&opts.Retention, "R", 15552000, "retention")
}

10
pid.go
View file

@ -1,7 +1,6 @@
// pid
package main
/*
import (
"bytes"
"fmt"
@ -17,7 +16,6 @@ var (
pid = PID{}
)
// PID structure
type PID struct {
PID string
PIDFile string
@ -44,8 +42,11 @@ func (p *PID) readCmd() bool {
return false
}
cmd := bytes.Trim(bcmd, "\x00")
if !strings.Contains(string(cmd), opts.Exe) {
if strings.Contains(string(cmd), opts.Exe) {
return true
} else {
fmt.Printf("PID %s used by %s\n", pid, cmd)
return true
}
return true
}
@ -77,11 +78,10 @@ func (p *PID) Write(l bool) {
fpid.Close()
}
// Remove cancella il PIDFile
// Cancella il PIDFile
func (p *PID) Remove() {
err := os.Remove(p.PIDFile)
if err != nil {
fmt.Println("RM file error: ", err.Error())
}
}
*/

View file

@ -3,90 +3,30 @@ package main
import (
"fmt"
"log"
// "strconv"
"time"
"github.com/garyburd/redigo/redis"
"log"
)
type produced struct {
user string
logins []string
}
type loginsList struct {
user string
logins []string
err bool
}
func producer() {
conn := dbs.rdb.Get()
defer conn.Close()
for loop {
wg.Wait()
status = _Producer
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex"))
if opts.Debug {
log.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err)
fmt.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err)
}
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("LLINDEX empty: %v\n", err)
log.Printf("LLINDEX empty: %v\n\r", err)
fmt.Printf("LLINDEX empty: %v\n\r", err)
}
log.Printf("LLINDEX empty: %v\n", err)
break
}
// estrae tutti i logins dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
if err != nil {
if opts.Debug {
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
msgs <- user
}
log.Printf("LRANGE: %+v - %+v\n", err, logs)
}
if opts.Debug {
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
}
if len(logs) > 0 {
counter <- Counterchan{
tipo: "user",
val: 1,
}
if opts.Debug {
fmt.Printf("PROD: %+v\n", time.Since(start))
}
if opts.Test {
log.Printf("PROD: %s - %d\n", user, len(logs))
}
wg.Add(1)
counter <- Counterchan{
tipo: "wg",
val: 1,
}
counter <- Counterchan{
tipo: "log",
val: len(logs),
}
consume <- loginsList{
user: user,
logins: logs,
err: false,
}
}
}
done <- true
}

View file

@ -1,51 +0,0 @@
// finalizer
package main
import (
"fmt"
"time"
)
func remover() {
var conn = dbs.rdb.Get()
defer conn.Close()
for {
rem := <-remove
status = _Remover
start := time.Now()
if !rem.err {
for i := range rem.logins {
// cancella da Redis la riga di login inserita partendo da 1
conn.Send("lrem", rem.user, "1", rem.logins[i])
}
}
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
if rem.err {
if opts.Debug {
fmt.Printf("SADD: %s\n", rem.user)
}
conn.Send("sadd", "llindex", rem.user)
if count.GetErr() >= opts.MaxError {
exit()
}
}
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush()
if opts.Debug {
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
}
counter <- Counterchan{
tipo: "wg",
val: -1,
}
wg.Done()
}
}

View file

@ -1,75 +0,0 @@
package main
/*
import (
"fmt"
"log"
// "strings"
rt "gopkg.in/dancannon/gorethink.v2"
)
type Rethink struct {
rtdb *rt.Session
}
func NewRethinkDB(cluster []string) (*Rethink, error) {
var (
err error
session *rt.Session
)
if len(cluster) > 1 {
session, err = rt.Connect(rt.ConnectOpts{
Addresses: cluster,
InitialCap: 10,
MaxOpen: 10,
})
} else {
session, err = rt.Connect(rt.ConnectOpts{
Address: cluster[0],
InitialCap: 10,
MaxOpen: 10,
})
}
if err != nil {
log.Printf("RethinkDB ERROR: %+v\n", err.Error())
return nil, err
}
return &Rethink{
rtdb: session,
}, nil
}
func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTinsert: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb)
//resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTMulti: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
func (r *Rethink) Close() {
r.rtdb.Close()
}
*/

View file

@ -1,39 +0,0 @@
// sigterm
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func kill() {
log.Printf("KILL %d\n", status)
fmt.Printf("KILL %d\n", status)
wg.Done()
counter <- Counterchan{
tipo: "wg",
val: -1,
}
done <- true
}
func exit() {
log.Printf("EXIT %d\n", status)
fmt.Printf("EXIT %d\n", status)
loop = false
time.AfterFunc(time.Second*20, kill)
}
func setTerm() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
exit()
}()
}

View file

@ -24,9 +24,4 @@
#
# m h dom mon dow command
### lastlogin import
*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1
*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1
05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1
### lastlogin create indexes
00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1
*/10 * * * * /opt/llmongo/start.sh > /dev/null

View file

@ -1,63 +0,0 @@
// xymon.go
package main
/*
import (
"fmt"
"log"
"net"
"strings"
"time"
)
func sendStatus() {
var msg string
host := strings.Replace(opts.Hostname, ".", ",", -1)
if count.GetErr() > 0 {
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "yellow", time.Now(), "errors", count.GetErr())
} else {
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "green", time.Now(), "logins", count.GetLog())
}
if opts.Debug {
fmt.Println("Sending: ", msg)
}
xymonSend([]byte(msg))
}
func xymonSend(msg []byte) error {
var server string
if strings.Contains(opts.Xymon, ":") {
server = opts.Xymon
} else {
server = opts.Xymon + ":1984"
}
cl, err := net.Dial("tcp", server)
if err != nil {
fmt.Printf("xymon connect error: ", err)
log.Printf("xymon connect error: ", err)
return err
}
defer cl.Close()
// fmt.Printf("xymon: %s - localhost: %s\n", cl.RemoteAddr(), cl.LocalAddr())
_, err = cl.Write(msg)
if err != nil {
fmt.Printf("xymon write error: ", err)
log.Printf("xymon write error: ", err)
return err
}
// fmt.Printf("written: %d\n", tot)
return nil
}
*/