Compare commits

...

65 commits

Author SHA1 Message Date
abca7e21e7 influxdb write verbose 2018-12-12 14:57:08 +01:00
ee28cf2d89 influxdb verbose errors 2018-12-12 14:35:13 +01:00
5c45a79d12 split host@addr:port 2018-12-12 13:33:26 +01:00
f1f38601d5 influxdb hostname|ip 2018-12-12 13:21:39 +01:00
a035171edb testing new mgo driver 2018-12-10 16:25:40 +01:00
2d3d9e43d3 added 2018-11-27 15:11:59 +01:00
a2fc00df74 update 2018-11-27 15:10:19 +01:00
5bc0076ecf auto discovery (by consul) 2018-11-22 12:29:08 +01:00
58234ad80a v4.4.5 2018-11-07 16:50:53 +01:00
3e3e36f81e counter INS = len(AllLogins) 2018-11-07 16:47:38 +01:00
8e1b5033f1 counter INS = len(bulk) 2018-11-07 16:44:03 +01:00
d8137b631f debug insert b2 2018-10-25 17:26:59 +02:00
f56f51bb27 ins = len(logins) 2018-10-25 17:19:34 +02:00
927aac308f debug info (BulkResult) 2018-10-25 16:59:19 +02:00
90177973fe use BulkRun Results to count insert 2018-10-25 16:50:49 +02:00
1ec55417d1 count insert 2018-10-25 12:41:58 +02:00
254405b1fb in test importa anche i login piu' vecchi di 6 mesi 2018-10-24 15:19:02 +02:00
6572d95834 aggiunta l'opzione -R <retention> default = 15552000 (6 mesi) 2018-10-24 09:33:08 +02:00
e1f9da40e4 - build automatico con tag versione
- docker-compose per ambiente di test
2018-10-23 09:26:55 +02:00
a78109af0d aggiunto il controllo della data: se > 6mesi salta 2018-10-23 09:01:27 +02:00
18c1e2012a added expire index 2018-07-19 11:50:18 +02:00
c33e3065ec update 2018-07-18 13:29:44 +02:00
8cac688ebd Merge branch 'master' into nats 2018-06-12 17:49:20 +02:00
1535b27918 backup giornaliero 2018-06-12 15:27:04 +02:00
ee55ecd2f2 added backup script 2018-06-06 17:28:03 +02:00
806d5f0a6a init day collections 2018-06-05 12:22:00 +02:00
1e55b6e6f0 cambiato il nome della collections (inizia per lettere e non per numeri) 2018-06-04 12:44:06 +02:00
3ecbd95719 versione corretta 2018-05-29 15:48:29 +02:00
905d7ca936 crea una collection per giorno: YYMMDD_ll 2018-05-29 15:44:57 +02:00
102d0fc973 crea una nuova collection ogni giorno 2018-05-25 16:07:22 +02:00
26e25328b9 . 2017-09-21 18:34:00 +02:00
5a9f5f5008 implementazione con NATS ( start ) 2017-09-21 18:28:33 +02:00
7a52478c63 gestisce cluster di mongodb
aggiunte le funzioni di hash con SHA1 e MD5 (default)
2017-06-09 16:08:32 +02:00
6896b447c5 eliminata la gestione del PID non piu' necessaria con Docker 2017-04-07 10:04:26 +02:00
ae82c15f70 aggiornato il Dockerfile
aggiunti gli script per lanciarlo tramite container
2017-03-09 15:37:32 +01:00
b02fe14d57 . 2017-03-09 10:03:46 +01:00
bb2caec10f aggiunta l'opzione -d <database name>
verifica che siano valorizzati gli url dei db e il nome del database
2017-03-07 17:42:53 +01:00
92c78583ce aggiunta la country 2017-03-07 17:03:44 +01:00
c244dc8922 aggiornata la configurazione per influxdb 2017-03-06 12:43:24 +01:00
ca0605108a modificato msg di help 2017-03-03 16:59:18 +01:00
d5ffeacb9a legge tutti i login per utente 2017-02-21 11:23:38 +01:00
be694768fb v4.0.1 2017-02-13 10:37:00 +01:00
50b09f7050 modificato parametro nome DB (mongo) 2017-02-13 10:32:43 +01:00
58518c4b6a modificato il conteggio dei logins gestiti 2017-02-10 17:18:56 +01:00
8228f63fff aggiunta la possibilita' di scrivere su piu' mongodb 2017-02-10 16:56:09 +01:00
1b36fde70a crea l'ID con user+data+ip usando la data nel formato yymmggThh 2017-01-17 11:43:14 +01:00
8bff28e0dc ID => SHA256 2017-01-04 17:40:59 +01:00
36f6aa8a83 counter usa i 'chanel' 2016-12-20 15:37:32 +01:00
a3a02eac55 rethinkDB 2016-11-10 09:01:10 +01:00
94606006a8 aggiunto Dockerfile 2016-11-04 12:00:25 +01:00
b095dea871 aggiunta la gestione errori
dopo 20sec si killa se bloccato dopo il timeout
2016-11-04 11:50:27 +01:00
d7d68f4bf4 implementato bulk insert con _id generato 2016-11-03 17:42:37 +01:00
miki
c2a5e09323 eliminati tutti i warning 2016-10-05 11:55:50 +02:00
8e384215b7 compila staticamente 2016-07-11 09:55:43 +02:00
947b4dec64 . 2016-06-06 10:21:47 +02:00
86c835f02b add debug output 2016-06-01 16:59:43 +02:00
467b9d93ac manda a influxdb i tempi medi,max e min di ogni worker ( producer, consumer, remover) 2016-06-01 16:44:09 +02:00
0d018caa98 . 2016-05-27 12:11:51 +02:00
79f1ce80e3 aggiunta l'opzione per limitare il numero max di logins da gestire per utente 2016-05-27 12:04:12 +02:00
c4064e033e . 2016-05-16 16:22:03 +02:00
dbb57fd727 . 2016-05-16 16:17:53 +02:00
803780ff28 aggiunta l'opzione per il nome del pidfile 2016-05-16 16:12:16 +02:00
e968c82c21 aggiunto come opzione il nome del Database di destinazione 2016-05-16 16:03:23 +02:00
4a160cf55b aggiunge la data di inserimento 2016-05-13 13:28:04 +02:00
6271820f33 il log viene salvato nella collection del rispettivo mese/anno 2016-05-13 12:11:52 +02:00
25 changed files with 844 additions and 269 deletions

11
Docker/Dockerfile Normal file
View file

@ -0,0 +1,11 @@
# Minimal runtime image: only the statically-linked binary, no OS layer.
#FROM busybox:latest
FROM scratch

# MAINTAINER is deprecated since Docker 1.13; use a LABEL instead.
LABEL maintainer="Michele Fadda <mikif70@gmail.com>"

# VER is injected at build time (see Docker/build.sh); defaults to 0.0.0.
ARG VER
ENV VER ${VER:-0.0.0}

# Copy the versioned binary under a stable, version-independent path.
COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb

ENTRYPOINT [ "/bin/lastlogin_mongodb" ]

3
Docker/build.sh Executable file
View file

@ -0,0 +1,3 @@
#!/bin/bash
# Build the llmongo image, tagged with the current git tag, both for
# Docker Hub and for the internal registry.
set -euo pipefail

# Resolve the version once instead of invoking git three times.
VER=$(git -C .. describe --tags)

docker build \
    -t "mikif70/llmongo:${VER}" \
    -t "repo.ism.tiscali.sys:5000/mikif70/llmongo:${VER}" \
    --build-arg VER="${VER}" \
    .

16
Docker/run.sh Normal file
View file

@ -0,0 +1,16 @@
#!/bin/bash
# Run the llmongo container against the production Redis/MongoDB/InfluxDB.
# Any extra command-line arguments are forwarded to the binary.
docker run \
    --rm \
    -v /opt/llmongo/log:/data \
    --name llmongo_tiscali \
    --log-opt max-size=2m \
    --log-opt max-file=5 \
    mikif70/llmongo:4.2.1 \
    -l /data/llmongo.log \
    -r redis-ll.mail.tiscali.sys:6379 \
    -m 10.39.80.189:27017 \
    -d lastlogin \
    -i enginedb1@10.39.109.107:8086 \
    -T 60s \
    "$@"
# "$@" (quoted) preserves argument word boundaries; the original bare $@
# re-split arguments containing whitespace.

80
backup.sh Normal file
View file

@ -0,0 +1,80 @@
#!/bin/bash
# Export one day of lastlogin documents from MongoDB to CSV via the
# mikif70/mongotools container.  By default exports yesterday's collection.
#
# Options:
#   -d  <YYYYMMDD>  day to export (default: yesterday)
#   -n  <name>      container name (default: mtools)
#   -db <dbname>    database name (default: lastlogin)
#   -pwd            write the export under the current directory

MONGODB="secondary.mongod.service.mail:27017"
DATAPATH="/mnt/mongobck/NEW_EXPORT"
NAME="mtools"
DB="lastlogin"

echo "Running... $(date)"

# Iterative option parsing.  The original recursive `opt $*` re-split
# quoted arguments; a plain while/case loop over "$@" is safe.
while [ $# -gt 0 ]; do
        case "$1" in
        -d)
                shift
                DATE=$1
                ;;
        -n)
                shift
                NAME=$1
                ;;
        -db)
                shift
                DB=$1
                ;;
        -pwd)
                DATAPATH=${PWD}
                ;;
        esac
        shift
done

echo "Opts: DB-$DB DATE-$DATE"

# Default to yesterday when no -d was given.
if [ -z "${DATE}" ]
then
        DATE=$(date -d "yesterday" +%Y%m%d)
fi

YDAY=$(date -d "${DATE}" +%Y-%m-%d)
TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d)

# Select every document whose date falls inside the requested day
# (half-open interval [00:00 of DATE, 00:00 of the next day)).
QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T00:00:00Z\")}}"

FNAME=$(date -d "${DATE}" +%y%m%d)   # collection/file suffix: YYMMDD
CNAME=$(date -d "${DATE}" +%y%m)     # per-month output directory: YYMM

echo "Query: ${QUERY}"

if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then
        mkdir -p "${DATAPATH}/${DB}/${CNAME}"
fi

/usr/bin/docker run \
        --rm \
        -h mtools \
        --name "${NAME}" \
        -v "${DATAPATH}":/data \
        mikif70/mongotools:3.4.5 mongoexport \
        --host "${MONGODB}" \
        --db "$DB" \
        --collection "ll_${FNAME}" \
        --type "csv" \
        --fields "_id,user,protocol,ip,date,insert,country" \
        --query "${QUERY}" \
        --readPreference "secondary" \
        --out "/data/${DB}/${CNAME}/ll_${FNAME}.csv"

5
build.sh Executable file
View file

@ -0,0 +1,5 @@
#!/bin/bash
# Build a statically-linked Linux binary named after the current git tag
# and stage it into Docker/ for the image build.
set -e   # do not mv a stale binary if the build fails

# Resolve the version once instead of invoking git twice.
VER=$(git describe --tags)

CGO_ENABLED=0 GOOS=linux go build -o "lastlogin_mongodb-${VER}" -a -installsuffix cgo .
mv "lastlogin_mongodb-${VER}" Docker/

44
consul.go Normal file
View file

@ -0,0 +1,44 @@
package main
import (
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
)
// Consul client structure
type Consul struct {
RedisTTL time.Duration
Timeout time.Duration
MaxError int
Influxdb string
Month string
Retention float64
MongoURI string
Database string
RedisURI string
Client *api.Client
}
// NewClient New consul client
func NewClient() *Consul {
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.Fatalf("Consul error: %+v\n", err)
if opts.Debug {
fmt.Printf("Consul error: %+v\n", err)
}
}
consul := &Consul{
Client: client,
RedisTTL: opts.RedisTTL,
Timeout: opts.Timeout,
MaxError: opts.MaxError,
Influxdb: opts.Influxdb,
}
return consul
}

View file

@ -2,13 +2,18 @@
package main
import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"fmt"
// "github.com/garyburd/redigo/redis"
// "gopkg.in/mgo.v2/bson"
"log"
"strconv"
"strings"
"time"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
type consumed struct {
@ -17,124 +22,167 @@ type consumed struct {
logins []string
}
func contains(s []Ips, e string) bool {
for _, a := range s {
if a.Ip == e {
return true
}
}
return false
func hash256(val []byte) string {
h := sha256.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func consumer(id int) {
func hash160(val []byte) string {
// var conn = dbs.rdb.Get()
// defer conn.Close()
h := sha1.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
func hash128(val []byte) string {
h := md5.New()
h.Write(val)
return hex.EncodeToString(h.Sum(nil))
}
// protocol:timestamp:ip:country
func consumer() {
for {
prod := <-consume[id]
// wg.Add(1)
// defer wg.Done()
cons := consumed{
user: prod.user,
logins: make([]string, 0),
error: false,
}
prod := <-consume
start := time.Now()
status = _Consumer
var bulk = make(map[string][]*mgo.Bulk)
var allLogins = make(map[string]MongoLogin)
for i := range prod.logins {
login := prod.logins[i]
// se la riga di login e' vuota
if login == "" {
log.Println("Login empty: ", prod.user)
cons.logins = append(cons.logins, login)
// retval, _ := conn.Do("lrem", user, "0", login)
// log.Println("LREM retval: ", user, login, retval)
// return
continue
}
sval := strings.Split(login, ":")
// se il formato della riga di login non e' corretto
if sval[1] == "" {
log.Println("Login format error: ", login, prod.user)
cons.logins = append(cons.logins, login)
// retval, _ := conn.Do("lrem", user, "0", login)
// log.Println("LREM retval: ", user, login, retval)
// return
continue
}
// se il timestamp della riga di login non e' corretto
date, err := strconv.ParseInt(sval[1], 10, 64)
if err != nil {
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
cons.logins = append(cons.logins, login)
// retval, _ := conn.Do("lrem", user, "0", login)
// log.Println("LREM retval: ", user, login, retval)
// return
continue
}
// se la data e' piu vecchia di RETENTION (15552000 sec) la scarta
if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 {
log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
if opts.Debug {
fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
}
if !opts.Test {
continue
}
}
// verifica se esiste la country
if len(sval) <= 3 {
sval = append(sval, "NONE")
}
// genera l' _ID con user + timestamp + ip
mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
ml := MongoLogin{
ID: mlID,
User: prod.user,
Protocol: sval[0],
Ip: sval[2],
IP: sval[2],
Date: time.Unix(date, 0),
Insert: time.Now(),
Country: sval[3],
}
// cerca se esiste gia' un documento con gli stessi User & Date
/*
docfind := []Ips{}
iter := dbs.ll.Find(bson.M{"user": prod.user, "date": time.Unix(date, 0)}).Select(bson.M{"ip": 1, "_id": 0}).Iter()
iter.All(&docfind)
if len(docfind) > 0 {
count.AddDuplicate()
if !contains(docfind, ml.Ip) {
fmt.Printf("Insert != IP for same date: user=%s - date=%s\n - newip=%s - oldip=%s\n", ml.User, ml.Date, docfind, ml.Ip)
// inserisce il login su Mongodb se gli IP sono !=
err := dbs.ll.Insert(ml)
if err != nil {
log.Printf("Insert error: %+v - %s\n", err, cons.user)
count.AddErr()
cons.error = true
allLogins[mlID] = ml
}
for _, val := range allLogins {
dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay
if _, ok := bulk[dt]; !ok {
for j := range dbs.mdb {
b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk()
b.Unordered()
bulk[dt] = append(bulk[dt], b)
}
}
for _, bl := range bulk[dt] {
bl.Insert(val)
}
}
for _, val := range bulk {
for j, bl := range val {
result, err := bl.Run()
if j == 0 {
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
fmt.Printf("Err: %+v\n", err)
prod.err = true
counter <- Counterchan{
tipo: "err",
val: len(prod.logins),
}
if opts.Test || opts.Debug {
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
}
continue
} else {
counter <- Counterchan{
tipo: "dup",
val: strings.Count(err.Error(), "E11000"),
}
if opts.Debug {
log.Printf("DUP: %s - %+v\n", prod.user, prod.logins)
}
}
} else {
if opts.Test {
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
log.Printf("BulkResult: %s - %+v\n", prod.user, result)
}
counter <- Counterchan{
tipo: "ins",
val: len(allLogins),
}
}
} else {
*/
// inserisce il login su Mongodb
err = dbs.ll.Insert(ml)
if err != nil {
log.Printf("Insert error: %+v - %s\n", err, cons.user)
count.AddErr()
cons.error = true
continue
}
}
// }
// iter.Close()
if opts.Debug {
log.Printf("%+v - %+v\n", ml)
}
// if i < (len(prod.logins) - 1) {
cons.logins = append(cons.logins, login)
// cancella da Redis la riga di login inserita
// retval, _ := conn.Do("lrem", user, "0", login)
// if opts.Debug {
// log.Println("LREM retval: ", retval, user, login)
// fmt.Println("LREM retval: ", retval, user, login)
// }
// }
}
count.AddLog(len(prod.logins))
if opts.Debug {
fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
// log.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
}
remove[id] <- cons
if opts.Debug && len(prod.logins) > 10 {
fmt.Printf("LOGS: %+v\n", prod.logins)
}
if !prod.err {
counter <- Counterchan{
tipo: "rem",
val: len(prod.logins),
}
}
// wg.Done()
remove <- prod
}
}

View file

@ -6,71 +6,109 @@ import (
"time"
)
type Counter struct {
mu sync.Mutex
user int
log int
rem int
err int
dup int
time time.Duration
wg []int
// Counterchan counter structure
type Counterchan struct {
tipo string
val int
}
// Counter structure
type Counter struct {
mu sync.Mutex
user int
log int
insert int
rem int
err int
dup int
time time.Duration
wg int
}
// NewCounter iniitialized Counter structure
func NewCounter() *Counter {
return &Counter{
user: 0,
log: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: make([]int, opts.Concurrent),
user: 0,
log: 0,
insert: 0,
err: 0,
rem: 0,
dup: 0,
time: 0,
wg: 0,
}
}
func (c *Counter) AddUser() {
c.mu.Lock()
defer c.mu.Unlock()
// Run open channel to receive counter
func (c *Counter) Run() {
for {
tocount := <-counter
switch tocount.tipo {
case "user":
c.addUser()
case "dup":
c.addDuplicate(tocount.val)
case "ins":
c.addInsert(tocount.val)
case "log":
c.addLog(tocount.val)
case "rem":
c.addRem(tocount.val)
case "wg":
if tocount.val > 0 {
c.addWG()
} else {
c.delWG()
}
case "err":
c.addErr(tocount.val)
}
}
}
// AddUser increment number of users managed
func (c *Counter) addUser() {
c.user++
}
func (c *Counter) AddDuplicate() {
c.mu.Lock()
defer c.mu.Unlock()
c.dup++
// AddDuplicate increment number of duplicates log
func (c *Counter) addDuplicate(add int) {
c.dup += add
}
func (c *Counter) AddLog(add int) {
c.mu.Lock()
defer c.mu.Unlock()
// AddInsert increment number of inserted rows
func (c *Counter) addInsert(add int) {
c.insert += add
}
// AddLog increment number of log's rows managed
func (c *Counter) addLog(add int) {
c.log += add
}
func (c *Counter) AddRem(add int) {
c.mu.Lock()
defer c.mu.Unlock()
//AddRem increment removed logs row
func (c *Counter) addRem(add int) {
c.rem += add
}
func (c *Counter) AddWG(id int) {
c.mu.Lock()
defer c.mu.Unlock()
c.wg[id]++
// AddWG ...
func (c *Counter) addWG() {
c.wg++
}
func (c *Counter) AddErr() {
c.mu.Lock()
defer c.mu.Unlock()
c.err++
// AddErr ...
func (c *Counter) addErr(add int) {
c.err += add
}
func (c *Counter) DelWG(id int) {
c.mu.Lock()
defer c.mu.Unlock()
c.wg[id]--
// DelWG ...
func (c *Counter) delWG() {
c.wg--
}
// GetUser return total users
func (c *Counter) GetUser() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
@ -78,6 +116,7 @@ func (c *Counter) GetUser() (ret int) {
return
}
// GetDup return total duplicated logins
func (c *Counter) GetDup() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
@ -85,6 +124,7 @@ func (c *Counter) GetDup() (ret int) {
return
}
// GetLog return total log's rows
func (c *Counter) GetLog() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
@ -92,6 +132,15 @@ func (c *Counter) GetLog() (ret int) {
return
}
// GetInsert return total inserted rows
func (c *Counter) GetInsert() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.insert
return
}
// GetErr return total errors
func (c *Counter) GetErr() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
@ -99,6 +148,7 @@ func (c *Counter) GetErr() (ret int) {
return
}
// GetRem return total removed log's rows
func (c *Counter) GetRem() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
@ -106,13 +156,15 @@ func (c *Counter) GetRem() (ret int) {
return
}
func (c *Counter) GetWG(id int) (ret int) {
// GetWG ...
func (c *Counter) GetWG() (ret int) {
c.mu.Lock()
defer c.mu.Unlock()
ret = c.wg[id]
ret = c.wg
return
}
// GetTime ...
func (c *Counter) GetTime() (ret float64) {
c.mu.Lock()
defer c.mu.Unlock()
@ -120,6 +172,7 @@ func (c *Counter) GetTime() (ret float64) {
return
}
// SetTime ...
func (c *Counter) SetTime(t time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

15
day_init.js Normal file
View file

@ -0,0 +1,15 @@
// v1.2
// Create today's daily lastlogin collection and its indexes on each
// database.  Collections are named "ll_" + YYMMDD (two-digit year).

// Zero-pad a one-digit number to two characters.
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}

// Create the query indexes and the 180-day (15552000 s) TTL index for
// today's collection on the database reachable at uri.  This replaces
// the original copy-pasted duplicate of the whole sequence per database.
var initDay = function(uri) {
    var db = connect(uri);
    var now = new Date();
    var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
    print("creating index: ll_",col.slice(2));
    db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
    db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
}

initDay("192.168.0.1:27017/lastlogin");
initDay("192.168.0.1:27017/katamail");

80
dbs.go
View file

@ -2,53 +2,71 @@
package main
import (
"github.com/garyburd/redigo/redis"
// "github.com/fzzy/radix/redis"
"fmt"
"log"
"os"
"strings"
"time"
"gopkg.in/mgo.v2"
"github.com/garyburd/redigo/redis"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
var (
dbs = Dbs{
MongoUri: "mongodb://127.0.0.1:27018",
RedisUri: "127.0.0.1:6379",
RedisURI: "",
Database: "",
}
)
// Dbs structure
type Dbs struct {
MongoUri string
RedisUri string
MongoURI string
Database string
RedisURI string
rdb *redis.Pool //*redis.Client
mdb *mgo.Session
ll *mgo.Collection
// us *mgo.Collection
mdb []*mgo.Session
}
// MongoLogin structure
type MongoLogin struct {
User string `json:"user"`
Protocol string `json:"protocol"`
Ip string `json:"ip"`
Date time.Time `json:"date"`
ID string `json:"_id" bson:"_id" gorethink:"id"`
User string `json:"user" bson:"user" gorethink:"user"`
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
IP string `json:"ip" bson:"ip" gorethink:"ip"`
Date time.Time `json:"date" bson:"date" gorethink:"date"`
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
Country string `json:"country" bson:"country" gorethink:"country"`
}
// Ips structure
type Ips struct {
Ip string `json:"ip"`
IP string `json:"ip"`
}
// UserLogin structure
type UserLogin struct {
User string `json:"user"`
Date time.Time `json:"date"`
Lock bool `json:"lock"`
}
// Index structure
type Index struct {
User string `json:"user"`
Date time.Time `json:"date"`
}
func (db *Dbs) isMongodb() bool {
if db.MongoURI != "" {
return true
}
return false
}
func (db *Dbs) poolRedis() {
dbs.rdb = &redis.Pool{
@ -57,7 +75,7 @@ func (db *Dbs) poolRedis() {
Wait: true,
IdleTimeout: 1 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", db.RedisUri)
c, err := redis.Dial("tcp", db.RedisURI)
if err != nil {
return nil, err
}
@ -71,24 +89,18 @@ func (db *Dbs) poolRedis() {
}
/*
func (db *Dbs) connectRedis() {
var err error
db.rdb, err = redis.Dial("tcp", db.RedisUri)
if err != nil {
log.Println("Redis connect Error: ", err.Error())
os.Exit(-1)
}
}
*/
func (db *Dbs) connectMongo() {
var err error
db.mdb, err = mgo.Dial(db.MongoUri)
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
mongoList := strings.Split(db.MongoURI, "|")
for m := range mongoList {
nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m]))
if err != nil {
log.Println("Mongodb connect Error: ", err.Error())
os.Exit(-3)
}
nm.SetSocketTimeout(5 * time.Second)
nm.SetSyncTimeout(5 * time.Second)
db.mdb = append(db.mdb, nm)
}
db.ll = db.mdb.DB("lastlogin").C("lastlogin_" + time.Now().Format("0601"))
// db.us = db.mdb.DB("dovecot").C("userlogin")
}

View file

@ -0,0 +1,18 @@
# Local test environment for llmongo: one MongoDB and one Redis instance,
# with data persisted under ./mongod and ./redis.
# (Indentation reconstructed: the page extraction had flattened the YAML
# nesting, which made the file invalid.)
version: '2'
services:
  mongodb:
    image: mikif70/mongodb:3.6.9
    ports:
      - 27017:27017
    container_name: ll_mongod
    volumes:
      - ./mongod:/data
    depends_on:
      - redis
  redis:
    image: "redis:alpine"
    container_name: ll_redis
    ports:
      - 6379:6379
    volumes:
      - ./redis:/data

9
docker-compose/mongo.sh Executable file
View file

@ -0,0 +1,9 @@
#!/bin/bash
# Open a mongo shell against $1, or against the default test instance
# when no argument is given.
if [ -z "$1" ]; then
        # "$1" is quoted: the original bare $1 broke the test for
        # arguments containing whitespace.
        HOST=192.168.0.1:27017
else
        HOST=${1}
fi

docker run --rm -it mikif70/mongotools:3.6.9 mongo "${HOST}"

16
docker-compose/test.sh Executable file
View file

@ -0,0 +1,16 @@
#!/bin/bash
# Run the llmongo image tagged with the current git tag against the
# docker-compose test environment; extra arguments are forwarded.

# Resolve the tag once instead of invoking git twice.
VER=$(git -C .. describe --tags)

echo "Starting mikif70/llmongo:${VER}"
docker run \
    --rm \
    -v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \
    --name llmongo \
    "mikif70/llmongo:${VER}" \
    -l /data/llmongo.log \
    -r 192.168.0.1:6379 \
    -m 192.168.0.1:27017 \
    -d lastlogin \
    -i test@10.39.253.206:8086 \
    -T 80s \
    "$@"

View file

@ -3,22 +3,30 @@ package main
import (
"fmt"
"log"
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
)
var (
infdb string
infhost string
)
func writeStats(start time.Time) {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
}
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: opts.Influxdb,
Addr: fmt.Sprintf("http://%s", infdb),
Timeout: 2 * time.Second,
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
fmt.Printf("InfluxDB connect Error: %+v\n", err)
log.Printf("InfluxDB connect Error: %+v\n", err)
return
}
defer c.Close()
@ -28,11 +36,12 @@ func writeStats(start time.Time) {
Precision: "s",
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
fmt.Printf("InfluxDB batch Error: %+v\n", err)
log.Printf("InfluxDB batch Error: %+v\n", err)
return
}
tags := map[string]string{"server": opts.Hostname}
tags := map[string]string{"server": infhost, "domain": dbs.Database}
fields := map[string]interface{}{
"user": count.GetUser(),
"log": count.GetLog(),
@ -43,12 +52,22 @@ func writeStats(start time.Time) {
}
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
if err != nil {
fmt.Printf("Error: %+v\n", err)
fmt.Printf("InfluxDB point Error: %+v\n", err)
log.Printf("InfluxDB point Error: %+v\n", err)
return
}
if opts.Debug {
fmt.Printf("InfluxDB pt: %+v\n", pt)
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
err = c.Write(bp)
if err != nil {
fmt.Printf("InfluxDB write Error: %+v\n", err)
log.Printf("InfluxDB write Error: %+v\n", err)
return
}
}

28
logrotate.cfg Normal file
View file

@ -0,0 +1,28 @@
# MongoDB container log: rotate daily, keep 10 compressed archives,
# rotate early once a file exceeds 10M.  copytruncate is used because
# mongod keeps its log file open across rotations.
/opt/mongodb/mongod/log/*.log
{
daily
rotate 10
maxsize 10M
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
# Signal the mongod container to reopen its log file (SIGUSR1 triggers
# mongod's own log-rotation handling); ignore errors if it is not running.
postrotate
/usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true
endscript
}
# llmongo log: rotate daily, keep 8 compressed archives, rotate early
# above 5M.  No postrotate is needed because copytruncate leaves the
# writer's file descriptor valid.
/opt/llmongo/log/*.log
{
daily
maxsize 5M
rotate 8
compress
delaycompress
missingok
notifempty
sharedscripts
copytruncate
}

102
main.go
View file

@ -6,28 +6,33 @@ import (
"fmt"
"log"
"os"
"regexp"
"sync"
"time"
_ "github.com/nats-io/go-nats"
)
const (
_VERSION = "v2.6.1"
_Version = "v5.0.0b6"
_Producer = 0
_Consumer = 1
_Remover = 2
)
var (
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
loop bool
loop []bool
done chan bool
consume chan loginsList
remove chan loginsList
done []chan bool
consume []chan produced
remove []chan consumed
counter chan Counterchan
wg sync.WaitGroup
status int
count *Counter
)
@ -36,15 +41,27 @@ func main() {
flag.Parse()
if opts.Version {
fmt.Println(os.Args[0], _VERSION)
fmt.Println(os.Args[0], _Version)
os.Exit(0)
}
if opts.Hostname == "" {
var err error
opts.Hostname, err = os.Hostname()
if err != nil {
fmt.Println("Hostname error: ", err.Error())
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" {
flag.Usage()
os.Exit(-1)
}
if opts.Influxdb != "" {
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
if re.MatchString(opts.Influxdb) {
match := re.FindStringSubmatch(opts.Influxdb)
if opts.Debug {
fmt.Printf("Influxdb match: %+v\n", match)
}
infhost = match[1]
infdb = match[2] + ":" + match[3]
} else {
opts.Influxdb = ""
}
}
@ -59,19 +76,23 @@ func main() {
log.SetOutput(fs)
// DA VERIFICARE
pid.Write(true)
defer pid.Remove()
// pid.PIDFile = opts.Pidfile
// pid.Write(true)
// defer pid.Remove()
start := time.Now()
fmt.Printf("Start: %+v\n", opts)
log.Printf("Start: %+v\n", opts)
fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
opts.Month = start.Format("0601")
dbs.poolRedis()
defer dbs.rdb.Close()
dbs.connectMongo()
defer dbs.mdb.Close()
for k := range dbs.mdb {
defer dbs.mdb[k].Close()
}
if opts.Timeout > 0 {
time.AfterFunc(opts.Timeout, exit)
@ -79,39 +100,34 @@ func main() {
count = NewCounter()
for i := 0; i < opts.Concurrent; i++ {
consume = append(consume, make(chan produced))
remove = append(remove, make(chan consumed))
loop = append(loop, true)
done = append(done, make(chan bool))
consume = make(chan loginsList, opts.Queue)
remove = make(chan loginsList, opts.Queue)
loop = true
done = make(chan bool)
counter = make(chan Counterchan)
go producer(i)
go consumer(i)
go remover(i)
go count.Run()
go producer()
for i := 0; i < opts.Queue; i++ {
for j := 0; j < len(dbs.mdb); j++ {
go consumer()
}
go remover()
}
for i := 0; i < opts.Concurrent; i++ {
<-done[i]
fmt.Printf("Done %d\n", i)
close(done[i])
}
<-done
fmt.Printf("Done\n")
close(done)
fmt.Println("Waiting WG")
for i := 0; i < opts.Concurrent; i++ {
fmt.Printf("ID (%d): %d\n", i, count.GetWG(i))
}
wg.Wait()
count.SetTime(time.Since(start))
fmt.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup())
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
if opts.Influxdb != "" {
writeStats(start)
}
if opts.Xymon != "" {
sendStatus()
}
}

34
mongo_scripts.txt Normal file
View file

@ -0,0 +1,34 @@
################
// Snippet 1: add the 180-day (15552000 s) TTL index named "expire" to the
// daily collections ll_180601 .. ll_180617.
// dd(): zero-pad a one-digit number to two characters.
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var i=1; i<18; i++) {
var col = "ll_1806"+dd(i);
print(col);
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
}
###########
// Snippet 2: print name and total size of every ll_15MMDD collection that
// exists for year 2015, over all months and days.
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var m=1; m<=12; m++) {
for (var d=1; d<=31; d++) {
var col = "ll_15"+dd(m)+dd(d);
if (db.getCollection(col).exists != null) {
print(col);
print(db.getCollection(col).totalSize());
}
}
}
####
// Snippet 3: list every collection whose name starts with "ll".
var ar = db.getCollectionNames()
var len = ar.length
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
####
// Snippet 4: add the same TTL "expire" index to one monthly collection.
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})

View file

@ -1,11 +1,8 @@
// options
package main
import (
// "encoding/json"
"flag"
"fmt"
// "io/ioutil"
"log"
"os"
"path"
@ -13,6 +10,7 @@ import (
"time"
)
// Options structure
type Options struct {
RedisTTL time.Duration
CurrentPath string
@ -21,16 +19,41 @@ type Options struct {
ConfigFile string
Timeout time.Duration
Debug bool
Test bool
Version bool
Concurrent int
MaxError int
Xymon string
Influxdb string
Hostname string
Month string
Queue int
Retention float64
Consul string
Port int
}
var (
opts = Options{
RedisTTL: time.Hour * 11688, // 16 mesi
LogFile: "log/llmongo.log",
}
)
func usage() {
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <redis keys ttl> -l <logfile> -b <concurrent thread> -T <running ttl> -x <xymon server> -H <hostname> -i <influxdb uri> -v\n")
fmt.Println(`Usage: llmongo
-m <mongodb [ip:port,ip:port]>
-d <mongodb database [dbname]>
-r <redisdb [ip:port]>
-t <redis keys ttl>
-l <logfile>
-T <running timeout>
-i <influxdb [localname@ip:port]>
-C <consul [ip:port]>
-P <check port> [port] ## used by consul to check services ##
-q <parallel consumer>
-R <retention>
-v <version>
-D <debug>
-DD <Test>`)
fmt.Println()
os.Exit(0)
}
@ -42,19 +65,21 @@ func init() {
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&opts.Xymon, "x", "", "xymon server")
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
flag.StringVar(&opts.Hostname, "H", "", "hostname")
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
flag.IntVar(&opts.Port, "P", 10500, "service check port")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
flag.BoolVar(&opts.Debug, "D", false, "Debug")
flag.IntVar(&opts.Concurrent, "c", 1, "Concurrent thread")
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
flag.Float64Var(&opts.Retention, "R", 15552000, "retention")
}

10
pid.go
View file

@ -1,6 +1,7 @@
// pid
package main
/*
import (
"bytes"
"fmt"
@ -16,6 +17,7 @@ var (
pid = PID{}
)
// PID structure
type PID struct {
PID string
PIDFile string
@ -42,11 +44,8 @@ func (p *PID) readCmd() bool {
return false
}
cmd := bytes.Trim(bcmd, "\x00")
if strings.Contains(string(cmd), opts.Exe) {
return true
} else {
if !strings.Contains(string(cmd), opts.Exe) {
fmt.Printf("PID %s used by %s\n", pid, cmd)
return true
}
return true
}
@ -78,10 +77,11 @@ func (p *PID) Write(l bool) {
fpid.Close()
}
// Cancella il PIDFile
// Remove cancella il PIDFile
func (p *PID) Remove() {
err := os.Remove(p.PIDFile)
if err != nil {
fmt.Println("RM file error: ", err.Error())
}
}
*/

View file

@ -3,9 +3,11 @@ package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
"log"
// "strconv"
"time"
"github.com/garyburd/redigo/redis"
)
type produced struct {
@ -13,57 +15,78 @@ type produced struct {
logins []string
}
func producer(id int) {
type loginsList struct {
user string
logins []string
err bool
}
func producer() {
conn := dbs.rdb.Get()
defer conn.Close()
for loop[id] {
for loop {
wg.Wait()
status = _Producer
start := time.Now()
// estrae un userid dalla lista degli utenti che hanno fatto login
user, err := redis.String(conn.Do("spop", "llindex"))
// if opts.Debug {
// log.Printf("SPOP: %+v - %+v\n", user, err)
// fmt.Printf("SPOP: %+v - %+v\n", user, err)
// }
// se non ci sono piu' userid esce
if err != nil {
if opts.Debug {
fmt.Printf("LLINDEX empty: %v\n", err)
}
log.Printf("LLINDEX empty: %v\n", err)
//loop[id] = false
//done[id] <- true
break
}
// estrae tutti i login dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "1", "-1"))
// estrae tutti i logins dell'utente "user"
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
if err != nil {
if opts.Debug {
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
}
log.Printf("LRANGE: %+v - %+v\n", err, logs)
}
// if opts.Debug {
// fmt.Printf("LRANGE: %s - %d\n", user, len(logs))
// log.Printf("LRANGE: %s - %d\n", user, len(logs))
// }
if opts.Debug {
fmt.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
// log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
}
count.AddUser()
wg.Add(1)
count.AddWG(id)
if len(logs) > 0 {
counter <- Counterchan{
tipo: "user",
val: 1,
}
consume[id] <- produced{
user: user,
logins: logs,
if opts.Debug {
fmt.Printf("PROD: %+v\n", time.Since(start))
}
if opts.Test {
log.Printf("PROD: %s - %d\n", user, len(logs))
}
wg.Add(1)
counter <- Counterchan{
tipo: "wg",
val: 1,
}
counter <- Counterchan{
tipo: "log",
val: len(logs),
}
consume <- loginsList{
user: user,
logins: logs,
err: false,
}
}
}
done[id] <- true
done <- true
}

View file

@ -3,28 +3,32 @@ package main
import (
"fmt"
// "log"
"time"
)
func remover(id int) {
func remover() {
var conn = dbs.rdb.Get()
defer conn.Close()
for {
rem := <-remove[id]
rem := <-remove
// wg.Add(1)
status = _Remover
start := time.Now()
for i := range rem.logins {
login := rem.logins[i]
// cancella da Redis la riga di login inserita partendo da 1
conn.Send("lrem", rem.user, "1", login)
if !rem.err {
for i := range rem.logins {
// cancella da Redis la riga di login inserita partendo da 1
conn.Send("lrem", rem.user, "1", rem.logins[i])
}
}
if rem.error {
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
if rem.err {
if opts.Debug {
fmt.Printf("SADD (%d): %s\n", id, rem.user)
fmt.Printf("SADD: %s\n", rem.user)
}
conn.Send("sadd", "llindex", rem.user)
if count.GetErr() >= opts.MaxError {
@ -33,11 +37,15 @@ func remover(id int) {
}
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
conn.Flush()
count.AddRem(len(rem.logins))
if opts.Debug {
fmt.Printf("LREM (%d): %s - %d - %+v\n", id, rem.user, len(rem.logins), time.Since(start))
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
}
counter <- Counterchan{
tipo: "wg",
val: -1,
}
wg.Done()
count.DelWG(id)
}
}

75
rethinkdb.go Normal file
View file

@ -0,0 +1,75 @@
package main
// NOTE(review): this whole file is intentionally disabled with a block
// comment — it is an experimental RethinkDB backend that is not compiled.
// Comments below document it for whoever re-enables it.
/*
import (
"fmt"
"log"
// "strings"
rt "gopkg.in/dancannon/gorethink.v2"
)
// Rethink wraps a gorethink session to the RethinkDB cluster.
type Rethink struct {
rtdb *rt.Session
}
// NewRethinkDB opens a session to the given cluster addresses
// (host:port strings). With more than one address it connects in
// cluster mode via Addresses; with exactly one it uses Address.
// Both paths use a connection pool of 10 (InitialCap/MaxOpen).
// NOTE(review): an empty cluster slice would panic on cluster[0] —
// callers are presumably expected to pass at least one address.
func NewRethinkDB(cluster []string) (*Rethink, error) {
var (
err error
session *rt.Session
)
if len(cluster) > 1 {
session, err = rt.Connect(rt.ConnectOpts{
Addresses: cluster,
InitialCap: 10,
MaxOpen: 10,
})
} else {
session, err = rt.Connect(rt.ConnectOpts{
Address: cluster[0],
InitialCap: 10,
MaxOpen: 10,
})
}
if err != nil {
log.Printf("RethinkDB ERROR: %+v\n", err.Error())
return nil, err
}
return &Rethink{
rtdb: session,
}, nil
}
// Insert writes a single login document into tiscali.lastlogin and
// returns the driver's WriteResponse. The response is dumped when
// opts.Debug is set (opts is a package-level config defined elsewhere).
func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTinsert: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
// MultiInsert bulk-inserts a batch of login documents into
// tiscali.lastlogin using soft durability (acknowledged when in
// memory, flushed to disk later) to favor throughput over safety.
func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) {
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb)
//resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb)
if opts.Debug {
fmt.Printf("RTMulti: %+v\n", resp)
}
if err != nil {
return resp, err
}
return resp, nil
}
// Close shuts down the underlying session. The driver's Close error
// is discarded here.
func (r *Rethink) Close() {
r.rtdb.Close()
}
*/

View file

@ -7,15 +7,25 @@ import (
"os"
"os/signal"
"syscall"
"time"
)
func exit() {
for i := 0; i < opts.Concurrent; i++ {
log.Println("EXIT ", i)
fmt.Println("EXIT ", i)
loop[i] = false
// done[i] <- true
func kill() {
log.Printf("KILL %d\n", status)
fmt.Printf("KILL %d\n", status)
wg.Done()
counter <- Counterchan{
tipo: "wg",
val: -1,
}
done <- true
}
func exit() {
log.Printf("EXIT %d\n", status)
fmt.Printf("EXIT %d\n", status)
loop = false
time.AfterFunc(time.Second*20, kill)
}
func setTerm() {

View file

@ -24,4 +24,9 @@
#
# m h dom mon dow command
*/10 * * * * /opt/llmongo/start.sh > /dev/null
### lastlogin import
*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1
*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1
05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1
### lastlogin create indexes
00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1

View file

@ -1,6 +1,7 @@
// xymon.go
package main
/*
import (
"fmt"
"log"
@ -59,3 +60,4 @@ func xymonSend(msg []byte) error {
return nil
}
*/