Compare commits
9 commits
Author | SHA1 | Date | |
---|---|---|---|
abca7e21e7 | |||
ee28cf2d89 | |||
5c45a79d12 | |||
f1f38601d5 | |||
a035171edb | |||
2d3d9e43d3 | |||
a2fc00df74 | |||
5bc0076ecf | |||
58234ad80a |
11 changed files with 122 additions and 16 deletions
44
consul.go
Normal file
44
consul.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
// Consul client structure
|
||||
type Consul struct {
|
||||
RedisTTL time.Duration
|
||||
Timeout time.Duration
|
||||
MaxError int
|
||||
Influxdb string
|
||||
Month string
|
||||
Retention float64
|
||||
MongoURI string
|
||||
Database string
|
||||
RedisURI string
|
||||
Client *api.Client
|
||||
}
|
||||
|
||||
// NewClient New consul client
|
||||
func NewClient() *Consul {
|
||||
client, err := api.NewClient(api.DefaultConfig())
|
||||
if err != nil {
|
||||
log.Fatalf("Consul error: %+v\n", err)
|
||||
if opts.Debug {
|
||||
fmt.Printf("Consul error: %+v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
consul := &Consul{
|
||||
Client: client,
|
||||
RedisTTL: opts.RedisTTL,
|
||||
Timeout: opts.Timeout,
|
||||
MaxError: opts.MaxError,
|
||||
Influxdb: opts.Influxdb,
|
||||
}
|
||||
|
||||
return consul
|
||||
}
|
|
@ -12,7 +12,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2"
|
||||
"github.com/globalsign/mgo"
|
||||
// "gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
type consumed struct {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// Counterchan counter structure
|
||||
type Counterchan struct {
|
||||
tipo string
|
||||
val int
|
||||
|
@ -38,6 +39,7 @@ func NewCounter() *Counter {
|
|||
}
|
||||
}
|
||||
|
||||
// Run open channel to receive counter
|
||||
func (c *Counter) Run() {
|
||||
for {
|
||||
tocount := <-counter
|
||||
|
|
3
dbs.go
3
dbs.go
|
@ -10,7 +10,8 @@ import (
|
|||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
|
||||
"gopkg.in/mgo.v2"
|
||||
"github.com/globalsign/mgo"
|
||||
// "gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
version: '2'
|
||||
services:
|
||||
mongodb:
|
||||
image: mikif70/mongodb:3.4.7
|
||||
image: mikif70/mongodb:3.6.9
|
||||
ports:
|
||||
- 27017:27017
|
||||
container_name: ll_mongod
|
||||
|
@ -15,4 +15,4 @@ services:
|
|||
ports:
|
||||
- 6379:6379
|
||||
volumes:
|
||||
- ./redis:/data
|
||||
- ./redis:/data
|
||||
|
|
|
@ -6,4 +6,4 @@ else
|
|||
HOST=${1}
|
||||
fi
|
||||
|
||||
docker run --rm -it mikif70/mongotools:3.4.7 mongo ${HOST}
|
||||
docker run --rm -it mikif70/mongotools:3.6.9 mongo ${HOST}
|
||||
|
|
|
@ -11,5 +11,6 @@ docker run \
|
|||
-r 192.168.0.1:6379 \
|
||||
-m 192.168.0.1:27017 \
|
||||
-d lastlogin \
|
||||
-i test@10.39.253.206:8086 \
|
||||
-T 80s \
|
||||
$@
|
||||
|
|
24
influxdb.go
24
influxdb.go
|
@ -3,6 +3,7 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
influxdb "github.com/influxdata/influxdb/client/v2"
|
||||
|
@ -15,7 +16,8 @@ var (
|
|||
|
||||
func writeStats(start time.Time) {
|
||||
if opts.Debug {
|
||||
fmt.Printf("writing to influxdb server: %s", opts.Influxdb)
|
||||
fmt.Printf("writing to influxdb server: %s\n", opts.Influxdb)
|
||||
fmt.Printf("host: %s -- addr: %s\n", infhost, infdb)
|
||||
}
|
||||
|
||||
c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
|
||||
|
@ -23,7 +25,8 @@ func writeStats(start time.Time) {
|
|||
Timeout: 2 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %+v\n", err)
|
||||
fmt.Printf("InfluxDB connect Error: %+v\n", err)
|
||||
log.Printf("InfluxDB connect Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
@ -33,7 +36,8 @@ func writeStats(start time.Time) {
|
|||
Precision: "s",
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %+v\n", err)
|
||||
fmt.Printf("InfluxDB batch Error: %+v\n", err)
|
||||
log.Printf("InfluxDB batch Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -48,12 +52,22 @@ func writeStats(start time.Time) {
|
|||
}
|
||||
pt, err := influxdb.NewPoint("ll2mongo", tags, fields, start)
|
||||
if err != nil {
|
||||
fmt.Printf("Error: %+v\n", err)
|
||||
fmt.Printf("InfluxDB point Error: %+v\n", err)
|
||||
log.Printf("InfluxDB point Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
if opts.Debug {
|
||||
fmt.Printf("InfluxDB pt: %+v\n", pt)
|
||||
}
|
||||
|
||||
bp.AddPoint(pt)
|
||||
|
||||
// Write the batch
|
||||
c.Write(bp)
|
||||
err = c.Write(bp)
|
||||
if err != nil {
|
||||
fmt.Printf("InfluxDB write Error: %+v\n", err)
|
||||
log.Printf("InfluxDB write Error: %+v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
10
main.go
10
main.go
|
@ -14,7 +14,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
_Version = "v4.4.5b4"
|
||||
_Version = "v5.0.0b6"
|
||||
_Producer = 0
|
||||
_Consumer = 1
|
||||
_Remover = 2
|
||||
|
@ -51,11 +51,15 @@ func main() {
|
|||
}
|
||||
|
||||
if opts.Influxdb != "" {
|
||||
re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
|
||||
var re = regexp.MustCompile(`(?m)(\w+)@([\w.]+):(\d+)`)
|
||||
// re, _ := regexp.Compile(`(\w+)@(\d+.\d+.\d+.\d+:\d+)`)
|
||||
if re.MatchString(opts.Influxdb) {
|
||||
match := re.FindStringSubmatch(opts.Influxdb)
|
||||
if opts.Debug {
|
||||
fmt.Printf("Influxdb match: %+v\n", match)
|
||||
}
|
||||
infhost = match[1]
|
||||
infdb = match[2]
|
||||
infdb = match[2] + ":" + match[3]
|
||||
} else {
|
||||
opts.Influxdb = ""
|
||||
}
|
||||
|
|
34
mongo_scripts.txt
Normal file
34
mongo_scripts.txt
Normal file
|
@ -0,0 +1,34 @@
|
|||
|
||||
################
|
||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
||||
|
||||
for (var i=1; i<18; i++) {
|
||||
var col = "ll_1806"+dd(i);
|
||||
print(col);
|
||||
db.getCollection(col).createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true});
|
||||
}
|
||||
|
||||
###########
|
||||
var dd = function(x) { var s = x.toString(); if(s.length > 1){return s;} else { return "0"+s;}}
|
||||
|
||||
for (var m=1; m<=12; m++) {
|
||||
for (var d=1; d<=31; d++) {
|
||||
var col = "ll_15"+dd(m)+dd(d);
|
||||
if (db.getCollection(col).exists != null) {
|
||||
print(col);
|
||||
print(db.getCollection(col).totalSize());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
####
|
||||
var ar = db.getCollectionNames()
|
||||
var len = ar.length
|
||||
|
||||
for (var i=0; i<len; i++) { if (ar[i].startsWith("ll")) { print(db.getCollection(ar[i]))}}
|
||||
|
||||
|
||||
####
|
||||
db.lastlogin_1703.createIndex({date: -1}, {expireAfterSeconds: 15552000, name: "expire", background: true})
|
||||
|
11
options.go
11
options.go
|
@ -1,4 +1,3 @@
|
|||
// options
|
||||
package main
|
||||
|
||||
import (
|
||||
|
@ -27,6 +26,8 @@ type Options struct {
|
|||
Month string
|
||||
Queue int
|
||||
Retention float64
|
||||
Consul string
|
||||
Port int
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -40,11 +41,13 @@ func usage() {
|
|||
fmt.Println(`Usage: llmongo
|
||||
-m <mongodb [ip:port,ip:port]>
|
||||
-d <mongodb database [dbname]>
|
||||
-r <redisdb [ip:port]>
|
||||
-r <redisdb [ip:port]>
|
||||
-t <redis keys ttl>
|
||||
-l <logfile>
|
||||
-T <running timeout>
|
||||
-i <influxdb [localname@ip:port]>
|
||||
-i <influxdb [localname@ip:port]>
|
||||
-C <consul [ip:port]>
|
||||
-P <check port> [port] ## used by consul to check services ##
|
||||
-q <parallel consumer>
|
||||
-R <retention>
|
||||
-v <version>
|
||||
|
@ -69,6 +72,8 @@ func init() {
|
|||
flag.StringVar(&dbs.Database, "d", "", "Mongodb Database")
|
||||
flag.StringVar(&dbs.RedisURI, "r", "", "Redis")
|
||||
flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename")
|
||||
flag.StringVar(&opts.Consul, "C", opts.Consul, "consul client")
|
||||
flag.IntVar(&opts.Port, "P", 10500, "service check port")
|
||||
flag.DurationVar(&opts.RedisTTL, "t", opts.RedisTTL, "Redis keys TTL")
|
||||
flag.BoolVar(&opts.Version, "v", false, "Version")
|
||||
flag.DurationVar(&opts.Timeout, "T", 0, "Running timeout")
|
||||
|
|
Loading…
Add table
Reference in a new issue