Compare commits

..

26 commits
nats ... master

Author SHA1 Message Date
abca7e21e7 influxdb write verbose 2018-12-12 14:57:08 +01:00
ee28cf2d89 influxdb verbose errors 2018-12-12 14:35:13 +01:00
5c45a79d12 split host@addr:port 2018-12-12 13:33:26 +01:00
f1f38601d5 influxdb hostname|ip 2018-12-12 13:21:39 +01:00
a035171edb testing new mgo driver 2018-12-10 16:25:40 +01:00
2d3d9e43d3 added 2018-11-27 15:11:59 +01:00
a2fc00df74 update 2018-11-27 15:10:19 +01:00
5bc0076ecf auto discovery (by consul) 2018-11-22 12:29:08 +01:00
58234ad80a v4.4.5 2018-11-07 16:50:53 +01:00
3e3e36f81e counter INS = len(AllLogins) 2018-11-07 16:47:38 +01:00
8e1b5033f1 counter INS = len(bulk) 2018-11-07 16:44:03 +01:00
d8137b631f debug insert b2 2018-10-25 17:26:59 +02:00
f56f51bb27 ins = len(logins) 2018-10-25 17:19:34 +02:00
927aac308f debug info (BulkResult) 2018-10-25 16:59:19 +02:00
90177973fe use BulkRun Results to count insert 2018-10-25 16:50:49 +02:00
1ec55417d1 count insert 2018-10-25 12:41:58 +02:00
254405b1fb in test importa anche i login piu' vecchi di 6 mesi 2018-10-24 15:19:02 +02:00
6572d95834 aggiunta l'opzione -R <retention> default = 15552000 (6 mesi) 2018-10-24 09:33:08 +02:00
e1f9da40e4 - build automatico con tag versione
- docker-compose per ambiente di test
2018-10-23 09:26:55 +02:00
a78109af0d aggiunto il controllo della data: se > 6mesi salta 2018-10-23 09:01:27 +02:00
18c1e2012a added expire index 2018-07-19 11:50:18 +02:00
c33e3065ec update 2018-07-18 13:29:44 +02:00
8cac688ebd Merge branch 'master' into nats 2018-06-12 17:49:20 +02:00
947b4dec64 . 2016-06-06 10:21:47 +02:00
86c835f02b add debug output 2016-06-01 16:59:43 +02:00
467b9d93ac manda a influxdb i tempi medi,max e min di ogni worker ( producer, consumer, remover) 2016-06-01 16:44:09 +02:00
16 changed files with 212 additions and 28 deletions

View file

@ -3,6 +3,9 @@ FROM scratch
MAINTAINER Michele Fadda "<mikif70@gmail.com>"
COPY lastlogin_mongodb-v4.3.3 /bin/lastlogin_mongodb
ARG VER
ENV VER ${VER:-0.0.0}
COPY lastlogin_mongodb-${VER} /bin/lastlogin_mongodb
ENTRYPOINT [ "/bin/lastlogin_mongodb" ]

View file

@ -1,3 +1,3 @@
#!/bin/bash
docker build -t mikif70/llmongo:4.3.3 .
docker build -t mikif70/llmongo:$(git -C .. describe --tags) -t repo.ism.tiscali.sys:5000/mikif70/llmongo:$(git -C .. describe --tags) --build-arg VER=$(git -C .. describe --tags) .

View file

@ -1,3 +1,5 @@
#!/bin/bash
CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo .
CGO_ENABLED=0 GOOS=linux go build -o lastlogin_mongodb-$(git describe --tags) -a -installsuffix cgo .
mv lastlogin_mongodb-$(git describe --tags) Docker/

44
consul.go Normal file
View file

@ -0,0 +1,44 @@
package main
import (
"fmt"
"log"
"time"
"github.com/hashicorp/consul/api"
)
// Consul client structure
type Consul struct {
RedisTTL time.Duration
Timeout time.Duration
MaxError int
Influxdb string
Month string
Retention float64
MongoURI string
Database string
RedisURI string
Client *api.Client
}
// NewClient New consul client
func NewClient() *Consul {
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.Fatalf("Consul error: %+v\n", err)
if opts.Debug {
fmt.Printf("Consul error: %+v\n", err)
}
}
consul := &Consul{
Client: client,
RedisTTL: opts.RedisTTL,
Timeout: opts.Timeout,
MaxError: opts.MaxError,
Influxdb: opts.Influxdb,
}
return consul
}

View file

@ -12,7 +12,8 @@ import (
"strings"
"time"
"gopkg.in/mgo.v2"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
type consumed struct {
@ -80,6 +81,18 @@ func consumer() {
continue
}
// se la data e' piu vecchia di RETENTION (15552000 sec) la scarta
if time.Since(time.Unix(date, 0)).Seconds()-opts.Retention >= 0 {
log.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
if opts.Debug {
fmt.Printf("Date Warning: %+v - %s - %s\n", time.Unix(date, 0), prod.user, login)
}
if !opts.Test {
continue
}
}
// verifica se esiste la country
if len(sval) <= 3 {
sval = append(sval, "NONE")
@ -117,7 +130,7 @@ func consumer() {
for _, val := range bulk {
for j, bl := range val {
_, err := bl.Run()
result, err := bl.Run()
if j == 0 {
if err != nil {
if !strings.Contains(err.Error(), "E11000") {
@ -127,7 +140,7 @@ func consumer() {
tipo: "err",
val: len(prod.logins),
}
if opts.Test {
if opts.Test || opts.Debug {
log.Printf("ERR: %s - %+v\n", prod.user, prod.logins)
}
continue
@ -136,12 +149,19 @@ func consumer() {
tipo: "dup",
val: strings.Count(err.Error(), "E11000"),
}
if opts.Debug {
log.Printf("DUP: %s - %+v\n", prod.user, prod.logins)
}
}
} else {
if opts.Test {
log.Printf("OK: %s - %+v\n", prod.user, prod.logins)
log.Printf("BulkResult: %s - %+v\n", prod.user, result)
}
counter <- Counterchan{
tipo: "ins",
val: len(allLogins),
}
}
}
}

View file

@ -6,6 +6,7 @@ import (
"time"
)
// Counterchan counter structure
type Counterchan struct {
tipo string
val int
@ -38,6 +39,7 @@ func NewCounter() *Counter {
}
}
// Run open channel to receive counter
func (c *Counter) Run() {
for {
tocount := <-counter

View file

@ -1,8 +1,15 @@
// v1.2
var db = connect("192.168.0.1:27017/lastlogin");
//var conn = new Mongo("192.168.0.1");
//var db = conn.getDB("lastlogin");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_"col.slice(2));
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})
var db = connect("192.168.0.1:27017/katamail");
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
var now = new Date();
var col = now.getFullYear() + dd(now.getMonth()+1) + dd(now.getDate())
print("creating index: ll_",col.slice(2));
db.getCollection("ll_"+col.slice(2)).createIndexes([{"date": 1},{"country": 1},{"user": 1,"date": -1}])
db.getCollection("ll_"+col.slice(2)).createIndex({"date": -1},{expireAfterSeconds: 15552000, name: "expire"})

3
dbs.go
View file

@ -10,7 +10,8 @@ import (
"github.com/garyburd/redigo/redis"
"gopkg.in/mgo.v2"
"github.com/globalsign/mgo"
// "gopkg.in/mgo.v2"
)
var (

View file

@ -0,0 +1,18 @@
version: '2'
services:
mongodb:
image: mikif70/mongodb:3.6.9
ports:
- 27017:27017
container_name: ll_mongod
volumes:
- ./mongod:/data
depends_on:
- redis
redis:
image: "redis:alpine"
container_name: ll_redis
ports:
- 6379:6379
volumes:
- ./redis:/data

9
docker-compose/mongo.sh Executable file
View file

@ -0,0 +1,9 @@
#!/bin/bash
if [ -z $1 ]; then
HOST=192.168.0.1:27017
else
HOST=${1}
fi
docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST}

16
docker-compose/test.sh Executable file
View file

@ -0,0 +1,16 @@
#!/bin/bash
echo "Starting mikif70/llmongo:$(git -C .. describe --tags)"
docker run \
--rm \
-v /opt/WORK/PROJECTS/New_Mail/lastlogin_mongodb/docker-compose/llmongo:/data \
--name llmongo \
mikif70/llmongo:$(git -C .. describe --tags) \
-l /data/llmongo.log \
-r 192.168.0.1:6379 \
-m 192.168.0.1:27017 \
-d lastlogin \
-i test@10.39.253.206:8086 \
-T 80s \
$@

View file

@ -3,6 +3,7 @@ package main
import (
"fmt"
"log"
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
@ -15,7 +16,8 @@ var (
func writeStats(start time.Time) {
if opts.Debug {
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
}
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
@ -23,7 +25,8 @@ func writeStats(start time.Time) {
Timeout: 2 * time.Second,
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
fmt.Printf("InfluxDB connect Error: %+v\n", err)
log.Printf("InfluxDB connect Error: %+v\n", err)
return
}
defer c.Close()
@ -33,7 +36,8 @@ func writeStats(start time.Time) {
Precision: "s",
})
if err != nil {
fmt.Printf("Error: %+v\n", err)
fmt.Printf("InfluxDB batch Error: %+v\n", err)
log.Printf("InfluxDB batch Error: %+v\n", err)
return
}
@ -48,12 +52,22 @@ func writeStats(start time.Time) {
}
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
if err != nil {
fmt.Printf("Error: %+v\n", err)
fmt.Printf("InfluxDB point Error: %+v\n", err)
log.Printf("InfluxDB point Error: %+v\n", err)
return
}
if opts.Debug {
fmt.Printf("InfluxDB pt: %+v\n", pt)
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
err = c.Write(bp)
if err != nil {
fmt.Printf("InfluxDB write Error: %+v\n", err)
log.Printf("InfluxDB write Error: %+v\n", err)
return
}
}

10
main.go
View file

@ -14,7 +14,7 @@ import (
)
const (
_Version = "v4.3.3"
_Version = "v5.0.0b6"
_Producer = 0
_Consumer = 1
_Remover = 2
@ -51,11 +51,15 @@ func main() {
}
if opts.Influxdb != "" {
re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
if re.MatchString(opts.Influxdb) {
match := re.FindStringSubmatch(opts.Influxdb)
if opts.Debug {
fmt.Printf("Influxdb match: %+v\n", match)
}
infhost = match[1]
infdb = match[2]
infdb = match[2] + ":" + match[3]
} else {
opts.Influxdb = ""
}

34
mongo_scripts.txt Normal file
View file

@ -0,0 +1,34 @@
################
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var i=1; i<18; i++) {
var col = "ll_1806"+dd(i);
print(col);
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
}
###########
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
for (var m=1; m<=12; m++) {
for (var d=1; d<=31; d++) {
var col = "ll_15"+dd(m)+dd(d);
if (db.getCollection(col).exists != null) {
print(col);
print(db.getCollection(col).totalSize());
}
}
}
####
var ar = db.getCollectionNames()
var len = ar.length
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
####
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})

View file

@ -1,4 +1,3 @@
// options
package main
import (
@ -25,8 +24,10 @@ type Options struct {
MaxError int
Influxdb string
Month string
// Pidfile string
Queue int
Retention float64
Consul string
Port int
}
var (
@ -45,7 +46,10 @@ func usage() {
-l <logfile>
-T <running timeout>
-i <influxdb [localname@ip:port]>
-C <consul [ip:port]>
-P <check port> [port] ## used by consul to check services ##
-q <parallel consumer>
-R <retention>
-v <version>
-D <debug>
-DD <Test>`)
@ -61,7 +65,6 @@ func init() {
}
opts.LogFile = path.Join(opts.CurrentPath, opts.LogFile)
// opts.Pidfile = path.Join(opts.CurrentPath, "run", path.Base(os.Args[0])+".pid")
opts.Exe = path.Base(os.Args[0])
flag.StringVar(&opts.Influxdb, "i", "", "influxdb server")
@ -69,6 +72,8 @@ func init() {
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
flag.IntVar(&opts.Port, "P", 10500, "service check port")
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
flag.BoolVar(&opts.Version, "v", false, "Version")
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
@ -76,5 +81,5 @@ func init() {
flag.BoolVar(&opts.Test, "DD", false, "Test")
flag.IntVar(&opts.MaxError, "E", 100, "Max Mongodb Error")
flag.IntVar(&opts.Queue, "q", 2, "parallel consumer")
// flag.StringVar(&opts.Pidfile, "p", opts.Pidfile, "pid file")
flag.Float64Var(&opts.Retention, "R", 15552000, "retention")
}

View file

@ -24,4 +24,9 @@
#
# m h dom mon dow command
*/10 * * * * /opt/llmongo/start.sh > /dev/null
### lastlogin import
*/3 * * * * /opt/llmongo/run.sh > /dev/null 2>&1
*/5 * * * * /opt/llmongo/katamail.sh > /dev/null 2>&1
05 * * * * /opt/impmongo/run.sh > /dev/null 2>&1
### lastlogin create indexes
00 01 * * * /opt/mongodb/mongo.sh -script /data/day_init.js >> /opt/mongodb/day_init.log 2>&1