Compare commits
55 commits
Author | SHA1 | Date | |
---|---|---|---|
abca7e21e7 | |||
ee28cf2d89 | |||
5c45a79d12 | |||
f1f38601d5 | |||
a035171edb | |||
2d3d9e43d3 | |||
a2fc00df74 | |||
5bc0076ecf | |||
58234ad80a | |||
3e3e36f81e | |||
8e1b5033f1 | |||
d8137b631f | |||
f56f51bb27 | |||
927aac308f | |||
90177973fe | |||
1ec55417d1 | |||
254405b1fb | |||
6572d95834 | |||
e1f9da40e4 | |||
a78109af0d | |||
18c1e2012a | |||
c33e3065ec | |||
8cac688ebd | |||
1535b27918 | |||
ee55ecd2f2 | |||
806d5f0a6a | |||
1e55b6e6f0 | |||
3ecbd95719 | |||
905d7ca936 | |||
102d0fc973 | |||
26e25328b9 | |||
5a9f5f5008 | |||
7a52478c63 | |||
6896b447c5 | |||
ae82c15f70 | |||
b02fe14d57 | |||
bb2caec10f | |||
92c78583ce | |||
c244dc8922 | |||
ca0605108a | |||
d5ffeacb9a | |||
be694768fb | |||
50b09f7050 | |||
58518c4b6a | |||
8228f63fff | |||
1b36fde70a | |||
8bff28e0dc | |||
36f6aa8a83 | |||
a3a02eac55 | |||
94606006a8 | |||
b095dea871 | |||
d7d68f4bf4 | |||
![]() |
c2a5e09323 | ||
8e384215b7 | |||
947b4dec64 |
25 changed files with 835 additions and 354 deletions
11
Docker/Dockerfile
Normal file
11
Docker/Dockerfile
Normal file
|
@ -0,0 +1,11 @@
|
|||
#FROM busybox:latest
|
||||
FROM scratch
|
||||
|
||||
MAINTAINER Michele Fadda "<mikif70@gmail.com>"
|
||||
|
||||
ARG VER
|
||||
ENV VER ${VER:-0.0.0}
|
||||
|
||||
COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb
|
||||
|
||||
ENTRYPOINT [ "/bin/lastlogin_mongodb" ]
|
3
Docker/build.sh
Executable file
3
Docker/build.sh
Executable file
|
@ -0,0 +1,3 @@
|
|||
#!/bin/bash
|
||||
|
||||
docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) .
|
16
Docker/run.sh
Normal file
16
Docker/run.sh
Normal file
|
@ -0,0 +1,16 @@
|
|||
#!/bin/bash
|
||||
|
||||
docker run \
|
||||
--rm \
|
||||
-v /opt/llmongo/log:/data \
|
||||
--name llmongo_tiscali \
|
||||
--log-opt max-size=2m \
|
||||
--log-opt max-file=5 \
|
||||
mikif70/llmongo:4.2.1 \
|
||||
-l /data/llmongo.log \
|
||||
-r redis-ll.mail.tiscali.sys:6379 \
|
||||
-m 10.39.80.189:27017 \
|
||||
-d lastlogin \
|
||||
-i enginedb1@10.39.109.107:8086 \
|
||||
-T 60s \
|
||||
$@
|
80
backup.sh
Normal file
80
backup.sh
Normal file
|
@ -0,0 +1,80 @@
|
|||
#!/bin/bash
|
||||
|
||||
MONGODB="secondary.mongod.service.mail:27017"
|
||||
DATAPATH="/mnt/mongobck/NEW_EXPORT"
|
||||
|
||||
NAME="mtools"
|
||||
DB="lastlogin"
|
||||
|
||||
opt()
|
||||
{
|
||||
case "$1" in
|
||||
-d)
|
||||
shift
|
||||
DATE=$1
|
||||
shift
|
||||
opt $*
|
||||
;;
|
||||
-n)
|
||||
shift
|
||||
NAME=$1
|
||||
shift
|
||||
opt $*
|
||||
;;
|
||||
-db)
|
||||
shift
|
||||
DB=$1
|
||||
shift
|
||||
opt $*
|
||||
;;
|
||||
-pwd)
|
||||
shift
|
||||
DATAPATH=${PWD}
|
||||
shift
|
||||
opt $*
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
echo "Running... $(date)"
|
||||
|
||||
if [ -n "$1" ]
|
||||
then
|
||||
opt $*
|
||||
fi
|
||||
|
||||
echo "Opts: DB-$DB DATE-$DATE"
|
||||
|
||||
if [ -z "${DATE}" ]
|
||||
then
|
||||
DATE=$(date -d "yesterday" +%Y%m%d)
|
||||
fi
|
||||
|
||||
YDAY=$(date -d "${DATE}" +%Y-%m-%d)
|
||||
TODAY=$(date -d "${DATE} + 1 day" +%Y-%m-%d)
|
||||
QUERY="{date: {\$gte: ISODate(\"${YDAY}T00:00:00Z\"), \$lt: ISODate(\"${TODAY}T0
|
||||
0:00:00Z\")}}"
|
||||
|
||||
FNAME=$(date -d "${DATE}" +%y%m%d)
|
||||
CNAME=$(date -d "${DATE}" +%y%m)
|
||||
|
||||
echo "Query: ${QUERY}"
|
||||
|
||||
if [ ! -d "${DATAPATH}/${DB}/${CNAME}" ]; then
|
||||
mkdir -p ${DATAPATH}/${DB}/${CNAME}
|
||||
fi
|
||||
|
||||
/usr/bin/docker run \
|
||||
--rm \
|
||||
-h mtools \
|
||||
--name ${NAME} \
|
||||
-v ${DATAPATH}:/data \
|
||||
mikif70/mongotools:3.4.5 mongoexport \
|
||||
--host "${MONGODB}" \
|
||||
--db $DB \
|
||||
--collection "ll_${FNAME}" \
|
||||
--type "csv" \
|
||||
--fields "_id,user,protocol,ip,date,insert,country" \
|
||||
--query "${QUERY}" \
|
||||
--readPreference "secondary" \
|
||||
--out "/data/${DB}/${CNAME}/ll_${FNAME}.csv"
|
5
build.sh
Executable file
5
build.sh
Executable file
|
@ -0,0 +1,5 @@
|
|||
#!/bin/bash
|
||||
|
||||
CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo .
|
||||
|
||||
mv lastlogin_mongodb-$(git describe --tags) Docker/
|
44
consul.go
Normal file
44
consul.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
// Consul client structure
|
||||
type Consul struct {
|
||||
RedisTTL time.Duration
|
||||
Timeout time.Duration
|
||||
MaxError int
|
||||
Influxdb string
|
||||
Month string
|
||||
Retention float64
|
||||
MongoURI string
|
||||
Database string
|
||||
RedisURI string
|
||||
Client *api.Client
|
||||
}
|
||||
|
||||
// NewClient New consul client
|
||||
func NewClient() *Consul {
|
||||
client, err := api.NewClient(api.DefaultConfig())
|
||||
if err != nil {
|
||||
log.Fatalf("Consul error: %+v\n", err)
|
||||
if opts.Debug {
|
||||
fmt.Printf("Consul error: %+v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
consul := &Consul{
|
||||
Client: client,
|
||||
RedisTTL: opts.RedisTTL,
|
||||
Timeout: opts.Timeout,
|
||||
MaxError: opts.MaxError,
|
||||
Influxdb: opts.Influxdb,
|
||||
}
|
||||
|
||||
return consul
|
||||
}
|
181
consumer.go
181
consumer.go
|
@ -2,116 +2,187 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
// "github.com/garyburd/redigo/redis"
|
||||
// "gopkg.in/mgo.v2/bson"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/globalsign/mgo"
|
||||
// "gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
type consumed struct {
|
||||
user string
|
||||
error bool
|
||||
logins []string
|
||||
empty bool
|
||||
}
|
||||
|
||||
func contains(s []Ips, e string) bool {
|
||||
for _, a := range s {
|
||||
if a.Ip == e {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
func hash256(val []byte) string {
|
||||
|
||||
h := sha256.New()
|
||||
h.Write(val)
|
||||
|
||||
return hex.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
|
||||
func consumer(id int) {
|
||||
func hash160(val []byte) string {
|
||||
|
||||
h := sha1.New()
|
||||
h.Write(val)
|
||||
|
||||
return hex.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
|
||||
func hash128(val []byte) string {
|
||||
|
||||
h := md5.New()
|
||||
h.Write(val)
|
||||
|
||||
return hex.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
|
||||
// protocol:timestamp:ip:country
|
||||
|
||||
func consumer() {
|
||||
|
||||
for {
|
||||
|
||||
prod := <-consume[id]
|
||||
|
||||
cons := consumed{
|
||||
user: prod.user,
|
||||
logins: make([]string, 0),
|
||||
error: false,
|
||||
empty: true,
|
||||
}
|
||||
prod := <-consume
|
||||
|
||||
start := time.Now()
|
||||
|
||||
status = _Consumer
|
||||
|
||||
var bulk = make(map[string][]*mgo.Bulk)
|
||||
var allLogins = make(map[string]MongoLogin)
|
||||
|
||||
for i := range prod.logins {
|
||||
login := prod.logins[i]
|
||||
// se la riga di login e' vuota
|
||||
if login == "" {
|
||||
log.Println("Login empty: ", prod.user)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
sval := strings.Split(login, ":")
|
||||
// se il formato della riga di login non e' corretto
|
||||
if sval[1] == "" {
|
||||
log.Println("Login format error: ", login, prod.user)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
// se il timestamp della riga di login non e' corretto
|
||||
date, err := strconv.ParseInt(sval[1], 10, 64)
|
||||
if err != nil {
|
||||
log.Printf("Date Error: %+v - %s - %s\n", err, prod.user, login)
|
||||
cons.logins = append(cons.logins, login)
|
||||
continue
|
||||
}
|
||||
|
||||
// se la data e' piu vecchia di RETENTION (15552000 sec) la scarta
|
||||
if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 {
|
||||
log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
|
||||
if opts.Debug {
|
||||
fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
|
||||
}
|
||||
|
||||
if !opts.Test {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// verifica se esiste la country
|
||||
if len(sval) <= 3 {
|
||||
sval = append(sval, "NONE")
|
||||
}
|
||||
|
||||
// genera l' _ID con user + timestamp + ip
|
||||
mlID := hash160([]byte(fmt.Sprintf("%s%s%s", prod.user, time.Unix(date, 0).Format("20060102T15"), sval[2]))) // Format("20060102T150405")
|
||||
ml := MongoLogin{
|
||||
ID: mlID,
|
||||
User: prod.user,
|
||||
Protocol: sval[0],
|
||||
Ip: sval[2],
|
||||
IP: sval[2],
|
||||
Date: time.Unix(date, 0),
|
||||
Insert: time.Now(),
|
||||
Country: sval[3],
|
||||
}
|
||||
|
||||
if opts.Month != ml.Date.Format("0601") {
|
||||
lt := dbs.mdb.DB(dbs.Database).C(fmt.Sprintf("lastlogin_%s", ml.Date.Format("0601")))
|
||||
err = lt.Insert(ml)
|
||||
if err != nil {
|
||||
log.Printf("Insert error: %+v - %s - %s\n", err, cons.user, lt.FullName)
|
||||
count.AddErr()
|
||||
cons.error = true
|
||||
continue
|
||||
}
|
||||
if opts.Debug {
|
||||
log.Printf("%s - %+v\n", lt.FullName, ml)
|
||||
}
|
||||
} else {
|
||||
// inserisce il login su Mongodb
|
||||
err = dbs.ll.Insert(ml)
|
||||
if err != nil {
|
||||
log.Printf("Insert error: %+v - %s\n", err, cons.user)
|
||||
count.AddErr()
|
||||
cons.error = true
|
||||
continue
|
||||
}
|
||||
if opts.Debug {
|
||||
log.Printf("%+v\n", ml)
|
||||
}
|
||||
}
|
||||
allLogins[mlID] = ml
|
||||
|
||||
cons.logins = append(cons.logins, login)
|
||||
}
|
||||
|
||||
count.AddLog(len(prod.logins))
|
||||
for _, val := range allLogins {
|
||||
dt := fmt.Sprintf("ll_%s", val.Date.Format("060102")) // stdYear+stdZeroMonth+stdZeroDay
|
||||
if _, ok := bulk[dt]; !ok {
|
||||
for j := range dbs.mdb {
|
||||
b := dbs.mdb[j].DB(dbs.Database).C(dt).Bulk()
|
||||
b.Unordered()
|
||||
bulk[dt] = append(bulk[dt], b)
|
||||
}
|
||||
}
|
||||
for _, bl := range bulk[dt] {
|
||||
bl.Insert(val)
|
||||
}
|
||||
}
|
||||
|
||||
if opts.MaxLogins > -1 && len(prod.logins) < opts.MaxLogins {
|
||||
cons.empty = false
|
||||
for _, val := range bulk {
|
||||
for j, bl := range val {
|
||||
result, err := bl.Run()
|
||||
if j == 0 {
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "E11000") {
|
||||
fmt.Printf("Err: %+v\n", err)
|
||||
prod.err = true
|
||||
counter <- Counterchan{
|
||||
tipo: "err",
|
||||
val: len(prod.logins),
|
||||
}
|
||||
if opts.Test || opts.Debug {
|
||||
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
counter <- Counterchan{
|
||||
tipo: "dup",
|
||||
val: strings.Count(err.Error(), "E11000"),
|
||||
}
|
||||
if opts.Debug {
|
||||
log.Printf("DUP: %s - %+v\n", prod.user, prod.logins)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if opts.Test {
|
||||
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
|
||||
log.Printf("BulkResult: %s - %+v\n", prod.user, result)
|
||||
}
|
||||
counter <- Counterchan{
|
||||
tipo: "ins",
|
||||
val: len(allLogins),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("CONS (%d): user=%s logins=%d in %v - active=%d\n", id, prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
|
||||
fmt.Printf("CONS: user=%s logins=%d in %v - active=%d\n", prod.user, len(prod.logins), time.Since(start), dbs.rdb.ActiveCount())
|
||||
}
|
||||
|
||||
remove[id] <- cons
|
||||
if opts.Debug && len(prod.logins) > 10 {
|
||||
fmt.Printf("LOGS: %+v\n", prod.logins)
|
||||
}
|
||||
|
||||
count.AddConsumerTime(time.Since(start))
|
||||
if !prod.err {
|
||||
counter <- Counterchan{
|
||||
tipo: "rem",
|
||||
val: len(prod.logins),
|
||||
}
|
||||
}
|
||||
|
||||
// wg.Done()
|
||||
remove <- prod
|
||||
}
|
||||
}
|
||||
|
|
232
counter.go
232
counter.go
|
@ -6,115 +6,109 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
type Counter struct {
|
||||
mu sync.Mutex
|
||||
user int
|
||||
log int
|
||||
rem int
|
||||
err int
|
||||
dup int
|
||||
time time.Duration
|
||||
wg []int
|
||||
tconsumer float64
|
||||
tremover float64
|
||||
tproducer float64
|
||||
maxconsumer float64
|
||||
maxremover float64
|
||||
minconsumer float64
|
||||
minremover float64
|
||||
// Counterchan counter structure
|
||||
type Counterchan struct {
|
||||
tipo string
|
||||
val int
|
||||
}
|
||||
|
||||
// Counter structure
|
||||
type Counter struct {
|
||||
mu sync.Mutex
|
||||
user int
|
||||
log int
|
||||
insert int
|
||||
rem int
|
||||
err int
|
||||
dup int
|
||||
time time.Duration
|
||||
wg int
|
||||
}
|
||||
|
||||
// NewCounter iniitialized Counter structure
|
||||
func NewCounter() *Counter {
|
||||
return &Counter{
|
||||
user: 0,
|
||||
log: 0,
|
||||
err: 0,
|
||||
rem: 0,
|
||||
dup: 0,
|
||||
time: 0,
|
||||
maxconsumer: 0,
|
||||
maxremover: 0,
|
||||
minconsumer: 9999999999,
|
||||
minremover: 9999999999,
|
||||
tconsumer: 0,
|
||||
tremover: 0,
|
||||
tproducer: 0,
|
||||
wg: make([]int, opts.Concurrent),
|
||||
user: 0,
|
||||
log: 0,
|
||||
insert: 0,
|
||||
err: 0,
|
||||
rem: 0,
|
||||
dup: 0,
|
||||
time: 0,
|
||||
wg: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Counter) AddUser() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// Run open channel to receive counter
|
||||
func (c *Counter) Run() {
|
||||
for {
|
||||
tocount := <-counter
|
||||
|
||||
switch tocount.tipo {
|
||||
case "user":
|
||||
c.addUser()
|
||||
case "dup":
|
||||
c.addDuplicate(tocount.val)
|
||||
case "ins":
|
||||
c.addInsert(tocount.val)
|
||||
case "log":
|
||||
c.addLog(tocount.val)
|
||||
case "rem":
|
||||
c.addRem(tocount.val)
|
||||
case "wg":
|
||||
if tocount.val > 0 {
|
||||
c.addWG()
|
||||
} else {
|
||||
c.delWG()
|
||||
}
|
||||
case "err":
|
||||
c.addErr(tocount.val)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddUser increment number of users managed
|
||||
func (c *Counter) addUser() {
|
||||
c.user++
|
||||
}
|
||||
|
||||
func (c *Counter) AddDuplicate() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.dup++
|
||||
// AddDuplicate increment number of duplicates log
|
||||
func (c *Counter) addDuplicate(add int) {
|
||||
c.dup += add
|
||||
}
|
||||
|
||||
func (c *Counter) AddLog(add int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// AddInsert increment number of inserted rows
|
||||
func (c *Counter) addInsert(add int) {
|
||||
c.insert += add
|
||||
}
|
||||
|
||||
// AddLog increment number of log's rows managed
|
||||
func (c *Counter) addLog(add int) {
|
||||
c.log += add
|
||||
}
|
||||
|
||||
func (c *Counter) AddRem(add int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
//AddRem increment removed logs row
|
||||
func (c *Counter) addRem(add int) {
|
||||
c.rem += add
|
||||
}
|
||||
|
||||
func (c *Counter) AddWG(id int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.wg[id]++
|
||||
// AddWG ...
|
||||
func (c *Counter) addWG() {
|
||||
c.wg++
|
||||
}
|
||||
|
||||
func (c *Counter) AddErr() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.err++
|
||||
// AddErr ...
|
||||
func (c *Counter) addErr(add int) {
|
||||
c.err += add
|
||||
}
|
||||
|
||||
func (c *Counter) AddProducerTime(time time.Duration) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.tproducer = (c.tproducer + time.Seconds()) / 2.0
|
||||
}
|
||||
|
||||
func (c *Counter) AddConsumerTime(time time.Duration) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.tconsumer = (c.tconsumer + time.Seconds()) / 2.0
|
||||
if c.maxconsumer < time.Seconds() {
|
||||
c.maxconsumer = time.Seconds()
|
||||
}
|
||||
if c.minconsumer > time.Seconds() {
|
||||
c.minconsumer = time.Seconds()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Counter) AddRemoverTime(time time.Duration) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.tremover = (c.tremover + time.Seconds()) / 2.0
|
||||
if c.maxremover < time.Seconds() {
|
||||
c.maxremover = time.Seconds()
|
||||
}
|
||||
if c.minremover > time.Seconds() {
|
||||
c.minremover = time.Seconds()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Counter) DelWG(id int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.wg[id]--
|
||||
// DelWG ...
|
||||
func (c *Counter) delWG() {
|
||||
c.wg--
|
||||
}
|
||||
|
||||
// GetUser return total users
|
||||
func (c *Counter) GetUser() (ret int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -122,6 +116,7 @@ func (c *Counter) GetUser() (ret int) {
|
|||
return
|
||||
}
|
||||
|
||||
// GetDup return total duplicated logins
|
||||
func (c *Counter) GetDup() (ret int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -129,6 +124,7 @@ func (c *Counter) GetDup() (ret int) {
|
|||
return
|
||||
}
|
||||
|
||||
// GetLog return total log's rows
|
||||
func (c *Counter) GetLog() (ret int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -136,6 +132,15 @@ func (c *Counter) GetLog() (ret int) {
|
|||
return
|
||||
}
|
||||
|
||||
// GetInsert return total inserted rows
|
||||
func (c *Counter) GetInsert() (ret int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.insert
|
||||
return
|
||||
}
|
||||
|
||||
// GetErr return total errors
|
||||
func (c *Counter) GetErr() (ret int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -143,6 +148,7 @@ func (c *Counter) GetErr() (ret int) {
|
|||
return
|
||||
}
|
||||
|
||||
// GetRem return total removed log's rows
|
||||
func (c *Counter) GetRem() (ret int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -150,13 +156,15 @@ func (c *Counter) GetRem() (ret int) {
|
|||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetWG(id int) (ret int) {
|
||||
// GetWG ...
|
||||
func (c *Counter) GetWG() (ret int) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.wg[id]
|
||||
ret = c.wg
|
||||
return
|
||||
}
|
||||
|
||||
// GetTime ...
|
||||
func (c *Counter) GetTime() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -164,55 +172,7 @@ func (c *Counter) GetTime() (ret float64) {
|
|||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetConsumerTime() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.tconsumer
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetRemoverTime() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.tremover
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetProducerTime() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.tproducer
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetMaxConsumer() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.maxconsumer
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetMinConsumer() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.minconsumer
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetMaxRemover() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.maxremover
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Counter) GetMinRemover() (ret float64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
ret = c.minremover
|
||||
return
|
||||
}
|
||||
|
||||
// SetTime ...
|
||||
func (c *Counter) SetTime(t time.Duration) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
|
15
day_init.js
Normal file
15
day_init.js
Normal file
|
@ -0,0 +1,15 @@
|
|||
// v1.2
|
||||
var db = connect("192.168.0.1:27017/lastlogin");
|
||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
||||
var now = new Date();
|
||||
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
|
||||
print("creating index: ll_",col.slice(2));
|
||||
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
|
||||
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
|
||||
var db = connect("192.168.0.1:27017/katamail");
|
||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
||||
var now = new Date();
|
||||
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
|
||||
print("creating index: ll_",col.slice(2));
|
||||
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
|
||||
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
|
69
dbs.go
69
dbs.go
|
@ -2,57 +2,71 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"github.com/garyburd/redigo/redis"
|
||||
// "github.com/fzzy/radix/redis"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
|
||||
"github.com/globalsign/mgo"
|
||||
// "gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
var (
|
||||
dbs = Dbs{
|
||||
MongoUri: "mongodb://127.0.0.1:27018",
|
||||
RedisUri: "127.0.0.1:6379",
|
||||
Database: "lastlogin",
|
||||
RedisURI: "",
|
||||
Database: "",
|
||||
}
|
||||
)
|
||||
|
||||
// Dbs structure
|
||||
type Dbs struct {
|
||||
MongoUri string
|
||||
MongoURI string
|
||||
Database string
|
||||
RedisUri string
|
||||
RedisURI string
|
||||
rdb *redis.Pool //*redis.Client
|
||||
mdb *mgo.Session
|
||||
ll *mgo.Collection
|
||||
// us *mgo.Collection
|
||||
mdb []*mgo.Session
|
||||
}
|
||||
|
||||
// MongoLogin structure
|
||||
type MongoLogin struct {
|
||||
User string `json:"user"`
|
||||
Protocol string `json:"protocol"`
|
||||
Ip string `json:"ip"`
|
||||
Date time.Time `json:"date"`
|
||||
Insert time.Time `json:"insert"`
|
||||
ID string `json:"_id" bson:"_id" gorethink:"id"`
|
||||
User string `json:"user" bson:"user" gorethink:"user"`
|
||||
Protocol string `json:"protocol" bson:"protocol" gorethink:"protocol"`
|
||||
IP string `json:"ip" bson:"ip" gorethink:"ip"`
|
||||
Date time.Time `json:"date" bson:"date" gorethink:"date"`
|
||||
Insert time.Time `json:"insert" bson:"insert" gorethink:"insert"`
|
||||
Country string `json:"country" bson:"country" gorethink:"country"`
|
||||
}
|
||||
|
||||
// Ips structure
|
||||
type Ips struct {
|
||||
Ip string `json:"ip"`
|
||||
IP string `json:"ip"`
|
||||
}
|
||||
|
||||
// UserLogin structure
|
||||
type UserLogin struct {
|
||||
User string `json:"user"`
|
||||
Date time.Time `json:"date"`
|
||||
Lock bool `json:"lock"`
|
||||
}
|
||||
|
||||
// Index structure
|
||||
type Index struct {
|
||||
User string `json:"user"`
|
||||
Date time.Time `json:"date"`
|
||||
}
|
||||
|
||||
func (db *Dbs) isMongodb() bool {
|
||||
if db.MongoURI != "" {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (db *Dbs) poolRedis() {
|
||||
|
||||
dbs.rdb = &redis.Pool{
|
||||
|
@ -61,7 +75,7 @@ func (db *Dbs) poolRedis() {
|
|||
Wait: true,
|
||||
IdleTimeout: 1 * time.Second,
|
||||
Dial: func() (redis.Conn, error) {
|
||||
c, err := redis.Dial("tcp", db.RedisUri)
|
||||
c, err := redis.Dial("tcp", db.RedisURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -76,12 +90,17 @@ func (db *Dbs) poolRedis() {
|
|||
}
|
||||
|
||||
func (db *Dbs) connectMongo() {
|
||||
var err error
|
||||
db.mdb, err = mgo.Dial(db.MongoUri)
|
||||
if err != nil {
|
||||
log.Println("Mongodb connect Error: ", err.Error())
|
||||
os.Exit(-3)
|
||||
}
|
||||
|
||||
db.ll = db.mdb.DB(db.Database).C(fmt.Sprintf("lastlogin_%s", opts.Month))
|
||||
mongoList := strings.Split(db.MongoURI, "|")
|
||||
|
||||
for m := range mongoList {
|
||||
nm, err := mgo.Dial(fmt.Sprintf("mongodb://%s", mongoList[m]))
|
||||
if err != nil {
|
||||
log.Println("Mongodb connect Error: ", err.Error())
|
||||
os.Exit(-3)
|
||||
}
|
||||
nm.SetSocketTimeout(5 * time.Second)
|
||||
nm.SetSyncTimeout(5 * time.Second)
|
||||
db.mdb = append(db.mdb, nm)
|
||||
}
|
||||
}
|
||||
|
|
18
docker-compose/docker-compose.yml
Normal file
18
docker-compose/docker-compose.yml
Normal file
|
@ -0,0 +1,18 @@
|
|||
version: '2'
|
||||
services:
|
||||
mongodb:
|
||||
image: mikif70/mongodb:3.6.9
|
||||
ports:
|
||||
- 27017:27017
|
||||
container_name: ll_mongod
|
||||
volumes:
|
||||
- ./mongod:/data
|
||||
depends_on:
|
||||
- redis
|
||||
redis:
|
||||
image: "redis:alpine"
|
||||
container_name: ll_redis
|
||||
ports:
|
||||
- 6379:6379
|
||||
volumes:
|
||||
- ./redis:/data
|
9
docker-compose/mongo.sh
Executable file
9
docker-compose/mongo.sh
Executable file
|
@ -0,0 +1,9 @@
|
|||
#!/bin/bash
|
||||
|
||||
if [ -z $1 ]; then
|
||||
HOST=192.168.0.1:27017
|
||||
else
|
||||
HOST=${1}
|
||||
fi
|
||||
|
||||
docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST}
|
16
docker-compose/test.sh
Executable file
16
docker-compose/test.sh
Executable file
|
@ -0,0 +1,16 @@
|
|||
#!/bin/bash
|
||||
|
||||
echo "Starting mikif70/llmongo:$(git -C .. describe --tags)"
|
||||
|
||||
docker run \
|
||||
--rm \
|
||||
-v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \
|
||||
--name llmongo \
|
||||
mikif70/llmongo:$(git -C .. describe --tags) \
|
||||
-l /data/llmongo.log \
|
||||
-r 192.168.0.1:6379 \
|
||||
-m 192.168.0.1:27017 \
|
||||
-d lastlogin \
|
||||
-i test@10.39.253.206:8086 \
|
||||
-T 80s \
|
||||
$@
|
52
influxdb.go
52
influxdb.go
|
@ -3,22 +3,30 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
influxdb "github.com/influxdata/influxdb/client/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
infdb string
|
||||
infhost string
|
||||
)
|
||||
|
||||
func writeStats(start time.Time) {
|
||||
if opts.Debug {
|
||||
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
|
||||
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
|
||||
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
|
||||
}
|
||||
|
||||
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
|
||||
Addr: opts.Influxdb,
|
||||
Addr: fmt.Sprintf("http://%s", infdb),
|
||||
Timeout: 2 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %+v\n", err)
|
||||
fmt.Printf("InfluxDB connect Error: %+v\n", err)
|
||||
log.Printf("InfluxDB connect Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
@ -28,34 +36,38 @@ func writeStats(start time.Time) {
|
|||
Precision: "s",
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %+v\n", err)
|
||||
fmt.Printf("InfluxDB batch Error: %+v\n", err)
|
||||
log.Printf("InfluxDB batch Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
tags := map[string]string{"server": opts.Hostname, "domain": dbs.Database}
|
||||
tags := map[string]string{"server": infhost, "domain": dbs.Database}
|
||||
fields := map[string]interface{}{
|
||||
"user": count.GetUser(),
|
||||
"log": count.GetLog(),
|
||||
"err": count.GetErr(),
|
||||
"rem": count.GetRem(),
|
||||
"dup": count.GetDup(),
|
||||
"stop": count.GetTime(),
|
||||
"consumer": count.GetConsumerTime(),
|
||||
"producer": count.GetProducerTime(),
|
||||
"remover": count.GetRemoverTime(),
|
||||
"max_consumer": count.GetMaxConsumer(),
|
||||
"min_consumer": count.GetMinConsumer(),
|
||||
"max_remover": count.GetMaxRemover(),
|
||||
"min_remover": count.GetMinRemover(),
|
||||
"user": count.GetUser(),
|
||||
"log": count.GetLog(),
|
||||
"err": count.GetErr(),
|
||||
"rem": count.GetRem(),
|
||||
"dup": count.GetDup(),
|
||||
"stop": count.GetTime(),
|
||||
}
|
||||
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %+v\n", err)
|
||||
fmt.Printf("InfluxDB point Error: %+v\n", err)
|
||||
log.Printf("InfluxDB point Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("InfluxDB pt: %+v\n", pt)
|
||||
}
|
||||
|
||||
bp.AddPoint(pt)
|
||||
|
||||
// Write the batch
|
||||
c.Write(bp)
|
||||
err = c.Write(bp)
|
||||
if err != nil {
|
||||
fmt.Printf("InfluxDB write Error: %+v\n", err)
|
||||
log.Printf("InfluxDB write Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
28
logrotate.cfg
Normal file
28
logrotate.cfg
Normal file
|
@ -0,0 +1,28 @@
|
|||
/opt/mongodb/mongod/log/*.log
|
||||
{
|
||||
daily
|
||||
rotate 10
|
||||
maxsize 10M
|
||||
compress
|
||||
delaycompress
|
||||
missingok
|
||||
notifempty
|
||||
sharedscripts
|
||||
copytruncate
|
||||
postrotate
|
||||
/usr/bin/docker kill -s SIGUSR1 mongod 2> /dev/null || true
|
||||
endscript
|
||||
}
|
||||
|
||||
/opt/llmongo/log/*.log
|
||||
{
|
||||
daily
|
||||
maxsize 5M
|
||||
rotate 8
|
||||
compress
|
||||
delaycompress
|
||||
missingok
|
||||
notifempty
|
||||
sharedscripts
|
||||
copytruncate
|
||||
}
|
101
main.go
101
main.go
|
@ -6,23 +6,33 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
const (
|
||||
_VERSION = "v2.8.1"
|
||||
_Version = "v5.0.0b6"
|
||||
_Producer = 0
|
||||
_Consumer = 1
|
||||
_Remover = 2
|
||||
)
|
||||
|
||||
var (
|
||||
loop []bool
|
||||
loop bool
|
||||
|
||||
done []chan bool
|
||||
consume []chan produced
|
||||
remove []chan consumed
|
||||
done chan bool
|
||||
consume chan loginsList
|
||||
remove chan loginsList
|
||||
|
||||
counter chan Counterchan
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
status int
|
||||
|
||||
count *Counter
|
||||
)
|
||||
|
||||
|
@ -31,15 +41,27 @@ func main() {
|
|||
flag.Parse()
|
||||
|
||||
if opts.Version {
|
||||
fmt.Println(os.Args[0], _VERSION)
|
||||
fmt.Println(os.Args[0], _Version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if opts.Hostname == "" {
|
||||
var err error
|
||||
opts.Hostname, err = os.Hostname()
|
||||
if err != nil {
|
||||
fmt.Println("Hostname error: ", err.Error())
|
||||
if dbs.MongoURI == "" || dbs.RedisURI == "" || dbs.Database == "" {
|
||||
flag.Usage()
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
if opts.Influxdb != "" {
|
||||
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
|
||||
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
|
||||
if re.MatchString(opts.Influxdb) {
|
||||
match := re.FindStringSubmatch(opts.Influxdb)
|
||||
if opts.Debug {
|
||||
fmt.Printf("Influxdb match: %+v\n", match)
|
||||
}
|
||||
infhost = match[1]
|
||||
infdb = match[2] + ":" + match[3]
|
||||
} else {
|
||||
opts.Influxdb = ""
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -54,13 +76,13 @@ func main() {
|
|||
|
||||
log.SetOutput(fs)
|
||||
|
||||
pid.PIDFile = opts.Pidfile
|
||||
pid.Write(true)
|
||||
defer pid.Remove()
|
||||
// pid.PIDFile = opts.Pidfile
|
||||
// pid.Write(true)
|
||||
// defer pid.Remove()
|
||||
|
||||
start := time.Now()
|
||||
fmt.Printf("Start: %+v\n", opts)
|
||||
log.Printf("Start: %+v\n", opts)
|
||||
fmt.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
|
||||
log.Printf("Start:\t%+v\n\t%+v\n", opts, dbs)
|
||||
|
||||
opts.Month = start.Format("0601")
|
||||
|
||||
|
@ -68,7 +90,9 @@ func main() {
|
|||
defer dbs.rdb.Close()
|
||||
|
||||
dbs.connectMongo()
|
||||
defer dbs.mdb.Close()
|
||||
for k := range dbs.mdb {
|
||||
defer dbs.mdb[k].Close()
|
||||
}
|
||||
|
||||
if opts.Timeout > 0 {
|
||||
time.AfterFunc(opts.Timeout, exit)
|
||||
|
@ -76,43 +100,34 @@ func main() {
|
|||
|
||||
count = NewCounter()
|
||||
|
||||
for i := 0; i < opts.Concurrent; i++ {
|
||||
consume = append(consume, make(chan produced))
|
||||
remove = append(remove, make(chan consumed))
|
||||
loop = append(loop, true)
|
||||
done = append(done, make(chan bool))
|
||||
consume = make(chan loginsList, opts.Queue)
|
||||
remove = make(chan loginsList, opts.Queue)
|
||||
loop = true
|
||||
done = make(chan bool)
|
||||
counter = make(chan Counterchan)
|
||||
|
||||
go producer(i)
|
||||
go consumer(i)
|
||||
go remover(i)
|
||||
go count.Run()
|
||||
go producer()
|
||||
for i := 0; i < opts.Queue; i++ {
|
||||
for j := 0; j < len(dbs.mdb); j++ {
|
||||
go consumer()
|
||||
}
|
||||
go remover()
|
||||
}
|
||||
|
||||
for i := 0; i < opts.Concurrent; i++ {
|
||||
<-done[i]
|
||||
fmt.Printf("Done %d\n", i)
|
||||
close(done[i])
|
||||
}
|
||||
<-done
|
||||
fmt.Printf("Done\n")
|
||||
close(done)
|
||||
|
||||
fmt.Println("Waiting WG")
|
||||
for i := 0; i < opts.Concurrent; i++ {
|
||||
fmt.Printf("ID (%d): %d\n", i, count.GetWG(i))
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
count.SetTime(time.Since(start))
|
||||
|
||||
fmt.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup())
|
||||
log.Printf("Stop %v - user: %d - login: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetErr(), count.GetRem(), count.GetDup())
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("Time: \n\tConsumer: max: %.5f - min: %.5f - med: %.5f\n\tRemover: max: %.5f - min: %.5f - med: %.5f\n\tProducer: %.5f\n", count.GetMaxConsumer(), count.GetMinConsumer(), count.GetConsumerTime(), count.GetMaxRemover(), count.GetMinRemover(), count.GetRemoverTime(), count.GetProducerTime())
|
||||
}
|
||||
fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
|
||||
log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup())
|
||||
|
||||
if opts.Influxdb != "" {
|
||||
writeStats(start)
|
||||
}
|
||||
|
||||
if opts.Xymon != "" {
|
||||
sendStatus()
|
||||
}
|
||||
}
|
||||
|
|
34
mongo_scripts.txt
Normal file
34
mongo_scripts.txt
Normal file
|
@ -0,0 +1,34 @@
|
|||
|
||||
################
|
||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
||||
|
||||
for (var i=1; i<18; i++) {
|
||||
var col = "ll_1806"+dd(i);
|
||||
print(col);
|
||||
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
|
||||
}
|
||||
|
||||
###########
|
||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
||||
|
||||
for (var m=1; m<=12; m++) {
|
||||
for (var d=1; d<=31; d++) {
|
||||
var col = "ll_15"+dd(m)+dd(d);
|
||||
if (db.getCollection(col).exists != null) {
|
||||
print(col);
|
||||
print(db.getCollection(col).totalSize());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
####
|
||||
var ar = db.getCollectionNames()
|
||||
var len = ar.length
|
||||
|
||||
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
|
||||
|
||||
|
||||
####
|
||||
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})
|
||||
|
53
options.go
53
options.go
|
@ -1,11 +1,8 @@
|
|||
// options
|
||||
package main
|
||||
|
||||
import (
|
||||
// "encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
// "io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -13,6 +10,7 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// Options structure
|
||||
type Options struct {
|
||||
RedisTTL time.Duration
|
||||
CurrentPath string
|
||||
|
@ -20,28 +18,42 @@ type Options struct {
|
|||
LogFile string
|
||||
ConfigFile string
|
||||
Timeout time.Duration
|
||||
MaxLogins int
|
||||
Debug bool
|
||||
Test bool
|
||||
Version bool
|
||||
Concurrent int
|
||||
MaxError int
|
||||
Xymon string
|
||||
Influxdb string
|
||||
Hostname string
|
||||
Month string
|
||||
Pidfile string
|
||||
Queue int
|
||||
Retention float64
|
||||
Consul string
|
||||
Port int
|
||||
}
|
||||
|
||||
var (
|
||||
opts = Options{
|
||||
RedisTTL: time.Hour * 11688, // 16 mesi
|
||||
LogFile: "log/llmongo.log",
|
||||
MaxLogins: -1,
|
||||
RedisTTL: time.Hour * 11688, // 16 mesi
|
||||
LogFile: "log/llmongo.log",
|
||||
}
|
||||
)
|
||||
|
||||
func usage() {
|
||||
fmt.Println("Usage: llmongo -m <mongo uri> -r <redis uri> -t <redis keys ttl> -l <logfile> -b <concurrent thread> -T <running ttl> -x <xymon server> -H <hostname> -i <influxdb uri> -v\n")
|
||||
fmt.Println(`Usage: llmongo
|
||||
-m <mongodb [ip:port,ip:port]>
|
||||
-d <mongodb database [dbname]>
|
||||
-r <redisdb [ip:port]>
|
||||
-t <redis keys ttl>
|
||||
-l <logfile>
|
||||
-T <running timeout>
|
||||
-i <influxdb [localname@ip:port]>
|
||||
-C <consul [ip:port]>
|
||||
-P <check port> [port] ## used by consul to check services ##
|
||||
-q <parallel consumer>
|
||||
-R <retention>
|
||||
-v <version>
|
||||
-D <debug>
|
||||
-DD <Test>`)
|
||||
fmt.Println()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
|
@ -53,22 +65,21 @@ func init() {
|
|||
}
|
||||
|
||||
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
|
||||
opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
|
||||
opts.Exe = path.Base(os.Args[0])
|
||||
|
||||
flag.StringVar(&opts.Xymon, "x", "", "xymon server")
|
||||
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
|
||||
flag.StringVar(&opts.Hostname, "H", "", "hostname")
|
||||
flag.StringVar(&dbs.MongoUri, "m", dbs.MongoUri, "Mongodb")
|
||||
flag.StringVar(&dbs.Database, "d", dbs.Database, "Mongodb Database")
|
||||
flag.StringVar(&dbs.RedisUri, "r", dbs.RedisUri, "Redis")
|
||||
flag.StringVar(&dbs.MongoURI, "m", "", "Mongodb")
|
||||
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
|
||||
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
|
||||
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
|
||||
flag.IntVar(&opts.MaxLogins, "L", opts.MaxLogins, "Max lastlogins")
|
||||
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
|
||||
flag.IntVar(&opts.Port, "P", 10500, "service check port")
|
||||
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
|
||||
flag.BoolVar(&opts.Version, "v", false, "Version")
|
||||
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
|
||||
flag.BoolVar(&opts.Debug, "D", false, "Debug")
|
||||
flag.IntVar(&opts.Concurrent, "c", 1, "Concurrent thread")
|
||||
flag.BoolVar(&opts.Test, "DD", false, "Test")
|
||||
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
|
||||
flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file")
|
||||
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
|
||||
flag.Float64Var(&opts.Retention, "R", 15552000, "retention")
|
||||
}
|
||||
|
|
10
pid.go
10
pid.go
|
@ -1,6 +1,7 @@
|
|||
// pid
|
||||
package main
|
||||
|
||||
/*
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
@ -16,6 +17,7 @@ var (
|
|||
pid = PID{}
|
||||
)
|
||||
|
||||
// PID structure
|
||||
type PID struct {
|
||||
PID string
|
||||
PIDFile string
|
||||
|
@ -42,11 +44,8 @@ func (p *PID) readCmd() bool {
|
|||
return false
|
||||
}
|
||||
cmd := bytes.Trim(bcmd, "\x00")
|
||||
if strings.Contains(string(cmd), opts.Exe) {
|
||||
return true
|
||||
} else {
|
||||
if !strings.Contains(string(cmd), opts.Exe) {
|
||||
fmt.Printf("PID %s used by %s\n", pid, cmd)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
@ -78,10 +77,11 @@ func (p *PID) Write(l bool) {
|
|||
fpid.Close()
|
||||
}
|
||||
|
||||
// Cancella il PIDFile
|
||||
// Remove cancella il PIDFile
|
||||
func (p *PID) Remove() {
|
||||
err := os.Remove(p.PIDFile)
|
||||
if err != nil {
|
||||
fmt.Println("RM file error: ", err.Error())
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
|
71
producer.go
71
producer.go
|
@ -4,7 +4,7 @@ package main
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
// "strconv"
|
||||
"time"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
|
@ -15,59 +15,78 @@ type produced struct {
|
|||
logins []string
|
||||
}
|
||||
|
||||
func producer(id int) {
|
||||
type loginsList struct {
|
||||
user string
|
||||
logins []string
|
||||
err bool
|
||||
}
|
||||
|
||||
func producer() {
|
||||
conn := dbs.rdb.Get()
|
||||
defer conn.Close()
|
||||
|
||||
for loop[id] {
|
||||
for loop {
|
||||
|
||||
wg.Wait()
|
||||
status = _Producer
|
||||
|
||||
start := time.Now()
|
||||
// estrae un userid dalla lista degli utenti che hanno fatto login
|
||||
user, err := redis.String(conn.Do("spop", "llindex"))
|
||||
// if opts.Debug {
|
||||
// log.Printf("SPOP: %+v - %+v\n", user, err)
|
||||
// fmt.Printf("SPOP: %+v - %+v\n", user, err)
|
||||
// }
|
||||
// se non ci sono piu' userid esce
|
||||
if err != nil {
|
||||
if opts.Debug {
|
||||
fmt.Printf("LLINDEX empty: %v\n", err)
|
||||
}
|
||||
log.Printf("LLINDEX empty: %v\n", err)
|
||||
//loop[id] = false
|
||||
//done[id] <- true
|
||||
break
|
||||
}
|
||||
|
||||
// estrae tutti i login dell'utente "user"
|
||||
logs, err := redis.Strings(conn.Do("lrange", user, "1", strconv.Itoa(opts.MaxLogins)))
|
||||
// estrae tutti i logins dell'utente "user"
|
||||
logs, err := redis.Strings(conn.Do("lrange", user, "0", "-1"))
|
||||
if err != nil {
|
||||
if opts.Debug {
|
||||
fmt.Printf("LRANGE: %+v - %+v\n", err, logs)
|
||||
}
|
||||
log.Printf("LRANGE: %+v - %+v\n", err, logs)
|
||||
}
|
||||
// if opts.Debug {
|
||||
// fmt.Printf("LRANGE: %s - %d\n", user, len(logs))
|
||||
// log.Printf("LRANGE: %s - %d\n", user, len(logs))
|
||||
// }
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
|
||||
// log.Printf("PROD (%d): user=%s login=%d in %v - conn=%d\n", id, user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
|
||||
fmt.Printf("PROD: user=%s login=%d in %v - conn=%d\n", user, len(logs), time.Since(start), dbs.rdb.ActiveCount())
|
||||
}
|
||||
|
||||
count.AddUser()
|
||||
wg.Add(1)
|
||||
count.AddWG(id)
|
||||
if len(logs) > 0 {
|
||||
counter <- Counterchan{
|
||||
tipo: "user",
|
||||
val: 1,
|
||||
}
|
||||
|
||||
consume[id] <- produced{
|
||||
user: user,
|
||||
logins: logs,
|
||||
if opts.Debug {
|
||||
fmt.Printf("PROD: %+v\n", time.Since(start))
|
||||
}
|
||||
|
||||
if opts.Test {
|
||||
log.Printf("PROD: %s - %d\n", user, len(logs))
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
counter <- Counterchan{
|
||||
tipo: "wg",
|
||||
val: 1,
|
||||
}
|
||||
|
||||
counter <- Counterchan{
|
||||
tipo: "log",
|
||||
val: len(logs),
|
||||
}
|
||||
|
||||
consume <- loginsList{
|
||||
user: user,
|
||||
logins: logs,
|
||||
err: false,
|
||||
}
|
||||
}
|
||||
|
||||
count.AddProducerTime(time.Since(start))
|
||||
}
|
||||
|
||||
done[id] <- true
|
||||
done <- true
|
||||
}
|
||||
|
|
35
remover.go
35
remover.go
|
@ -3,31 +3,32 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
// "log"
|
||||
|
||||
"time"
|
||||
)
|
||||
|
||||
func remover(id int) {
|
||||
func remover() {
|
||||
|
||||
var conn = dbs.rdb.Get()
|
||||
defer conn.Close()
|
||||
|
||||
for {
|
||||
rem := <-remove[id]
|
||||
rem := <-remove
|
||||
|
||||
// wg.Add(1)
|
||||
status = _Remover
|
||||
|
||||
start := time.Now()
|
||||
|
||||
for i := range rem.logins {
|
||||
login := rem.logins[i]
|
||||
// cancella da Redis la riga di login inserita partendo da 1
|
||||
conn.Send("lrem", rem.user, "1", login)
|
||||
if !rem.err {
|
||||
for i := range rem.logins {
|
||||
// cancella da Redis la riga di login inserita partendo da 1
|
||||
conn.Send("lrem", rem.user, "1", rem.logins[i])
|
||||
}
|
||||
}
|
||||
|
||||
// se ci sono errori o non e' vuota la lista di logins reinserisce lo user
|
||||
if rem.error || !rem.empty {
|
||||
if rem.err {
|
||||
if opts.Debug {
|
||||
fmt.Printf("SADD (%d): %s\n", id, rem.user)
|
||||
fmt.Printf("SADD: %s\n", rem.user)
|
||||
}
|
||||
conn.Send("sadd", "llindex", rem.user)
|
||||
if count.GetErr() >= opts.MaxError {
|
||||
|
@ -36,13 +37,15 @@ func remover(id int) {
|
|||
}
|
||||
conn.Send("expire", rem.user, opts.RedisTTL.Seconds())
|
||||
conn.Flush()
|
||||
count.AddRem(len(rem.logins))
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("LREM (%d): %s - %d - %+v\n", id, rem.user, len(rem.logins), time.Since(start))
|
||||
fmt.Printf("LREM: %s - %d - %+v\n", rem.user, len(rem.logins), time.Since(start))
|
||||
}
|
||||
|
||||
counter <- Counterchan{
|
||||
tipo: "wg",
|
||||
val: -1,
|
||||
}
|
||||
wg.Done()
|
||||
count.DelWG(id)
|
||||
|
||||
count.AddRemoverTime(time.Since(start))
|
||||
}
|
||||
}
|
||||
|
|
75
rethinkdb.go
Normal file
75
rethinkdb.go
Normal file
|
@ -0,0 +1,75 @@
|
|||
package main
|
||||
|
||||
/*
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
// "strings"
|
||||
|
||||
rt "gopkg.in/dancannon/gorethink.v2"
|
||||
)
|
||||
|
||||
type Rethink struct {
|
||||
rtdb *rt.Session
|
||||
}
|
||||
|
||||
func NewRethinkDB(cluster []string) (*Rethink, error) {
|
||||
var (
|
||||
err error
|
||||
session *rt.Session
|
||||
)
|
||||
|
||||
if len(cluster) > 1 {
|
||||
session, err = rt.Connect(rt.ConnectOpts{
|
||||
Addresses: cluster,
|
||||
InitialCap: 10,
|
||||
MaxOpen: 10,
|
||||
})
|
||||
} else {
|
||||
session, err = rt.Connect(rt.ConnectOpts{
|
||||
Address: cluster[0],
|
||||
InitialCap: 10,
|
||||
MaxOpen: 10,
|
||||
})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Printf("RethinkDB ERROR: %+v\n", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Rethink{
|
||||
rtdb: session,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Rethink) Insert(login MongoLogin) (rt.WriteResponse, error) {
|
||||
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(login).RunWrite(r.rtdb)
|
||||
if opts.Debug {
|
||||
fmt.Printf("RTinsert: %+v\n", resp)
|
||||
}
|
||||
if err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (r *Rethink) MultiInsert(logins []MongoLogin) (rt.WriteResponse, error) {
|
||||
resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins, rt.InsertOpts{Durability: "soft"}).RunWrite(r.rtdb)
|
||||
//resp, err := rt.DB("tiscali").Table("lastlogin").Insert(logins).RunWrite(r.rtdb)
|
||||
if opts.Debug {
|
||||
fmt.Printf("RTMulti: %+v\n", resp)
|
||||
}
|
||||
if err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (r *Rethink) Close() {
|
||||
r.rtdb.Close()
|
||||
}
|
||||
*/
|
22
sigterm.go
22
sigterm.go
|
@ -7,15 +7,25 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func exit() {
|
||||
for i := 0; i < opts.Concurrent; i++ {
|
||||
log.Println("EXIT ", i)
|
||||
fmt.Println("EXIT ", i)
|
||||
loop[i] = false
|
||||
// done[i] <- true
|
||||
func kill() {
|
||||
log.Printf("KILL %d\n", status)
|
||||
fmt.Printf("KILL %d\n", status)
|
||||
wg.Done()
|
||||
counter <- Counterchan{
|
||||
tipo: "wg",
|
||||
val: -1,
|
||||
}
|
||||
done <- true
|
||||
}
|
||||
|
||||
func exit() {
|
||||
log.Printf("EXIT %d\n", status)
|
||||
fmt.Printf("EXIT %d\n", status)
|
||||
loop = false
|
||||
time.AfterFunc(time.Second*20, kill)
|
||||
}
|
||||
|
||||
func setTerm() {
|
||||
|
|
|
@ -24,4 +24,9 @@
|
|||
#
|
||||
# m h dom mon dow command
|
||||
|
||||
*/10 * * * * /opt/llmongo/start.sh > /dev/null
|
||||
### lastlogin import
|
||||
*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1
|
||||
*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1
|
||||
05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1
|
||||
### lastlogin create indexes
|
||||
00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1
|
||||
|
|
2
xymon.go
2
xymon.go
|
@ -1,6 +1,7 @@
|
|||
// xymon.go
|
||||
package main
|
||||
|
||||
/*
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
@ -59,3 +60,4 @@ func xymonSend(msg []byte) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
|
|
Loading…
Add table
Reference in a new issue