Compare commits

..

No commits in common. "master" and "v1.1.1" have entirely different histories.

27 changed files with 276 additions and 1472 deletions

View file

@ -1,11 +0,0 @@
#FROM busybox:latest
FROM scratch
MAINTAINER Michele Fadda "<mikif70@gmail.com>"
ARG VER
ENV VER ${VER:-0.0.0}
COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb
ENTRYPOINT [ "/bin/lastlogin_mongodb" ]

View file

@ -1,3 +0,0 @@
#!/bin/bash
docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) .

View file

@ -1,16 +0,0 @@
#!/bin/bash
docker run \
--rm \
-v /opt/llmongo/log:/data \
--name llmongo_tiscali \
--log-opt max-size=2m \
--log-opt max-file=5 \
mikif70/llmongo:4.2.1 \
-l /data/llmongo.log \
-r redis-ll.mail.tiscali.sys:6379 \
-m 10.39.80.189:27017 \
-d lastlogin \
-i enginedb1@10.39.109.107:8086 \
-T 60s \
$@

View file

@ -1,80 +0,0 @@
#!/bin/bash
MONGODB="secondary.mongod.service.mail:27017"
DATAPATH="/mnt/mongobck/NEW_EXPORT"
NAME="mtools"
DB="lastlogin"
opt()
{
case "$1" in
-d)
shift
DATE=$1
shift
opt $*
;;
-n)
shift
NAME=$1
shift
opt $*
;;
-db)
shift
DB=$1
shift
opt $*
;;
-pwd)
shift
DATAPATH=${PWD}
shift
opt $*
;;
esac
}
echo "Running... $(date)"
if [ -n "$1" ]
then
opt $*
fi
echo "Opts: DB-$DB DATE-$DATE"
if [ -z "${DATE}" ]
then
DATE=$(date -d "yesterday" +%Y%m%d)
fi
YDAY=$(date -d "${DATE}" +%Y-%m-%d)
TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d)
QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T0
0:00:00Z\")}}"
FNAME=$(date -d "${DATE}" +%y%m%d)
CNAME=$(date -d "${DATE}" +%y%m)
echo "Query: ${QUERY}"
if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then
mkdir -p ${DATAPATH}/${DB}/${CNAME}
fi
/usr/bin/docker run \
--rm \
-h mtools \
--name ${NAME} \
-v ${DATAPATH}:/data \
mikif70/mongotools:3.4.5 mongoexport \
--host "${MONGODB}" \
--db $DB \
--collection "ll_${FNAME}" \
--type "csv" \
--fields "_id,user,protocol,ip,date,insert,country" \
--query "${QUERY}" \
--readPreference "secondary" \
--out "/data/${DB}/${CNAME}/ll_${FNAME}.csv"

View file

@ -1,5 +0,0 @@
#!/bin/bash
CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo .
mv lastlogin_mongodb-$(git describe --tags) Docker/

View file

@ -1,44 +0,0 @@
package main
import (
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
)
// Consul client structure
type Consul struct {
RedisTTL time.Duration
Timeout time.Duration
MaxError int
Influxdb string
Month string
Retention float64
MongoURI string
Database string
RedisURI string
Client *api.Client
}
// NewClient New consul client
func NewClient() *Consul {
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.Fatalf("Consul error: %+v\n", err)
if opts.Debug {
fmt.Printf("Consul error: %+v\n", err)
}
}
consul := &Consul{
Client: client,
RedisTTL: opts.RedisTTL,
Timeout: opts.Timeout,
MaxError: opts.MaxError,
Influxdb: opts.Influxdb,
}
return consul
}

View file

@ -1,188 +0,0 @@
// consumer
package main
import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
type consumed struct {
user string
error bool
logins []string
}
func hash256(val []byte) string {
h := sha256.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func hash160(val []byte) string {
h := sha1.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func hash128(val []byte) string {
h := md5.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
// protocol:timestamp:ip:country
func consumer() {
for {
prod := <-consume
start := time.Now()
status = _Consumer
var bulk = make(map[string][]*mgo.Bulk)
var allLogins = make(map[string]MongoLogin)
for i := range prod.logins {
login := prod.logins[i]
// se la riga di login e' vuota
if login == "" {
log.Println("Login empty: ", prod.user)
continue
}
sval := strings.Split(login, ":")
// se il formato della riga di login non e' corretto
if sval[1] == "" {
log.Println("Login format error: ", login, prod.user)
continue
}
// se il timestamp della riga di login non e' corretto
date, err := strconv.ParseInt(sval[1], 10, 64)
if err != nil {
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
continue
}
// se la data e' piu vecchia di RETENTION (15552000 sec) la scarta
if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 {
log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
if opts.Debug {
fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
}
if !opts.Test {
continue
}
}
// verifica se esiste la country
if len(sval) <= 3 {
sval = append(sval, "NONE")
}
// genera l' _ID con user + timestamp + ip
mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
ml := MongoLogin{
ID: mlID,
User: prod.user,
Protocol: sval[0],
IP: sval[2],
Date: time.Unix(date, 0),
Insert: time.Now(),
Country: sval[3],
}
allLogins[mlID] = ml
}
for _, val := range allLogins {
dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay
if _, ok := bulk[dt]; !ok {
for j := range dbs.mdb {
b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk()
b.Unordered()
bulk[dt] = append(bulk[dt], b)
}
}
for _, bl := range bulk[dt] {
bl.Insert(val)
}
}
for _, val := range bulk {
for j, bl := range val {
result, err := bl.Run()
if j == 0 {
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
prod.err = true
counter <- Counterchan{
tipo: "err",
val: len(prod.logins),
}
if opts.Test || opts.Debug {
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
}
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: strings.Count(err.Error(), "E11000"),
}
if opts.Debug {
log.Printf("DUP: %s - %+v\n", prod.user, prod.logins)
}
}
} else {
if opts.Test {
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
log.Printf("BulkResult: %s - %+v\n", prod.user, result)
}
counter <- Counterchan{
tipo: "ins",
val: len(allLogins),
}
}
}
}
}
if opts.Debug {
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
}
if opts.Debug && len(prod.logins) > 10 {
fmt.Printf("LOGS: %+v\n", prod.logins)
}
if !prod.err {
counter <- Counterchan{
tipo: "rem",
val: len(prod.logins),
}
}
// wg.Done()
remove <- prod
}
}

View file

@ -1,180 +0,0 @@
// counter
package main
import (
"sync"
"time"
)
// Counterchan counter structure
type Counterchan struct {
tipo string
val int
}
// Counter structure
type Counter struct {
mu sync.Mutex
user int
log int
insert int
rem int
err int
dup int
time time.Duration
wg int
}
// NewCounter iniitialized Counter structure
func NewCounter() *Counter {
return &Counter{
user: 0,
log: 0,
insert: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: 0,
}
}
// Run open channel to receive counter
func (c *Counter) Run() {
for {
tocount := <-counter
switch tocount.tipo {
case "user":
c.addUser()
case "dup":
c.addDuplicate(tocount.val)
case "ins":
c.addInsert(tocount.val)
case "log":
c.addLog(tocount.val)
case "rem":
c.addRem(tocount.val)
case "wg":
if tocount.val > 0 {
c.addWG()
} else {
c.delWG()
}
case "err":
c.addErr(tocount.val)
}
}
}
// AddUser increment number of users managed
func (c *Counter) addUser() {
c.user++
}
// AddDuplicate increment number of duplicates log
func (c *Counter) addDuplicate(add int) {
c.dup += add
}
// AddInsert increment number of inserted rows
func (c *Counter) addInsert(add int) {
c.insert += add
}
// AddLog increment number of log's rows managed
func (c *Counter) addLog(add int) {
c.log += add
}
//AddRem increment removed logs row
func (c *Counter) addRem(add int) {
c.rem += add
}
// AddWG ...
func (c *Counter) addWG() {
c.wg++
}
// AddErr ...
func (c *Counter) addErr(add int) {
c.err += add
}
// DelWG ...
func (c *Counter) delWG() {
c.wg--
}
// GetUser return total users
func (c *Counter) GetUser() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.user
return
}
// GetDup return total duplicated logins
func (c *Counter) GetDup() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.dup
return
}
// GetLog return total log's rows
func (c *Counter) GetLog() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.log
return
}
// GetInsert return total inserted rows
func (c *Counter) GetInsert() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.insert
return
}
// GetErr return total errors
func (c *Counter) GetErr() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.err
return
}
// GetRem return total removed log's rows
func (c *Counter) GetRem() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.rem
return
}
// GetWG ...
func (c *Counter) GetWG() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.wg
return
}
// GetTime ...
func (c *Counter) GetTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.time.Seconds()
return
}
// SetTime ...
func (c *Counter) SetTime(t time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.time = t
}

View file

@ -1,15 +0,0 @@
// v1.2
var db = connect("192.168.0.1:27017/lastlogin");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
var db = connect("192.168.0.1:27017/katamail");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})

106
dbs.go
View file

@ -1,106 +0,0 @@
// dbs
package main
import (
"fmt"
"log"
"os"
"strings"
"time"
"github.com/garyburd/redigo/redis"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
var (
dbs = Dbs{
RedisURI: "",
Database: "",
}
)
// Dbs structure
type Dbs struct {
MongoURI string
Database string
RedisURI string
rdb *redis.Pool //*redis.Client
mdb []*mgo.Session
}
// MongoLogin structure
type MongoLogin struct {
ID string `json:"_id" bson:"_id" gorethink:"id"`
User string `json:"user" bson:"user" gorethink:"user"`
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
IP string `json:"ip" bson:"ip" gorethink:"ip"`
Date time.Time `json:"date" bson:"date" gorethink:"date"`
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
Country string `json:"country" bson:"country" gorethink:"country"`
}
// Ips structure
type Ips struct {
IP string `json:"ip"`
}
// UserLogin structure
type UserLogin struct {
User string `json:"user"`
Date time.Time `json:"date"`
Lock bool `json:"lock"`
}
// Index structure
type Index struct {
User string `json:"user"`
Date time.Time `json:"date"`
}
func (db *Dbs) isMongodb() bool {
if db.MongoURI != "" {
return true
}
return false
}
func (db *Dbs) poolRedis() {
dbs.rdb = &redis.Pool{
MaxIdle: 128,
MaxActive: 1000,
Wait: true,
IdleTimeout: 1 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", db.RedisURI)
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func (db *Dbs) connectMongo() {
mongoList := strings.Split(db.MongoURI, "|")
for m := range mongoList {
nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m]))
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
nm.SetSocketTimeout(5 * time.Second)
nm.SetSyncTimeout(5 * time.Second)
db.mdb = append(db.mdb, nm)
}
}

View file

@ -1,18 +0,0 @@
version: '2'
services:
mongodb:
image: mikif70/mongodb:3.6.9
ports:
- 27017:27017
container_name: ll_mongod
volumes:
- ./mongod:/data
depends_on:
- redis
redis:
image: "redis:alpine"
container_name: ll_redis
ports:
- 6379:6379
volumes:
- ./redis:/data

View file

@ -1,9 +0,0 @@
#!/bin/bash
if [ -z $1 ]; then
HOST=192.168.0.1:27017
else
HOST=${1}
fi
docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST}

View file

@ -1,16 +0,0 @@
#!/bin/bash
echo "Starting mikif70/llmongo:$(git -C .. describe --tags)"
docker run \
--rm \
-v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \
--name llmongo \
mikif70/llmongo:$(git -C .. describe --tags) \
-l /data/llmongo.log \
-r 192.168.0.1:6379 \
-m 192.168.0.1:27017 \
-d lastlogin \
-i test@10.39.253.206:8086 \
-T 80s \
$@

View file

@ -1,73 +0,0 @@
// influxdb
package main
import (
"fmt"
"log"
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
)
var (
infdb string
infhost string
)
func writeStats(start time.Time) {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
}
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: fmt.Sprintf("http://%s", infdb),
Timeout: 2 * time.Second,
})
if err != nil {
fmt.Printf("InfluxDB connect Error: %+v\n", err)
log.Printf("InfluxDB connect Error: %+v\n", err)
return
}
defer c.Close()
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
Database: "dovecot",
Precision: "s",
})
if err != nil {
fmt.Printf("InfluxDB batch Error: %+v\n", err)
log.Printf("InfluxDB batch Error: %+v\n", err)
return
}
tags := map[string]string{"server": infhost, "domain": dbs.Database}
fields := map[string]interface{}{
"user": count.GetUser(),
"log": count.GetLog(),
"err": count.GetErr(),
"rem": count.GetRem(),
"dup": count.GetDup(),
"stop": count.GetTime(),
}
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
if err != nil {
fmt.Printf("InfluxDB point Error: %+v\n", err)
log.Printf("InfluxDB point Error: %+v\n", err)
return
}
if opts.Debug {
fmt.Printf("InfluxDB pt: %+v\n", pt)
}
bp.AddPoint(pt)
// Write the batch
err = c.Write(bp)
if err != nil {
fmt.Printf("InfluxDB write Error: %+v\n", err)
log.Printf("InfluxDB write Error: %+v\n", err)
return
}
}

275
llmongo.go Normal file
View file

@ -0,0 +1,275 @@
// llmongo.go
package main
import (
"flag"
"fmt"
"github.com/fzzy/radix/redis"
"gopkg.in/mgo.v2"
"os"
// "gopkg.in/mgo.v2/bson"
"bytes"
"io/ioutil"
"log"
"path"
"path/filepath"
"strconv"
"strings"
"time"
)
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
PIDFile string
Version bool
}
type Dbs struct {
MongoUri string
RedisUri string
rdb *redis.Client
mdb *mgo.Session
ll *mgo.Collection
}
type MongoLogin struct {
User string `json:"user"`
Protocol string `json:"protocol"`
Ip string `json:"ip"`
Date time.Time `json:"date"`
}
type Index struct {
User string `json:"user"`
Date time.Time `json:"date"`
}
const (
_VERSION = "v1.1.1"
)
var (
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
dbs = Dbs{
MongoUri: "mongodb://127.0.0.1:27018",
RedisUri: "127.0.0.1:6379",
}
)
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <ttl> -l <logfile> -v\n")
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
opts.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
}
func (db *Dbs) connectRedis() {
var err error
db.rdb, err = redis.Dial("tcp", db.RedisUri)
if err != nil {
log.Println("Redis connect Error: ", err.Error())
os.Exit(-1)
}
}
func (db *Dbs) connectMongo() {
var err error
db.mdb, err = mgo.Dial(db.MongoUri)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
db.ll = db.mdb.DB("dovecot").C("lastlogin")
}
func checkPID(bpid []byte) bool {
pid := strings.TrimRight(string(bpid), "\n")
fmt.Println("PID: ", pid)
bcmd, err := ioutil.ReadFile(path.Join("/proc", pid, "cmdline"))
if err != nil {
fmt.Println("cmdline error: ", err)
return false
}
cmd := bytes.Trim(bcmd, "\x00")
// fmt.Println(string(cmd), opts.Exe)
if strings.Contains(string(cmd), opts.Exe) {
return true
} else {
fmt.Printf("PID %s used by %s\n", pid, cmd)
return true
}
return true
}
func writePID(pfile string) {
fpid, err := os.OpenFile(pfile, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
fmt.Println("PID file error: ", err.Error())
os.Exit(-5)
}
fpid.WriteString(strconv.Itoa(os.Getpid()))
fpid.Close()
}
func main() {
flag.Usage = usage
flag.Parse()
if opts.Version {
fmt.Println(os.Args[0], _VERSION)
os.Exit(0)
}
if bpid, err := ioutil.ReadFile(opts.PIDFile); err == nil && checkPID(bpid) {
fmt.Println("Running: ", string(bpid))
os.Exit(-6)
} else {
writePID(opts.PIDFile)
}
defer os.Remove(opts.PIDFile)
// fmt.Println(os.Stat(opts.PIDFile))
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Log file error: ", err.Error())
os.Exit(-4)
}
defer fs.Close()
log.SetOutput(fs)
// log.SetPrefix("[llmongo] ")
start := time.Now()
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
dbs.connectRedis()
defer dbs.rdb.Close()
dbs.connectMongo()
defer dbs.mdb.Close()
// // Estrae la lista degli utenti che hanno fatto login negli ultimi X min.
// llindex := opts.rdb.Cmd("smembers", "llindex")
// lista, err := llindex.List()
// if err != nil {
// log.Panicln("LLINDEX error: ", err.Error())
// }
// // for _, user := range lista {
// // cicla fino a che esistono righe di login
// var wg sync.WaitGroup
count := 0
for {
// estrae un userid dalla lista degli utenti che hanno fatto login
spop := dbs.rdb.Cmd("spop", "llindex")
user, err := spop.Str()
log.Printf("SPOP: %+v %+v\n", spop, user)
// se non ci sono piu' userid esce
if err != nil {
log.Printf("LLINDEX empty: %v\n", err)
break
}
count++
// user := spop.String()
var date int64
var lastval, val string
for {
// Estrae l'ultimo login dell'utente 'user'
val, err = dbs.rdb.Cmd("lindex", user, "-1").Str()
if err != nil {
log.Printf("LINDEX error: %+v\n", err)
// se ha trovato user e righe di login
if lastval != "" {
// reinserisce l'ultimo login e imposta il ttl su Redis
retval := dbs.rdb.Cmd("lpush", user, lastval)
ttl := dbs.rdb.Cmd("expire", user, opts.RedisTTL.Seconds())
log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
}
break
}
// se la riga di login e' vuota
if val == "" {
log.Println("Login empty: ", user)
retval := dbs.rdb.Cmd("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue
}
sval := strings.Split(val, ":")
// se il formato della riga di login non e' corretto
if sval[1] == "" {
log.Println("Login format error: ", val, user)
retval := dbs.rdb.Cmd("lrem", user, "-1", val)
log.Println("LREM retval: ", user, val, retval)
continue
}
// se il timestamp della riga di login non e' corretto
date, err = strconv.ParseInt(sval[1], 10, 64)
if err != nil {
log.Printf("Date Error: %+v - %s\n", err, user)
continue
}
ml := MongoLogin{
User: user,
Protocol: sval[0],
Ip: sval[2],
Date: time.Unix(date, 0),
}
ind := Index{
User: user,
Date: time.Unix(date, 0),
}
// inserisce il login su Mongodb
_, err := dbs.ll.Upsert(ind, ml)
if err != nil {
log.Printf("Insert error: %+v\n", err)
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
if !strings.Contains(err.Error(), "E11000") {
continue
}
}
// fmt.Println("Info: ", info)
// cancella da Redis la riga di login inserita
retval := dbs.rdb.Cmd("lrem", user, "-1", val)
log.Println("LREM retval: ", retval, user, val)
lastval = val
}
// controlla se ci sono ancora line di login per l'utente 'user'
/*
llen, _ := opts.rdb.Cmd("llen", user).Int64()
if llen <= 1 {
// elimina l'utente dalla lista di quelli che hanno fatto login se ci sono 1 o meno
retval := opts.rdb.Cmd("srem", "llindex", user)
log.Println("SREM retval: ", retval, user)
}
*/
}
fmt.Printf("Stop %v - %d\n", time.Since(start), count)
log.Printf("Stop %v - %d\n", time.Since(start), count)
}

View file

@ -1,28 +0,0 @@
/opt/mongodb/mongod/log/*.log
{
daily
rotate 10
maxsize 10M
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
postrotate
/usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true
endscript
}
/opt/llmongo/log/*.log
{
daily
maxsize 5M
rotate 8
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
}

133
main.go
View file

@ -1,133 +0,0 @@
// llmongo.go
package main
import (
"flag"
"fmt"
"log"
"os"
"regexp"
"sync"
"time"
_ "github.com/nats-io/go-nats"
)
const (
_Version = "v5.0.0b6"
_Producer = 0
_Consumer = 1
_Remover = 2
)
var (
loop bool
done chan bool
consume chan loginsList
remove chan loginsList
counter chan Counterchan
wg sync.WaitGroup
status int
count *Counter
)
func main() {
flag.Usage = usage
flag.Parse()
if opts.Version {
fmt.Println(os.Args[0], _Version)
os.Exit(0)
}
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" {
flag.Usage()
os.Exit(-1)
}
if opts.Influxdb != "" {
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
if re.MatchString(opts.Influxdb) {
match := re.FindStringSubmatch(opts.Influxdb)
if opts.Debug {
fmt.Printf("Influxdb match: %+v\n", match)
}
infhost = match[1]
infdb = match[2] + ":" + match[3]
} else {
opts.Influxdb = ""
}
}
setTerm()
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Log file error: ", err.Error())
os.Exit(-4)
}
defer fs.Close()
log.SetOutput(fs)
// pid.PIDFile = opts.Pidfile
// pid.Write(true)
// defer pid.Remove()
start := time.Now()
fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
opts.Month = start.Format("0601")
dbs.poolRedis()
defer dbs.rdb.Close()
dbs.connectMongo()
for k := range dbs.mdb {
defer dbs.mdb[k].Close()
}
if opts.Timeout > 0 {
time.AfterFunc(opts.Timeout, exit)
}
count = NewCounter()
consume = make(chan loginsList, opts.Queue)
remove = make(chan loginsList, opts.Queue)
loop = true
done = make(chan bool)
counter = make(chan Counterchan)
go count.Run()
go producer()
for i := 0; i < opts.Queue; i++ {
for j := 0; j < len(dbs.mdb); j++ {
go consumer()
}
go remover()
}
<-done
fmt.Printf("Done\n")
close(done)
fmt.Println("Waiting WG")
wg.Wait()
count.SetTime(time.Since(start))
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
if opts.Influxdb != "" {
writeStats(start)
}
}

View file

@ -1,15 +0,0 @@
// Testmain.go
package main
import (
"flag"
"fmt"
"testing"
)
func TestOptions(t *testing.T) {
flag.Usage = usage
flag.Parse()
fmt.Printf("Config: %+v\n", opts)
}

View file

@ -1,34 +0,0 @@
################
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var i=1; i<18; i++) {
var col = "ll_1806"+dd(i);
print(col);
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
}
###########
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var m=1; m<=12; m++) {
for (var d=1; d<=31; d++) {
var col = "ll_15"+dd(m)+dd(d);
if (db.getCollection(col).exists != null) {
print(col);
print(db.getCollection(col).totalSize());
}
}
}
####
var ar = db.getCollectionNames()
var len = ar.length
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
####
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})

View file

@ -1,85 +0,0 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"path"
"path/filepath"
"time"
)
// Options structure
type Options struct {
RedisTTL time.Duration
CurrentPath string
Exe string
LogFile string
ConfigFile string
Timeout time.Duration
Debug bool
Test bool
Version bool
MaxError int
Influxdb string
Month string
Queue int
Retention float64
Consul string
Port int
}
var (
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
)
func usage() {
fmt.Println(`Usage: llmongo
-m <mongodb [ip:port,ip:port]>
-d <mongodb database [dbname]>
-r <redisdb [ip:port]>
-t <redis keys ttl>
-l <logfile>
-T <running timeout>
-i <influxdb [localname@ip:port]>
-C <consul [ip:port]>
-P <check port> [port] ## used by consul to check services ##
-q <parallel consumer>
-R <retention>
-v <version>
-D <debug>
-DD <Test>`)
fmt.Println()
os.Exit(0)
}
func init() {
var err error
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.Fatal(err)
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
flag.IntVar(&opts.Port, "P", 10500, "service check port")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
flag.Float64Var(&opts.Retention, "R", 15552000, "retention")
}

87
pid.go
View file

@ -1,87 +0,0 @@
// pid
package main
/*
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strconv"
"strings"
)
var (
pid = PID{}
)
// PID structure
type PID struct {
PID string
PIDFile string
}
// verifica se esiste il PIDFile;
// se esiste legge il PID e controlla se e' running il processo associato
func (p *PID) check() bool {
bpid, err := ioutil.ReadFile(p.PIDFile)
p.PID = strings.TrimRight(string(bpid), "\n")
if err == nil && p.readCmd() {
return true
}
return false
}
// controlla se esiste il processo associato al PID,
// se il cmd e' lo stesso e se e' in esecuzione.
func (p *PID) readCmd() bool {
bcmd, err := ioutil.ReadFile(path.Join("/proc", p.PID, "cmdline"))
// non esiste la dir relativa al PID su /proc
if err != nil {
fmt.Println("cmdline error: ", err)
return false
}
cmd := bytes.Trim(bcmd, "\x00")
if !strings.Contains(string(cmd), opts.Exe) {
fmt.Printf("PID %s used by %s\n", pid, cmd)
}
return true
}
// scrive il PID nel PIDFile
func (p *PID) Write(l bool) {
if p.check() {
if l {
log.Println("Running: ", p.PID)
} else {
fmt.Println("Running: ", p.PID)
}
os.Exit(-6)
}
fpid, err := os.OpenFile(p.PIDFile, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
if l {
log.Println("PID file error: ", err.Error())
} else {
fmt.Println("PID file error: ", err.Error())
}
os.Exit(-5)
}
fpid.WriteString(strconv.Itoa(os.Getpid()))
fpid.Close()
}
// Remove cancella il PIDFile
func (p *PID) Remove() {
err := os.Remove(p.PIDFile)
if err != nil {
fmt.Println("RM file error: ", err.Error())
}
}
*/

View file

@ -1,92 +0,0 @@
// iterator
package main
import (
"fmt"
"log"
// "strconv"
"time"
"github.com/garyburd/redigo/redis"
)
type produced struct {
user string
logins []string
}
type loginsList struct {
user string
logins []string
err bool
}
func producer() {
conn := dbs.rdb.Get()
defer conn.Close()
for loop {
wg.Wait()
status = _Producer
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex"))
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("LLINDEX empty: %v\n", err)
}
log.Printf("LLINDEX empty: %v\n", err)
break
}
// estrae tutti i logins dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
if err != nil {
if opts.Debug {
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
}
log.Printf("LRANGE: %+v - %+v\n", err, logs)
}
if opts.Debug {
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
}
if len(logs) > 0 {
counter <- Counterchan{
tipo: "user",
val: 1,
}
if opts.Debug {
fmt.Printf("PROD: %+v\n", time.Since(start))
}
if opts.Test {
log.Printf("PROD: %s - %d\n", user, len(logs))
}
wg.Add(1)
counter <- Counterchan{
tipo: "wg",
val: 1,
}
counter <- Counterchan{
tipo: "log",
val: len(logs),
}
consume <- loginsList{
user: user,
logins: logs,
err: false,
}
}
}
done <- true
}

View file

@ -1,51 +0,0 @@
// finalizer
package main
import (
"fmt"
"time"
)
func remover() {
var conn = dbs.rdb.Get()
defer conn.Close()
for {
rem := <-remove
status = _Remover
start := time.Now()
if !rem.err {
for i := range rem.logins {
// cancella da Redis la riga di login inserita partendo da 1
conn.Send("lrem", rem.user, "1", rem.logins[i])
}
}
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
if rem.err {
if opts.Debug {
fmt.Printf("SADD: %s\n", rem.user)
}
conn.Send("sadd", "llindex", rem.user)
if count.GetErr() >= opts.MaxError {
exit()
}
}
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush()
if opts.Debug {
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
}
counter <- Counterchan{
tipo: "wg",
val: -1,
}
wg.Done()
}
}

View file

@ -1,75 +0,0 @@
package main
/*
import (
"fmt"
"log"
// "strings"
rt "gopkg.in/dancannon/gorethink.v2"
)
type Rethink struct {
rtdb *rt.Session
}
func NewRethinkDB(cluster []string) (*Rethink, error) {
var (
err error
session *rt.Session
)
if len(cluster) > 1 {
session, err = rt.Connect(rt.ConnectOpts{
Addresses: cluster,
InitialCap: 10,
MaxOpen: 10,
})
} else {
session, err = rt.Connect(rt.ConnectOpts{
Address: cluster[0],
InitialCap: 10,
MaxOpen: 10,
})
}
if err != nil {
log.Printf("RethinkDB ERROR: %+v\n", err.Error())
return nil, err
}
return &Rethink{
rtdb: session,
}, nil
}
func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTinsert: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb)
//resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTMulti: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
func (r *Rethink) Close() {
r.rtdb.Close()
}
*/

View file

@ -1,39 +0,0 @@
// sigterm
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func kill() {
log.Printf("KILL %d\n", status)
fmt.Printf("KILL %d\n", status)
wg.Done()
counter <- Counterchan{
tipo: "wg",
val: -1,
}
done <- true
}
func exit() {
log.Printf("EXIT %d\n", status)
fmt.Printf("EXIT %d\n", status)
loop = false
time.AfterFunc(time.Second*20, kill)
}
func setTerm() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
exit()
}()
}

View file

@ -24,9 +24,4 @@
#
# m h dom mon dow command
### lastlogin import
*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1
*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1
05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1
### lastlogin create indexes
00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1
*/10 * * * * /opt/llmongo/start.sh > /dev/null

View file

@ -1,63 +0,0 @@
// xymon.go
package main
/*
import (
"fmt"
"log"
"net"
"strings"
"time"
)
func sendStatus() {
var msg string
host := strings.Replace(opts.Hostname, ".", ",", -1)
if count.GetErr() > 0 {
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "yellow", time.Now(), "errors", count.GetErr())
} else {
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "green", time.Now(), "logins", count.GetLog())
}
if opts.Debug {
fmt.Println("Sending: ", msg)
}
xymonSend([]byte(msg))
}
func xymonSend(msg []byte) error {
var server string
if strings.Contains(opts.Xymon, ":") {
server = opts.Xymon
} else {
server = opts.Xymon + ":1984"
}
cl, err := net.Dial("tcp", server)
if err != nil {
fmt.Printf("xymon connect error: ", err)
log.Printf("xymon connect error: ", err)
return err
}
defer cl.Close()
// fmt.Printf("xymon: %s - localhost: %s\n", cl.RemoteAddr(), cl.LocalAddr())
_, err = cl.Write(msg)
if err != nil {
fmt.Printf("xymon write error: ", err)
log.Printf("xymon write error: ", err)
return err
}
// fmt.Printf("written: %d\n", tot)
return nil
}
*/