diff --git a/consul.go b/consul.go new file mode 100644 index 0000000..a959a24 --- /dev/null +++ b/consul.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/hashicorp/consul/api" +) + +// Consul client structure +type Consul struct { + RedisTTL time.Duration + Timeout time.Duration + MaxError int + Influxdb string + Month string + Retention float64 + MongoURI string + Database string + RedisURI string + Client *api.Client +} + +// NewClient New consul client +func NewClient() *Consul { + client, err := api.NewClient(api.DefaultConfig()) + if err != nil { + log.Fatalf("Consul error: %+v\n", err) + if opts.Debug { + fmt.Printf("Consul error: %+v\n", err) + } + } + + consul := &Consul{ + Client: client, + RedisTTL: opts.RedisTTL, + Timeout: opts.Timeout, + MaxError: opts.MaxError, + Influxdb: opts.Influxdb, + } + + return consul +} diff --git a/consumer.go b/consumer.go index 5a68f9e..0c4b7f8 100644 --- a/consumer.go +++ b/consumer.go @@ -12,7 +12,8 @@ import ( "strings" "time" - "gopkg.in/mgo.v2" + "github.com/globalsign/mgo" + // "gopkg.in/mgo.v2" ) type consumed struct { @@ -139,7 +140,7 @@ func consumer() { tipo: "err", val: len(prod.logins), } - if opts.Test { + if opts.Test || opts.Debug { log.Printf("ERR: %s - %+v\n", prod.user, prod.logins) } continue @@ -148,17 +149,18 @@ func consumer() { tipo: "dup", val: strings.Count(err.Error(), "E11000"), } + if opts.Debug { + log.Printf("DUP: %s - %+v\n", prod.user, prod.logins) + } } } else { if opts.Test { log.Printf("OK: %s - %+v\n", prod.user, prod.logins) - } - if opts.Debug { log.Printf("BulkResult: %s - %+v\n", prod.user, result) } counter <- Counterchan{ tipo: "ins", - val: len(prod.logins), + val: len(allLogins), } } } diff --git a/counter.go b/counter.go index f9505d2..1b616a0 100644 --- a/counter.go +++ b/counter.go @@ -6,6 +6,7 @@ import ( "time" ) +// Counterchan counter structure type Counterchan struct { tipo string val int @@ -38,6 +39,7 @@ func NewCounter() *Counter { } } +// Run open channel to receive counter func (c *Counter) Run() { for { tocount := <-counter diff --git a/dbs.go b/dbs.go index f0bec39..4e28cf5 100644 --- a/dbs.go +++ b/dbs.go @@ -10,7 +10,8 @@ import ( "github.com/garyburd/redigo/redis" - "gopkg.in/mgo.v2" + "github.com/globalsign/mgo" + // "gopkg.in/mgo.v2" ) var ( diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index 67eeacb..cf96102 100644 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -1,7 +1,7 @@ version: '2' services: mongodb: - image: mikif70/mongodb:3.4.7 + image: mikif70/mongodb:3.6.9 ports: - 27017:27017 container_name: ll_mongod @@ -15,4 +15,4 @@ services: ports: - 6379:6379 volumes: - - ./redis:/data \ No newline at end of file + - ./redis:/data diff --git a/docker-compose/mongo.sh b/docker-compose/mongo.sh index 4c17756..e7fadef 100755 --- a/docker-compose/mongo.sh +++ b/docker-compose/mongo.sh @@ -6,4 +6,4 @@ else HOST=${1} fi -docker run --rm -it mikif70/mongotools:3.4.7 mongo ${HOST} +docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST} diff --git a/docker-compose/test.sh b/docker-compose/test.sh index 87d6691..233ebbf 100755 --- a/docker-compose/test.sh +++ b/docker-compose/test.sh @@ -11,5 +11,6 @@ docker run \ -r 192.168.0.1:6379 \ -m 192.168.0.1:27017 \ -d lastlogin \ + -i test@10.39.253.206:8086 \ -T 80s \ $@ diff --git a/influxdb.go b/influxdb.go index e0cdae4..be821d2 100644 --- a/influxdb.go +++ b/influxdb.go @@ -3,6 +3,7 @@ package main import ( "fmt" + "log" "time" influxdb "github.com/influxdata/influxdb/client/v2" @@ -15,7 +16,8 @@ var ( func writeStats(start time.Time) { if opts.Debug { - fmt.Printf("writing to influxdb server: %s", opts.Influxdb) + fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb) + fmt.Printf("host: %s -- addr: %s\n", infhost, infdb) } c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{ @@ -23,7 +25,8 @@ func writeStats(start time.Time) { Timeout: 2 * time.Second, }) if err != nil { - fmt.Printf("Error: %+v\n", err) + fmt.Printf("InfluxDB connect Error: %+v\n", err) + log.Printf("InfluxDB connect Error: %+v\n", err) return } defer c.Close() @@ -33,7 +36,8 @@ func writeStats(start time.Time) { Precision: "s", }) if err != nil { - fmt.Printf("Error: %+v\n", err) + fmt.Printf("InfluxDB batch Error: %+v\n", err) + log.Printf("InfluxDB batch Error: %+v\n", err) return } @@ -48,12 +52,22 @@ func writeStats(start time.Time) { } pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start) if err != nil { - fmt.Printf("Error: %+v\n", err) + fmt.Printf("InfluxDB point Error: %+v\n", err) + log.Printf("InfluxDB point Error: %+v\n", err) return } + if opts.Debug { + fmt.Printf("InfluxDB pt: %+v\n", pt) + } + bp.AddPoint(pt) // Write the batch - c.Write(bp) + err = c.Write(bp) + if err != nil { + fmt.Printf("InfluxDB write Error: %+v\n", err) + log.Printf("InfluxDB write Error: %+v\n", err) + return + } } diff --git a/main.go b/main.go index 66b7216..304e716 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( ) const ( - _Version = "v4.4.5b1" + _Version = "v5.0.0b6" _Producer = 0 _Consumer = 1 _Remover = 2 @@ -51,11 +51,15 @@ func main() { } if opts.Influxdb != "" { - re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`) + var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`) + // re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`) if re.MatchString(opts.Influxdb) { match := re.FindStringSubmatch(opts.Influxdb) + if opts.Debug { + fmt.Printf("Influxdb match: %+v\n", match) + } infhost = match[1] - infdb = match[2] + infdb = match[2] + ":" + match[3] } else { opts.Influxdb = "" } diff --git a/mongo_scripts.txt b/mongo_scripts.txt new file mode 100644 index 0000000..6dc7834 --- /dev/null +++ b/mongo_scripts.txt @@ -0,0 +1,34 @@ + +################ +var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} + +for (var i=1; i<18; i++) { + var col = "ll_1806"+dd(i); + print(col); + db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true}); +} + +########### +var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}} + +for (var m=1; m<=12; m++) { + for (var d=1; d<=31; d++) { + var col = "ll_15"+dd(m)+dd(d); + if (db.getCollection(col).exists != null) { + print(col); + print(db.getCollection(col).totalSize()); + } + } +} + + +#### +var ar = db.getCollectionNames() +var len = ar.length + +for (var i=0; i -d - -r + -r -t -l -T - -i + -i + -C + -P [port] ## used by consul to check services ## -q -R -v @@ -69,6 +72,8 @@ func init() { flag.StringVar(&dbs.Database, "d", "", "Mongodb Database") flag.StringVar(&dbs.RedisURI, "r", "", "Redis") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") + flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client") + flag.IntVar(&opts.Port, "P", 10500, "service check port") flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL") flag.BoolVar(&opts.Version, "v", false, "Version") flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")