// llmongo.go package main import ( "flag" "fmt" "log" "os" "sync" "time" ) const ( _Version = "v3.1.1" _Producer = 0 _Consumer = 1 _Remover = 2 ) var ( loop bool done chan bool consume chan produced remove chan consumed counter chan Counterchan wg sync.WaitGroup status int count *Counter ) func main() { flag.Usage = usage flag.Parse() if opts.Version { fmt.Println(os.Args[0], _Version) os.Exit(0) } if opts.Hostname == "" { var err error opts.Hostname, err = os.Hostname() if err != nil { fmt.Println("Hostname error: ", err.Error()) } } setTerm() fs, err := os.OpenFile(opts.LogFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { fmt.Println("Log file error: ", err.Error()) os.Exit(-4) } defer fs.Close() log.SetOutput(fs) pid.PIDFile = opts.Pidfile pid.Write(true) defer pid.Remove() start := time.Now() fmt.Printf("Start: %+v\n", opts) log.Printf("Start: %+v\n", opts) opts.Month = start.Format("0601") dbs.poolRedis() defer dbs.rdb.Close() if dbs.isMongodb() { dbs.connectMongo() defer dbs.mdb.Close() } if dbs.isRethink() { dbs.poolRethink() defer dbs.rtdb.Close() } if opts.Timeout > 0 { time.AfterFunc(opts.Timeout, exit) } count = NewCounter() consume = make(chan produced, opts.Queue) remove = make(chan consumed, opts.Queue) loop = true done = make(chan bool) counter = make(chan Counterchan) go count.Run() go producer() for i := 0; i < opts.Queue; i++ { go consumer() go remover() } <-done fmt.Printf("Done\n") close(done) fmt.Println("Waiting WG") wg.Wait() count.SetTime(time.Since(start)) fmt.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) log.Printf("Stop %v - user: %d - login: %d - insert: %d - errors: %d - rem: %d - duplicate: %d\n\r", count.GetTime(), count.GetUser(), count.GetLog(), count.GetInsert(), count.GetErr(), count.GetRem(), count.GetDup()) if opts.Influxdb != "" { writeStats(start) } }