// lastlogin_consolidate package main import ( "flag" "fmt" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "log" "os" "path" "path/filepath" "time" ) const ( _VERSION = "v1.0.9" _tformat = "2006-01-02" _24h = (time.Hour * 23) + (time.Minute * 59) + (time.Second * 59) _10m = (time.Minute * 10) _15m = (time.Minute * 15) ) var ( opts = Options{ MongoSrc: "mongodb://127.0.0.1:27018", LogFile: "log/llconsolidate.log", StartDate: time.Now().Add(-24 * time.Hour).Format(_tformat), Duration: _24h, Interval: _15m, } ) type Options struct { MongoSrc string MongoDst string mdbSrc *mgo.Session mdbDst *mgo.Session ll *mgo.Collection lc *mgo.Collection StartDate string Duration time.Duration Interval time.Duration LogFile string Version bool Debug bool } type LastLogin struct { User string `json: "user"` Protocol string `json: "protocol"` IP string `json: "ip"` Date time.Time `json: "date"` ID string `json: "_id"` } type LastLoginDay struct { User string `json:"user"` Date time.Time `json:"date"` Protocols Protocols `json:"protocols"` IPs []IPs `json:"ips"` } type IPs struct { IP string `json:"ip"` Date time.Time `json:"date"` Protocol string `json:"protocol"` } type Protocols struct { Pop int `json:"pop"` Imap int `json:"imap"` Web int `json:"web"` } type Index struct { User string `json:"user"` Date time.Time `json:"date"` } func usage() { fmt.Println("Usage: lastlogin_consolidate -ms <mongo source mongodb://IP:PORT> -md <mongo destination mongodb://IP:PORT> -l <logfile> -d <date> -dd <duration> -i <interval> -v\n") os.Exit(0) } func init() { current, err := filepath.Abs(filepath.Dir(os.Args[0])) if err != nil { log.Fatal(err) } opts.LogFile = path.Join(current, opts.LogFile) flag.StringVar(&opts.MongoSrc, "ms", opts.MongoSrc, "Mongodb Source") flag.StringVar(&opts.MongoDst, "md", "", "Mongodb Destination") flag.StringVar(&opts.LogFile, "l", opts.LogFile, "Logs filename") flag.StringVar(&opts.StartDate, "d", opts.StartDate, "Date") flag.DurationVar(&opts.Duration, "dd", opts.Duration, "Duration") flag.DurationVar(&opts.Interval, "i", opts.Interval, "Duration") flag.BoolVar(&opts.Version, "v", false, "Version") flag.BoolVar(&opts.Debug, "debug", false, "Debug") } func connectMongo() { if opts.MongoSrc == "" { log.Fatalf("Mongodb URI invalid: '%s'\n", opts.MongoSrc) } var err error //opts.mdbSrc, err = mgo.DialWithTimeout(opts.MongoSrc, time.Minute*5) opts.mdbSrc, err = mgo.Dial(opts.MongoSrc) if err != nil { log.Println("Mongodb connect Error: ", err.Error()) os.Exit(-3) } opts.mdbSrc.SetSocketTimeout(0) opts.ll = opts.mdbSrc.DB("dovecot").C("lastlogin") if opts.MongoDst == "" { opts.mdbDst = opts.mdbSrc opts.lc = opts.mdbSrc.DB("dovecot").C("lastlogin_day") } else { opts.mdbDst, err = mgo.Dial(opts.MongoDst) if err != nil { log.Println("Mongodb connect Error: ", err.Error()) os.Exit(-3) } opts.lc = opts.mdbDst.DB("dovecot").C("lastlogin_day") } } func main() { flag.Usage = usage flag.Parse() if opts.Version { fmt.Println(os.Args[0], _VERSION) os.Exit(0) } fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { fmt.Println("Log file error: ", err.Error()) os.Exit(-4) } log.SetOutput(fs) start := time.Now() fmt.Printf("Start: %+v\n", opts) log.Printf("Start: %+v\n", opts) connectMongo() defer opts.mdbSrc.Close() defer opts.mdbDst.Clone() y, err := time.Parse(_tformat, opts.StartDate) if err != nil { log.Println("Date Error: ", err) os.Exit(-1) } // DEBUG //fmt.Printf("Start %+v\n\r", y) var ys []time.Time var ye []time.Time if opts.Duration <= (time.Hour * 24) { ys = append(ys, time.Date(y.Year(), y.Month(), y.Day(), 0, 0, 0, 0, time.UTC)) ye = append(ye, ys[0].Add(opts.Duration)) } else { for i := 0; i < int(opts.Duration/(time.Hour*24)); i++ { // fmt.Println(i) yt := y.Add(time.Hour * time.Duration(24*i)) // fmt.Println(yt) ys = append(ys, time.Date(yt.Year(), yt.Month(), yt.Day(), 0, 0, 0, 0, time.UTC)) ye = append(ye, ys[i].Add(_24h)) } } // DEBUG if opts.Debug { fmt.Printf("Start: %+v, Stop: %+v\n\r", ys, ye) } countTOT := 0 countOK := 0 for i := range ys { pStart := time.Now() qStart := time.Now() q := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}}).Sort("user") if opts.Debug { fmt.Printf("Find from date: %s\n\r", time.Since(qStart)) } qStart = time.Now() ar := []string{} q.Distinct("user", &ar) if opts.Debug { fmt.Printf("Distinct user: %s\n\r", time.Since(qStart)) } fmt.Printf("Date: %s - %s\n", ys[i], ye[i]) log.Printf("Date: %s - %s\n", ys[i], ye[i]) for u := range ar { countTOT += 1 ll := LastLoginDay{} ll.User = ar[u] ll.Date = ys[i] // DEBUG if opts.Debug { fmt.Printf("User: %s\n\r", ar[u]) } qStart = time.Now() nq := opts.ll.Find(bson.M{"date": bson.M{"$gte": ys[i], "$lte": ye[i]}, "user": ar[u]}).Sort("date") //fmt.Printf("User: %s -> %s\n\r", ar[u], time.Since(qStart) ) iter := nq.Iter() result := LastLogin{} ips := []IPs{} lastip := IPs{} for iter.Next(&result) { if result.IP == lastip.IP && result.Date.Sub(lastip.Date) < opts.Interval { //fmt.Println("IPs: ", result.IP, result.Date, result.Date.Sub(lastip.Date)) } else { ips = append(ips, IPs{IP: result.IP, Date: result.Date, Protocol: result.Protocol}) lastip.Date = result.Date lastip.IP = result.IP } switch result.Protocol { case "pop3", "pop": ll.Protocols.Pop += 1 case "imap": ll.Protocols.Imap += 1 case "web": ll.Protocols.Web += 1 } } if err := iter.Close(); err != nil { log.Println("Iter: ", err) } ll.IPs = ips //fmt.Printf("Upsert %+v\n\r", ll) _, err := opts.lc.Upsert(Index{User: ll.User, Date: ll.Date}, ll) if err != nil { log.Println("Insert error: ", err) } countOK += 1 } fmt.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) log.Printf("Stop %s: %s\n", ys[i], time.Since(pStart)) } fmt.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, time.Since(start)) log.Printf("Stop: OK: %d - TOT: %d - Time: %s\n\r", countOK, countTOT, time.Since(start)) }