Compare commits
No commits in common. "master" and "v2.0.0" have entirely different histories.
26 changed files with 180 additions and 1241 deletions
|
@ -1,11 +0,0 @@
|
||||||
#FROM busybox:latest
|
|
||||||
FROM scratch
|
|
||||||
|
|
||||||
MAINTAINER Michele Fadda "<mikif70@gmail.com>"
|
|
||||||
|
|
||||||
ARG VER
|
|
||||||
ENV VER ${VER:-0.0.0}
|
|
||||||
|
|
||||||
COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb
|
|
||||||
|
|
||||||
ENTRYPOINT [ "/bin/lastlogin_mongodb" ]
|
|
|
@ -1,3 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) .
|
|
|
@ -1,16 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
docker run \
|
|
||||||
--rm \
|
|
||||||
-v /opt/llmongo/log:/data \
|
|
||||||
--name llmongo_tiscali \
|
|
||||||
--log-opt max-size=2m \
|
|
||||||
--log-opt max-file=5 \
|
|
||||||
mikif70/llmongo:4.2.1 \
|
|
||||||
-l /data/llmongo.log \
|
|
||||||
-r redis-ll.mail.tiscali.sys:6379 \
|
|
||||||
-m 10.39.80.189:27017 \
|
|
||||||
-d lastlogin \
|
|
||||||
-i enginedb1@10.39.109.107:8086 \
|
|
||||||
-T 60s \
|
|
||||||
$@
|
|
80
backup.sh
80
backup.sh
|
@ -1,80 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
MONGODB="secondary.mongod.service.mail:27017"
|
|
||||||
DATAPATH="/mnt/mongobck/NEW_EXPORT"
|
|
||||||
|
|
||||||
NAME="mtools"
|
|
||||||
DB="lastlogin"
|
|
||||||
|
|
||||||
opt()
|
|
||||||
{
|
|
||||||
case "$1" in
|
|
||||||
-d)
|
|
||||||
shift
|
|
||||||
DATE=$1
|
|
||||||
shift
|
|
||||||
opt $*
|
|
||||||
;;
|
|
||||||
-n)
|
|
||||||
shift
|
|
||||||
NAME=$1
|
|
||||||
shift
|
|
||||||
opt $*
|
|
||||||
;;
|
|
||||||
-db)
|
|
||||||
shift
|
|
||||||
DB=$1
|
|
||||||
shift
|
|
||||||
opt $*
|
|
||||||
;;
|
|
||||||
-pwd)
|
|
||||||
shift
|
|
||||||
DATAPATH=${PWD}
|
|
||||||
shift
|
|
||||||
opt $*
|
|
||||||
;;
|
|
||||||
esac
|
|
||||||
}
|
|
||||||
|
|
||||||
echo "Running... $(date)"
|
|
||||||
|
|
||||||
if [ -n "$1" ]
|
|
||||||
then
|
|
||||||
opt $*
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Opts: DB-$DB DATE-$DATE"
|
|
||||||
|
|
||||||
if [ -z "${DATE}" ]
|
|
||||||
then
|
|
||||||
DATE=$(date -d "yesterday" +%Y%m%d)
|
|
||||||
fi
|
|
||||||
|
|
||||||
YDAY=$(date -d "${DATE}" +%Y-%m-%d)
|
|
||||||
TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d)
|
|
||||||
QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T0
|
|
||||||
0:00:00Z\")}}"
|
|
||||||
|
|
||||||
FNAME=$(date -d "${DATE}" +%y%m%d)
|
|
||||||
CNAME=$(date -d "${DATE}" +%y%m)
|
|
||||||
|
|
||||||
echo "Query: ${QUERY}"
|
|
||||||
|
|
||||||
if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then
|
|
||||||
mkdir -p ${DATAPATH}/${DB}/${CNAME}
|
|
||||||
fi
|
|
||||||
|
|
||||||
/usr/bin/docker run \
|
|
||||||
--rm \
|
|
||||||
-h mtools \
|
|
||||||
--name ${NAME} \
|
|
||||||
-v ${DATAPATH}:/data \
|
|
||||||
mikif70/mongotools:3.4.5 mongoexport \
|
|
||||||
--host "${MONGODB}" \
|
|
||||||
--db $DB \
|
|
||||||
--collection "ll_${FNAME}" \
|
|
||||||
--type "csv" \
|
|
||||||
--fields "_id,user,protocol,ip,date,insert,country" \
|
|
||||||
--query "${QUERY}" \
|
|
||||||
--readPreference "secondary" \
|
|
||||||
--out "/data/${DB}/${CNAME}/ll_${FNAME}.csv"
|
|
5
build.sh
5
build.sh
|
@ -1,5 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo .
|
|
||||||
|
|
||||||
mv lastlogin_mongodb-$(git describe --tags) Docker/
|
|
44
consul.go
44
consul.go
|
@ -1,44 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Consul client structure
|
|
||||||
type Consul struct {
|
|
||||||
RedisTTL time.Duration
|
|
||||||
Timeout time.Duration
|
|
||||||
MaxError int
|
|
||||||
Influxdb string
|
|
||||||
Month string
|
|
||||||
Retention float64
|
|
||||||
MongoURI string
|
|
||||||
Database string
|
|
||||||
RedisURI string
|
|
||||||
Client *api.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewClient New consul client
|
|
||||||
func NewClient() *Consul {
|
|
||||||
client, err := api.NewClient(api.DefaultConfig())
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Consul error: %+v\n", err)
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("Consul error: %+v\n", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
consul := &Consul{
|
|
||||||
Client: client,
|
|
||||||
RedisTTL: opts.RedisTTL,
|
|
||||||
Timeout: opts.Timeout,
|
|
||||||
MaxError: opts.MaxError,
|
|
||||||
Influxdb: opts.Influxdb,
|
|
||||||
}
|
|
||||||
|
|
||||||
return consul
|
|
||||||
}
|
|
229
consumer.go
229
consumer.go
|
@ -2,187 +2,90 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/md5"
|
|
||||||
"crypto/sha1"
|
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/garyburd/redigo/redis"
|
||||||
"log"
|
"log"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/globalsign/mgo"
|
|
||||||
// "gopkg.in/mgo.v2"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type consumed struct {
|
|
||||||
user string
|
|
||||||
error bool
|
|
||||||
logins []string
|
|
||||||
}
|
|
||||||
|
|
||||||
func hash256(val []byte) string {
|
|
||||||
|
|
||||||
h := sha256.New()
|
|
||||||
h.Write(val)
|
|
||||||
|
|
||||||
return hex.EncodeToString(h.Sum(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
func hash160(val []byte) string {
|
|
||||||
|
|
||||||
h := sha1.New()
|
|
||||||
h.Write(val)
|
|
||||||
|
|
||||||
return hex.EncodeToString(h.Sum(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
func hash128(val []byte) string {
|
|
||||||
|
|
||||||
h := md5.New()
|
|
||||||
h.Write(val)
|
|
||||||
|
|
||||||
return hex.EncodeToString(h.Sum(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
// protocol:timestamp:ip:country
|
|
||||||
|
|
||||||
func consumer() {
|
func consumer() {
|
||||||
|
var date int64
|
||||||
|
var lastval string
|
||||||
|
var conn = dbs.rdb.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
user := <-msgs
|
||||||
prod := <-consume
|
// Estrae l'ultimo login dell'utente 'user'
|
||||||
|
val, err := redis.String(conn.Do("LINDEX", user, "-1"))
|
||||||
start := time.Now()
|
if err != nil {
|
||||||
|
if opts.Debug {
|
||||||
status = _Consumer
|
log.Printf("LINDEX error: %+v - %s\n\r", err, val)
|
||||||
|
fmt.Printf("LINDEX error: %+v - %s\n\r", err, val)
|
||||||
var bulk = make(map[string][]*mgo.Bulk)
|
|
||||||
var allLogins = make(map[string]MongoLogin)
|
|
||||||
|
|
||||||
for i := range prod.logins {
|
|
||||||
login := prod.logins[i]
|
|
||||||
// se la riga di login e' vuota
|
|
||||||
if login == "" {
|
|
||||||
log.Println("Login empty: ", prod.user)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
sval := strings.Split(login, ":")
|
// se ha trovato user e righe di login
|
||||||
// se il formato della riga di login non e' corretto
|
if lastval != "" {
|
||||||
if sval[1] == "" {
|
// reinserisce l'ultimo login e imposta il ttl su Redis
|
||||||
log.Println("Login format error: ", login, prod.user)
|
retval, _ := conn.Do("lpush", user, lastval)
|
||||||
continue
|
ttl, _ := conn.Do("expire", user, opts.RedisTTL.Seconds())
|
||||||
}
|
|
||||||
// se il timestamp della riga di login non e' corretto
|
|
||||||
date, err := strconv.ParseInt(sval[1], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// se la data e' piu vecchia di RETENTION (15552000 sec) la scarta
|
|
||||||
if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 {
|
|
||||||
log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
|
|
||||||
if opts.Debug {
|
if opts.Debug {
|
||||||
fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
|
log.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
|
||||||
}
|
fmt.Println("LPUSH retval: ", retval, ttl, user, lastval, opts.RedisTTL.Seconds())
|
||||||
|
|
||||||
if !opts.Test {
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// break
|
||||||
// verifica se esiste la country
|
continue
|
||||||
if len(sval) <= 3 {
|
|
||||||
sval = append(sval, "NONE")
|
|
||||||
}
|
|
||||||
|
|
||||||
// genera l' _ID con user + timestamp + ip
|
|
||||||
mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
|
|
||||||
ml := MongoLogin{
|
|
||||||
ID: mlID,
|
|
||||||
User: prod.user,
|
|
||||||
Protocol: sval[0],
|
|
||||||
IP: sval[2],
|
|
||||||
Date: time.Unix(date, 0),
|
|
||||||
Insert: time.Now(),
|
|
||||||
Country: sval[3],
|
|
||||||
}
|
|
||||||
|
|
||||||
allLogins[mlID] = ml
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
// se la riga di login e' vuota
|
||||||
for _, val := range allLogins {
|
if val == "" {
|
||||||
dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay
|
log.Println("Login empty: ", user)
|
||||||
if _, ok := bulk[dt]; !ok {
|
retval, _ := conn.Do("lrem", user, "-1", val)
|
||||||
for j := range dbs.mdb {
|
log.Println("LREM retval: ", user, val, retval)
|
||||||
b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk()
|
continue
|
||||||
b.Unordered()
|
}
|
||||||
bulk[dt] = append(bulk[dt], b)
|
sval := strings.Split(val, ":")
|
||||||
}
|
// se il formato della riga di login non e' corretto
|
||||||
}
|
if sval[1] == "" {
|
||||||
for _, bl := range bulk[dt] {
|
log.Println("Login format error: ", val, user)
|
||||||
bl.Insert(val)
|
retval, _ := conn.Do("lrem", user, "-1", val)
|
||||||
|
log.Println("LREM retval: ", user, val, retval)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// se il timestamp della riga di login non e' corretto
|
||||||
|
date, err = strconv.ParseInt(sval[1], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Date Error: %+v - %s\n", err, user)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ml := MongoLogin{
|
||||||
|
User: user,
|
||||||
|
Protocol: sval[0],
|
||||||
|
Ip: sval[2],
|
||||||
|
Date: time.Unix(date, 0),
|
||||||
|
}
|
||||||
|
ind := Index{
|
||||||
|
User: user,
|
||||||
|
Date: time.Unix(date, 0),
|
||||||
|
}
|
||||||
|
// inserisce il login su Mongodb
|
||||||
|
count++
|
||||||
|
_, err = dbs.ll.Upsert(ind, ml)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Insert error: %+v\n", err)
|
||||||
|
// se l'errore non e' "duplicate key error" salta al prossimo senza cancellare niente
|
||||||
|
if !strings.Contains(err.Error(), "E11000") {
|
||||||
|
errCount += 1
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// cancella da Redis la riga di login inserita
|
||||||
for _, val := range bulk {
|
retval, err := conn.Do("lrem", user, "-1", val)
|
||||||
for j, bl := range val {
|
|
||||||
result, err := bl.Run()
|
|
||||||
if j == 0 {
|
|
||||||
if err != nil {
|
|
||||||
if !strings.Contains(err.Error(), "E11000") {
|
|
||||||
fmt.Printf("Err: %+v\n", err)
|
|
||||||
prod.err = true
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "err",
|
|
||||||
val: len(prod.logins),
|
|
||||||
}
|
|
||||||
if opts.Test || opts.Debug {
|
|
||||||
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "dup",
|
|
||||||
val: strings.Count(err.Error(), "E11000"),
|
|
||||||
}
|
|
||||||
if opts.Debug {
|
|
||||||
log.Printf("DUP: %s - %+v\n", prod.user, prod.logins)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if opts.Test {
|
|
||||||
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
|
|
||||||
log.Printf("BulkResult: %s - %+v\n", prod.user, result)
|
|
||||||
}
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "ins",
|
|
||||||
val: len(allLogins),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Debug {
|
if opts.Debug {
|
||||||
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
|
log.Println("LREM retval: ", retval, user, val)
|
||||||
|
fmt.Println("LREM retval: ", retval, user, val)
|
||||||
}
|
}
|
||||||
|
lastval = val
|
||||||
if opts.Debug && len(prod.logins) > 10 {
|
|
||||||
fmt.Printf("LOGS: %+v\n", prod.logins)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !prod.err {
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "rem",
|
|
||||||
val: len(prod.logins),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// wg.Done()
|
|
||||||
remove <- prod
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
180
counter.go
180
counter.go
|
@ -1,180 +0,0 @@
|
||||||
// counter
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Counterchan counter structure
|
|
||||||
type Counterchan struct {
|
|
||||||
tipo string
|
|
||||||
val int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Counter structure
|
|
||||||
type Counter struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
user int
|
|
||||||
log int
|
|
||||||
insert int
|
|
||||||
rem int
|
|
||||||
err int
|
|
||||||
dup int
|
|
||||||
time time.Duration
|
|
||||||
wg int
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewCounter iniitialized Counter structure
|
|
||||||
func NewCounter() *Counter {
|
|
||||||
return &Counter{
|
|
||||||
user: 0,
|
|
||||||
log: 0,
|
|
||||||
insert: 0,
|
|
||||||
err: 0,
|
|
||||||
rem: 0,
|
|
||||||
dup: 0,
|
|
||||||
time: 0,
|
|
||||||
wg: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run open channel to receive counter
|
|
||||||
func (c *Counter) Run() {
|
|
||||||
for {
|
|
||||||
tocount := <-counter
|
|
||||||
|
|
||||||
switch tocount.tipo {
|
|
||||||
case "user":
|
|
||||||
c.addUser()
|
|
||||||
case "dup":
|
|
||||||
c.addDuplicate(tocount.val)
|
|
||||||
case "ins":
|
|
||||||
c.addInsert(tocount.val)
|
|
||||||
case "log":
|
|
||||||
c.addLog(tocount.val)
|
|
||||||
case "rem":
|
|
||||||
c.addRem(tocount.val)
|
|
||||||
case "wg":
|
|
||||||
if tocount.val > 0 {
|
|
||||||
c.addWG()
|
|
||||||
} else {
|
|
||||||
c.delWG()
|
|
||||||
}
|
|
||||||
case "err":
|
|
||||||
c.addErr(tocount.val)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddUser increment number of users managed
|
|
||||||
func (c *Counter) addUser() {
|
|
||||||
c.user++
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddDuplicate increment number of duplicates log
|
|
||||||
func (c *Counter) addDuplicate(add int) {
|
|
||||||
c.dup += add
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddInsert increment number of inserted rows
|
|
||||||
func (c *Counter) addInsert(add int) {
|
|
||||||
c.insert += add
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddLog increment number of log's rows managed
|
|
||||||
func (c *Counter) addLog(add int) {
|
|
||||||
c.log += add
|
|
||||||
}
|
|
||||||
|
|
||||||
//AddRem increment removed logs row
|
|
||||||
func (c *Counter) addRem(add int) {
|
|
||||||
c.rem += add
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddWG ...
|
|
||||||
func (c *Counter) addWG() {
|
|
||||||
c.wg++
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddErr ...
|
|
||||||
func (c *Counter) addErr(add int) {
|
|
||||||
c.err += add
|
|
||||||
}
|
|
||||||
|
|
||||||
// DelWG ...
|
|
||||||
func (c *Counter) delWG() {
|
|
||||||
c.wg--
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetUser return total users
|
|
||||||
func (c *Counter) GetUser() (ret int) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.user
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetDup return total duplicated logins
|
|
||||||
func (c *Counter) GetDup() (ret int) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.dup
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLog return total log's rows
|
|
||||||
func (c *Counter) GetLog() (ret int) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.log
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetInsert return total inserted rows
|
|
||||||
func (c *Counter) GetInsert() (ret int) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.insert
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetErr return total errors
|
|
||||||
func (c *Counter) GetErr() (ret int) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRem return total removed log's rows
|
|
||||||
func (c *Counter) GetRem() (ret int) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.rem
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetWG ...
|
|
||||||
func (c *Counter) GetWG() (ret int) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.wg
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetTime ...
|
|
||||||
func (c *Counter) GetTime() (ret float64) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
ret = c.time.Seconds()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetTime ...
|
|
||||||
func (c *Counter) SetTime(t time.Duration) {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
c.time = t
|
|
||||||
}
|
|
15
day_init.js
15
day_init.js
|
@ -1,15 +0,0 @@
|
||||||
// v1.2
|
|
||||||
var db = connect("192.168.0.1:27017/lastlogin");
|
|
||||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
|
||||||
var now = new Date();
|
|
||||||
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
|
|
||||||
print("creating index: ll_",col.slice(2));
|
|
||||||
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
|
|
||||||
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
|
|
||||||
var db = connect("192.168.0.1:27017/katamail");
|
|
||||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
|
||||||
var now = new Date();
|
|
||||||
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
|
|
||||||
print("creating index: ll_",col.slice(2));
|
|
||||||
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
|
|
||||||
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
|
|
91
dbs.go
91
dbs.go
|
@ -2,80 +2,55 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"github.com/garyburd/redigo/redis"
|
||||||
|
// "github.com/fzzy/radix/redis"
|
||||||
|
"gopkg.in/mgo.v2"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/garyburd/redigo/redis"
|
|
||||||
|
|
||||||
"github.com/globalsign/mgo"
|
|
||||||
// "gopkg.in/mgo.v2"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
dbs = Dbs{
|
dbs = Dbs{
|
||||||
RedisURI: "",
|
MongoUri: "mongodb://127.0.0.1:27018",
|
||||||
Database: "",
|
RedisUri: "127.0.0.1:6379",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// Dbs structure
|
|
||||||
type Dbs struct {
|
type Dbs struct {
|
||||||
MongoURI string
|
MongoUri string
|
||||||
Database string
|
RedisUri string
|
||||||
RedisURI string
|
|
||||||
rdb *redis.Pool //*redis.Client
|
rdb *redis.Pool //*redis.Client
|
||||||
mdb []*mgo.Session
|
mdb *mgo.Session
|
||||||
|
ll *mgo.Collection
|
||||||
|
// us *mgo.Collection
|
||||||
}
|
}
|
||||||
|
|
||||||
// MongoLogin structure
|
|
||||||
type MongoLogin struct {
|
type MongoLogin struct {
|
||||||
ID string `json:"_id" bson:"_id" gorethink:"id"`
|
User string `json:"user"`
|
||||||
User string `json:"user" bson:"user" gorethink:"user"`
|
Protocol string `json:"protocol"`
|
||||||
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
|
Ip string `json:"ip"`
|
||||||
IP string `json:"ip" bson:"ip" gorethink:"ip"`
|
Date time.Time `json:"date"`
|
||||||
Date time.Time `json:"date" bson:"date" gorethink:"date"`
|
|
||||||
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
|
|
||||||
Country string `json:"country" bson:"country" gorethink:"country"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ips structure
|
|
||||||
type Ips struct {
|
|
||||||
IP string `json:"ip"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// UserLogin structure
|
|
||||||
type UserLogin struct {
|
type UserLogin struct {
|
||||||
User string `json:"user"`
|
User string `json:"user"`
|
||||||
Date time.Time `json:"date"`
|
Date time.Time `json:"date"`
|
||||||
Lock bool `json:"lock"`
|
Lock bool `json:"lock"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index structure
|
|
||||||
type Index struct {
|
type Index struct {
|
||||||
User string `json:"user"`
|
User string `json:"user"`
|
||||||
Date time.Time `json:"date"`
|
Date time.Time `json:"date"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Dbs) isMongodb() bool {
|
|
||||||
if db.MongoURI != "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *Dbs) poolRedis() {
|
func (db *Dbs) poolRedis() {
|
||||||
|
|
||||||
dbs.rdb = &redis.Pool{
|
dbs.rdb = &redis.Pool{
|
||||||
MaxIdle: 128,
|
MaxIdle: 3,
|
||||||
MaxActive: 1000,
|
IdleTimeout: 240 * time.Second,
|
||||||
Wait: true,
|
|
||||||
IdleTimeout: 1 * time.Second,
|
|
||||||
Dial: func() (redis.Conn, error) {
|
Dial: func() (redis.Conn, error) {
|
||||||
c, err := redis.Dial("tcp", db.RedisURI)
|
c, err := redis.Dial("tcp", db.RedisUri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -89,18 +64,24 @@ func (db *Dbs) poolRedis() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Dbs) connectMongo() {
|
/*
|
||||||
|
func (db *Dbs) connectRedis() {
|
||||||
mongoList := strings.Split(db.MongoURI, "|")
|
var err error
|
||||||
|
db.rdb, err = redis.Dial("tcp", db.RedisUri)
|
||||||
for m := range mongoList {
|
if err != nil {
|
||||||
nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m]))
|
log.Println("Redis connect Error: ", err.Error())
|
||||||
if err != nil {
|
os.Exit(-1)
|
||||||
log.Println("Mongodb connect Error: ", err.Error())
|
|
||||||
os.Exit(-3)
|
|
||||||
}
|
|
||||||
nm.SetSocketTimeout(5 * time.Second)
|
|
||||||
nm.SetSyncTimeout(5 * time.Second)
|
|
||||||
db.mdb = append(db.mdb, nm)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
func (db *Dbs) connectMongo() {
|
||||||
|
var err error
|
||||||
|
db.mdb, err = mgo.Dial(db.MongoUri)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Mongodb connect Error: ", err.Error())
|
||||||
|
os.Exit(-3)
|
||||||
|
}
|
||||||
|
db.ll = db.mdb.DB("dovecot").C("lastlogin")
|
||||||
|
// db.us = db.mdb.DB("dovecot").C("userlogin")
|
||||||
|
}
|
||||||
|
|
|
@ -1,18 +0,0 @@
|
||||||
version: '2'
|
|
||||||
services:
|
|
||||||
mongodb:
|
|
||||||
image: mikif70/mongodb:3.6.9
|
|
||||||
ports:
|
|
||||||
- 27017:27017
|
|
||||||
container_name: ll_mongod
|
|
||||||
volumes:
|
|
||||||
- ./mongod:/data
|
|
||||||
depends_on:
|
|
||||||
- redis
|
|
||||||
redis:
|
|
||||||
image: "redis:alpine"
|
|
||||||
container_name: ll_redis
|
|
||||||
ports:
|
|
||||||
- 6379:6379
|
|
||||||
volumes:
|
|
||||||
- ./redis:/data
|
|
|
@ -1,9 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
if [ -z $1 ]; then
|
|
||||||
HOST=192.168.0.1:27017
|
|
||||||
else
|
|
||||||
HOST=${1}
|
|
||||||
fi
|
|
||||||
|
|
||||||
docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST}
|
|
|
@ -1,16 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
echo "Starting mikif70/llmongo:$(git -C .. describe --tags)"
|
|
||||||
|
|
||||||
docker run \
|
|
||||||
--rm \
|
|
||||||
-v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \
|
|
||||||
--name llmongo \
|
|
||||||
mikif70/llmongo:$(git -C .. describe --tags) \
|
|
||||||
-l /data/llmongo.log \
|
|
||||||
-r 192.168.0.1:6379 \
|
|
||||||
-m 192.168.0.1:27017 \
|
|
||||||
-d lastlogin \
|
|
||||||
-i test@10.39.253.206:8086 \
|
|
||||||
-T 80s \
|
|
||||||
$@
|
|
73
influxdb.go
73
influxdb.go
|
@ -1,73 +0,0 @@
|
||||||
// influxdb
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
influxdb "github.com/influxdata/influxdb/client/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
infdb string
|
|
||||||
infhost string
|
|
||||||
)
|
|
||||||
|
|
||||||
func writeStats(start time.Time) {
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
|
|
||||||
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
|
|
||||||
Addr: fmt.Sprintf("http://%s", infdb),
|
|
||||||
Timeout: 2 * time.Second,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("InfluxDB connect Error: %+v\n", err)
|
|
||||||
log.Printf("InfluxDB connect Error: %+v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer c.Close()
|
|
||||||
|
|
||||||
bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
|
|
||||||
Database: "dovecot",
|
|
||||||
Precision: "s",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("InfluxDB batch Error: %+v\n", err)
|
|
||||||
log.Printf("InfluxDB batch Error: %+v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
tags := map[string]string{"server": infhost, "domain": dbs.Database}
|
|
||||||
fields := map[string]interface{}{
|
|
||||||
"user": count.GetUser(),
|
|
||||||
"log": count.GetLog(),
|
|
||||||
"err": count.GetErr(),
|
|
||||||
"rem": count.GetRem(),
|
|
||||||
"dup": count.GetDup(),
|
|
||||||
"stop": count.GetTime(),
|
|
||||||
}
|
|
||||||
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("InfluxDB point Error: %+v\n", err)
|
|
||||||
log.Printf("InfluxDB point Error: %+v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("InfluxDB pt: %+v\n", pt)
|
|
||||||
}
|
|
||||||
|
|
||||||
bp.AddPoint(pt)
|
|
||||||
|
|
||||||
// Write the batch
|
|
||||||
err = c.Write(bp)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("InfluxDB write Error: %+v\n", err)
|
|
||||||
log.Printf("InfluxDB write Error: %+v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,28 +0,0 @@
|
||||||
/opt/mongodb/mongod/log/*.log
|
|
||||||
{
|
|
||||||
daily
|
|
||||||
rotate 10
|
|
||||||
maxsize 10M
|
|
||||||
compress
|
|
||||||
delaycompress
|
|
||||||
missingok
|
|
||||||
notifempty
|
|
||||||
sharedscripts
|
|
||||||
copytruncate
|
|
||||||
postrotate
|
|
||||||
/usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true
|
|
||||||
endscript
|
|
||||||
}
|
|
||||||
|
|
||||||
/opt/llmongo/log/*.log
|
|
||||||
{
|
|
||||||
daily
|
|
||||||
maxsize 5M
|
|
||||||
rotate 8
|
|
||||||
compress
|
|
||||||
delaycompress
|
|
||||||
missingok
|
|
||||||
notifempty
|
|
||||||
sharedscripts
|
|
||||||
copytruncate
|
|
||||||
}
|
|
148
main.go
148
main.go
|
@ -6,67 +6,78 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"path"
|
||||||
"sync"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "github.com/nats-io/go-nats"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
RedisTTL time.Duration
|
||||||
|
CurrentPath string
|
||||||
|
Exe string
|
||||||
|
LogFile string
|
||||||
|
Timeout bool
|
||||||
|
Debug bool
|
||||||
|
Version bool
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
_Version = "v5.0.0b6"
|
_VERSION = "v2.0.0"
|
||||||
_Producer = 0
|
|
||||||
_Consumer = 1
|
|
||||||
_Remover = 2
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
loop bool
|
opts = Options{
|
||||||
|
RedisTTL: time.Hour * 11688, // 16 mesi
|
||||||
|
LogFile: "log/llmongo.log",
|
||||||
|
}
|
||||||
|
|
||||||
done chan bool
|
loop = true
|
||||||
consume chan loginsList
|
ttl = time.Second * 55
|
||||||
remove chan loginsList
|
done = make(chan bool)
|
||||||
|
msgs = make(chan string)
|
||||||
|
|
||||||
counter chan Counterchan
|
count = 0
|
||||||
|
errCount = 0
|
||||||
wg sync.WaitGroup
|
|
||||||
|
|
||||||
status int
|
|
||||||
|
|
||||||
count *Counter
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func usage() {
|
||||||
|
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <ttl> -l <logfile> -T -D -v\n")
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
var err error
|
||||||
|
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
|
||||||
|
pid.PIDFile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
|
||||||
|
opts.Exe = path.Base(os.Args[0])
|
||||||
|
|
||||||
|
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
|
||||||
|
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
|
||||||
|
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
|
||||||
|
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis TTL")
|
||||||
|
flag.BoolVar(&opts.Version, "v", false, "Version")
|
||||||
|
flag.BoolVar(&opts.Timeout, "T", false, "Timeout")
|
||||||
|
flag.BoolVar(&opts.Debug, "D", false, "Debug")
|
||||||
|
}
|
||||||
|
|
||||||
|
func stopLoop() {
|
||||||
|
loop = false
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Usage = usage
|
flag.Usage = usage
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
if opts.Version {
|
if opts.Version {
|
||||||
fmt.Println(os.Args[0], _Version)
|
fmt.Println(os.Args[0], _VERSION)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" {
|
|
||||||
flag.Usage()
|
|
||||||
os.Exit(-1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Influxdb != "" {
|
|
||||||
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
|
|
||||||
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
|
|
||||||
if re.MatchString(opts.Influxdb) {
|
|
||||||
match := re.FindStringSubmatch(opts.Influxdb)
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("Influxdb match: %+v\n", match)
|
|
||||||
}
|
|
||||||
infhost = match[1]
|
|
||||||
infdb = match[2] + ":" + match[3]
|
|
||||||
} else {
|
|
||||||
opts.Influxdb = ""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
setTerm()
|
|
||||||
|
|
||||||
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
|
fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Log file error: ", err.Error())
|
fmt.Println("Log file error: ", err.Error())
|
||||||
|
@ -76,58 +87,27 @@ func main() {
|
||||||
|
|
||||||
log.SetOutput(fs)
|
log.SetOutput(fs)
|
||||||
|
|
||||||
// pid.PIDFile = opts.Pidfile
|
pid.Write(true)
|
||||||
// pid.Write(true)
|
defer pid.Remove()
|
||||||
// defer pid.Remove()
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
|
fmt.Printf("Start: %+v\n", opts)
|
||||||
log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
|
log.Printf("Start: %+v\n", opts)
|
||||||
|
|
||||||
opts.Month = start.Format("0601")
|
|
||||||
|
|
||||||
dbs.poolRedis()
|
dbs.poolRedis()
|
||||||
defer dbs.rdb.Close()
|
defer dbs.rdb.Close()
|
||||||
|
|
||||||
dbs.connectMongo()
|
dbs.connectMongo()
|
||||||
for k := range dbs.mdb {
|
defer dbs.mdb.Close()
|
||||||
defer dbs.mdb[k].Close()
|
|
||||||
|
if opts.Timeout {
|
||||||
|
time.AfterFunc(ttl, stopLoop)
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.Timeout > 0 {
|
|
||||||
time.AfterFunc(opts.Timeout, exit)
|
|
||||||
}
|
|
||||||
|
|
||||||
count = NewCounter()
|
|
||||||
|
|
||||||
consume = make(chan loginsList, opts.Queue)
|
|
||||||
remove = make(chan loginsList, opts.Queue)
|
|
||||||
loop = true
|
|
||||||
done = make(chan bool)
|
|
||||||
counter = make(chan Counterchan)
|
|
||||||
|
|
||||||
go count.Run()
|
|
||||||
go producer()
|
go producer()
|
||||||
for i := 0; i < opts.Queue; i++ {
|
go consumer()
|
||||||
for j := 0; j < len(dbs.mdb); j++ {
|
|
||||||
go consumer()
|
|
||||||
}
|
|
||||||
go remover()
|
|
||||||
}
|
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
fmt.Printf("Done\n")
|
|
||||||
close(done)
|
|
||||||
|
|
||||||
fmt.Println("Waiting WG")
|
fmt.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount)
|
||||||
wg.Wait()
|
log.Printf("Stop %v - login: %d - errors: %d\n\r", time.Since(start), count, errCount)
|
||||||
|
|
||||||
count.SetTime(time.Since(start))
|
|
||||||
|
|
||||||
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
|
|
||||||
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
|
|
||||||
|
|
||||||
if opts.Influxdb != "" {
|
|
||||||
writeStats(start)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
15
main_test.go
15
main_test.go
|
@ -1,15 +0,0 @@
|
||||||
// Testmain.go
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestOptions(t *testing.T) {
|
|
||||||
flag.Usage = usage
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
fmt.Printf("Config: %+v\n", opts)
|
|
||||||
}
|
|
|
@ -1,34 +0,0 @@
|
||||||
|
|
||||||
################
|
|
||||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
|
||||||
|
|
||||||
for (var i=1; i<18; i++) {
|
|
||||||
var col = "ll_1806"+dd(i);
|
|
||||||
print(col);
|
|
||||||
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
|
|
||||||
}
|
|
||||||
|
|
||||||
###########
|
|
||||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
|
||||||
|
|
||||||
for (var m=1; m<=12; m++) {
|
|
||||||
for (var d=1; d<=31; d++) {
|
|
||||||
var col = "ll_15"+dd(m)+dd(d);
|
|
||||||
if (db.getCollection(col).exists != null) {
|
|
||||||
print(col);
|
|
||||||
print(db.getCollection(col).totalSize());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
####
|
|
||||||
var ar = db.getCollectionNames()
|
|
||||||
var len = ar.length
|
|
||||||
|
|
||||||
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
|
|
||||||
|
|
||||||
|
|
||||||
####
|
|
||||||
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})
|
|
||||||
|
|
85
options.go
85
options.go
|
@ -1,85 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"path/filepath"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Options structure
|
|
||||||
type Options struct {
|
|
||||||
RedisTTL time.Duration
|
|
||||||
CurrentPath string
|
|
||||||
Exe string
|
|
||||||
LogFile string
|
|
||||||
ConfigFile string
|
|
||||||
Timeout time.Duration
|
|
||||||
Debug bool
|
|
||||||
Test bool
|
|
||||||
Version bool
|
|
||||||
MaxError int
|
|
||||||
Influxdb string
|
|
||||||
Month string
|
|
||||||
Queue int
|
|
||||||
Retention float64
|
|
||||||
Consul string
|
|
||||||
Port int
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
opts = Options{
|
|
||||||
RedisTTL: time.Hour * 11688, // 16 mesi
|
|
||||||
LogFile: "log/llmongo.log",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func usage() {
|
|
||||||
fmt.Println(`Usage: llmongo
|
|
||||||
-m <mongodb [ip:port,ip:port]>
|
|
||||||
-d <mongodb database [dbname]>
|
|
||||||
-r <redisdb [ip:port]>
|
|
||||||
-t <redis keys ttl>
|
|
||||||
-l <logfile>
|
|
||||||
-T <running timeout>
|
|
||||||
-i <influxdb [localname@ip:port]>
|
|
||||||
-C <consul [ip:port]>
|
|
||||||
-P <check port> [port] ## used by consul to check services ##
|
|
||||||
-q <parallel consumer>
|
|
||||||
-R <retention>
|
|
||||||
-v <version>
|
|
||||||
-D <debug>
|
|
||||||
-DD <Test>`)
|
|
||||||
fmt.Println()
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
var err error
|
|
||||||
opts.CurrentPath, err = filepath.Abs(filepath.Dir(os.Args[0]))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
|
|
||||||
opts.Exe = path.Base(os.Args[0])
|
|
||||||
|
|
||||||
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
|
|
||||||
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
|
|
||||||
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
|
|
||||||
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
|
|
||||||
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
|
|
||||||
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
|
|
||||||
flag.IntVar(&opts.Port, "P", 10500, "service check port")
|
|
||||||
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
|
|
||||||
flag.BoolVar(&opts.Version, "v", false, "Version")
|
|
||||||
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
|
|
||||||
flag.BoolVar(&opts.Debug, "D", false, "Debug")
|
|
||||||
flag.BoolVar(&opts.Test, "DD", false, "Test")
|
|
||||||
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
|
|
||||||
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
|
|
||||||
flag.Float64Var(&opts.Retention, "R", 15552000, "retention")
|
|
||||||
}
|
|
10
pid.go
10
pid.go
|
@ -1,7 +1,6 @@
|
||||||
// pid
|
// pid
|
||||||
package main
|
package main
|
||||||
|
|
||||||
/*
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -17,7 +16,6 @@ var (
|
||||||
pid = PID{}
|
pid = PID{}
|
||||||
)
|
)
|
||||||
|
|
||||||
// PID structure
|
|
||||||
type PID struct {
|
type PID struct {
|
||||||
PID string
|
PID string
|
||||||
PIDFile string
|
PIDFile string
|
||||||
|
@ -44,8 +42,11 @@ func (p *PID) readCmd() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
cmd := bytes.Trim(bcmd, "\x00")
|
cmd := bytes.Trim(bcmd, "\x00")
|
||||||
if !strings.Contains(string(cmd), opts.Exe) {
|
if strings.Contains(string(cmd), opts.Exe) {
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
fmt.Printf("PID %s used by %s\n", pid, cmd)
|
fmt.Printf("PID %s used by %s\n", pid, cmd)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -77,11 +78,10 @@ func (p *PID) Write(l bool) {
|
||||||
fpid.Close()
|
fpid.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove cancella il PIDFile
|
// Cancella il PIDFile
|
||||||
func (p *PID) Remove() {
|
func (p *PID) Remove() {
|
||||||
err := os.Remove(p.PIDFile)
|
err := os.Remove(p.PIDFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("RM file error: ", err.Error())
|
fmt.Println("RM file error: ", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
76
producer.go
76
producer.go
|
@ -3,90 +3,30 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
// "strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/garyburd/redigo/redis"
|
"github.com/garyburd/redigo/redis"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type produced struct {
|
|
||||||
user string
|
|
||||||
logins []string
|
|
||||||
}
|
|
||||||
|
|
||||||
type loginsList struct {
|
|
||||||
user string
|
|
||||||
logins []string
|
|
||||||
err bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func producer() {
|
func producer() {
|
||||||
conn := dbs.rdb.Get()
|
conn := dbs.rdb.Get()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
for loop {
|
for loop {
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
status = _Producer
|
|
||||||
|
|
||||||
start := time.Now()
|
|
||||||
// estrae un userid dalla lista degli utenti che hanno fatto login
|
// estrae un userid dalla lista degli utenti che hanno fatto login
|
||||||
user, err := redis.String(conn.Do("spop", "llindex"))
|
user, err := redis.String(conn.Do("spop", "llindex"))
|
||||||
|
if opts.Debug {
|
||||||
|
log.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err)
|
||||||
|
fmt.Printf("SPOP: %+v %+v - conn: %d\n\r", user, err)
|
||||||
|
}
|
||||||
// se non ci sono piu' userid esce
|
// se non ci sono piu' userid esce
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if opts.Debug {
|
if opts.Debug {
|
||||||
fmt.Printf("LLINDEX empty: %v\n", err)
|
log.Printf("LLINDEX empty: %v\n\r", err)
|
||||||
|
fmt.Printf("LLINDEX empty: %v\n\r", err)
|
||||||
}
|
}
|
||||||
log.Printf("LLINDEX empty: %v\n", err)
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
msgs <- user
|
||||||
// estrae tutti i logins dell'utente "user"
|
|
||||||
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
|
|
||||||
if err != nil {
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
|
|
||||||
}
|
|
||||||
log.Printf("LRANGE: %+v - %+v\n", err, logs)
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(logs) > 0 {
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "user",
|
|
||||||
val: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("PROD: %+v\n", time.Since(start))
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Test {
|
|
||||||
log.Printf("PROD: %s - %d\n", user, len(logs))
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "wg",
|
|
||||||
val: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "log",
|
|
||||||
val: len(logs),
|
|
||||||
}
|
|
||||||
|
|
||||||
consume <- loginsList{
|
|
||||||
user: user,
|
|
||||||
logins: logs,
|
|
||||||
err: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
done <- true
|
done <- true
|
||||||
}
|
}
|
||||||
|
|
51
remover.go
51
remover.go
|
@ -1,51 +0,0 @@
|
||||||
// finalizer
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func remover() {
|
|
||||||
|
|
||||||
var conn = dbs.rdb.Get()
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
for {
|
|
||||||
rem := <-remove
|
|
||||||
|
|
||||||
status = _Remover
|
|
||||||
|
|
||||||
start := time.Now()
|
|
||||||
if !rem.err {
|
|
||||||
for i := range rem.logins {
|
|
||||||
// cancella da Redis la riga di login inserita partendo da 1
|
|
||||||
conn.Send("lrem", rem.user, "1", rem.logins[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
|
|
||||||
if rem.err {
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("SADD: %s\n", rem.user)
|
|
||||||
}
|
|
||||||
conn.Send("sadd", "llindex", rem.user)
|
|
||||||
if count.GetErr() >= opts.MaxError {
|
|
||||||
exit()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
|
|
||||||
conn.Flush()
|
|
||||||
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
|
|
||||||
}
|
|
||||||
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "wg",
|
|
||||||
val: -1,
|
|
||||||
}
|
|
||||||
wg.Done()
|
|
||||||
}
|
|
||||||
}
|
|
75
rethinkdb.go
75
rethinkdb.go
|
@ -1,75 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
/*
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
// "strings"
|
|
||||||
|
|
||||||
rt "gopkg.in/dancannon/gorethink.v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Rethink struct {
|
|
||||||
rtdb *rt.Session
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRethinkDB(cluster []string) (*Rethink, error) {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
session *rt.Session
|
|
||||||
)
|
|
||||||
|
|
||||||
if len(cluster) > 1 {
|
|
||||||
session, err = rt.Connect(rt.ConnectOpts{
|
|
||||||
Addresses: cluster,
|
|
||||||
InitialCap: 10,
|
|
||||||
MaxOpen: 10,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
session, err = rt.Connect(rt.ConnectOpts{
|
|
||||||
Address: cluster[0],
|
|
||||||
InitialCap: 10,
|
|
||||||
MaxOpen: 10,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("RethinkDB ERROR: %+v\n", err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Rethink{
|
|
||||||
rtdb: session,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
|
|
||||||
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb)
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("RTinsert: %+v\n", resp)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) {
|
|
||||||
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb)
|
|
||||||
//resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb)
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Printf("RTMulti: %+v\n", resp)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Rethink) Close() {
|
|
||||||
r.rtdb.Close()
|
|
||||||
}
|
|
||||||
*/
|
|
39
sigterm.go
39
sigterm.go
|
@ -1,39 +0,0 @@
|
||||||
// sigterm
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func kill() {
|
|
||||||
log.Printf("KILL %d\n", status)
|
|
||||||
fmt.Printf("KILL %d\n", status)
|
|
||||||
wg.Done()
|
|
||||||
counter <- Counterchan{
|
|
||||||
tipo: "wg",
|
|
||||||
val: -1,
|
|
||||||
}
|
|
||||||
done <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
func exit() {
|
|
||||||
log.Printf("EXIT %d\n", status)
|
|
||||||
fmt.Printf("EXIT %d\n", status)
|
|
||||||
loop = false
|
|
||||||
time.AfterFunc(time.Second*20, kill)
|
|
||||||
}
|
|
||||||
|
|
||||||
func setTerm() {
|
|
||||||
c := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(c, os.Interrupt)
|
|
||||||
signal.Notify(c, syscall.SIGTERM)
|
|
||||||
go func() {
|
|
||||||
<-c
|
|
||||||
exit()
|
|
||||||
}()
|
|
||||||
}
|
|
|
@ -24,9 +24,4 @@
|
||||||
#
|
#
|
||||||
# m h dom mon dow command
|
# m h dom mon dow command
|
||||||
|
|
||||||
### lastlogin import
|
*/10 * * * * /opt/llmongo/start.sh > /dev/null
|
||||||
*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1
|
|
||||||
*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1
|
|
||||||
05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1
|
|
||||||
### lastlogin create indexes
|
|
||||||
00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1
|
|
||||||
|
|
63
xymon.go
63
xymon.go
|
@ -1,63 +0,0 @@
|
||||||
// xymon.go
|
|
||||||
package main
|
|
||||||
|
|
||||||
/*
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"net"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func sendStatus() {
|
|
||||||
|
|
||||||
var msg string
|
|
||||||
|
|
||||||
host := strings.Replace(opts.Hostname, ".", ",", -1)
|
|
||||||
|
|
||||||
if count.GetErr() > 0 {
|
|
||||||
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "yellow", time.Now(), "errors", count.GetErr())
|
|
||||||
} else {
|
|
||||||
msg = fmt.Sprintf("status %s.llmongo %s %s \n\r%s: %d", host, "green", time.Now(), "logins", count.GetLog())
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Debug {
|
|
||||||
fmt.Println("Sending: ", msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
xymonSend([]byte(msg))
|
|
||||||
}
|
|
||||||
|
|
||||||
func xymonSend(msg []byte) error {
|
|
||||||
|
|
||||||
var server string
|
|
||||||
|
|
||||||
if strings.Contains(opts.Xymon, ":") {
|
|
||||||
server = opts.Xymon
|
|
||||||
} else {
|
|
||||||
server = opts.Xymon + ":1984"
|
|
||||||
}
|
|
||||||
|
|
||||||
cl, err := net.Dial("tcp", server)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("xymon connect error: ", err)
|
|
||||||
log.Printf("xymon connect error: ", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer cl.Close()
|
|
||||||
|
|
||||||
// fmt.Printf("xymon: %s - localhost: %s\n", cl.RemoteAddr(), cl.LocalAddr())
|
|
||||||
|
|
||||||
_, err = cl.Write(msg)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("xymon write error: ", err)
|
|
||||||
log.Printf("xymon write error: ", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// fmt.Printf("written: %d\n", tot)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
*/
|
|
Loading…
Add table
Add a link
Reference in a new issue