package main import ( "fmt" "io" "time" "github.com/gin-gonic/gin" "log" ) type SurferLive struct { Name string `json:"name"` Color string `json:"color"` Score string `json:"score"` Priority string `json:"priority"` } type Message struct { Surfers []SurferLive `json:"surfers"` Priority []string `json:"priority"` Heat Heat `json:"heat"` Duration string `json:"duration"` Source string `json:"source"` Msg string `json:"msg"` Mode string `json:"mode"` } type Client struct { Chan ClientChan IP IPAddress Mode Mode } type ClientChan chan Message type IPAddress string type SseStream struct { // Events are pushed to this channel by the main events-gathering routine Message chan Message // New client connections NewClients chan Client //chan string // Closed client connections ClosedClients chan ClientChan // Total client connections TotalClients map[ClientChan]IPAddress //bool StatusPriority []string Duration time.Duration Start bool Heat *Heat } // Initialize event and Start procnteessing requests func NewServer() (sse *SseStream) { sse = &SseStream{ Message: make(chan Message), NewClients: make(chan Client), ClosedClients: make(chan ClientChan), TotalClients: make(map[ClientChan]IPAddress), Start: false, Heat: &Heat{}, } go sse.listen() go sse.timer() return } // It Listens all incoming requests from clients. // Handles addition and removal of clients and broadcast messages to clients. func (stream *SseStream) listen() { for { select { // Add new available client case client := <-stream.NewClients: stream.TotalClients[client.Chan] = client.IP log.Printf("Client added. %d - %s registered clients", len(stream.TotalClients), client.IP) // Remove closed client case client := <-stream.ClosedClients: ip := stream.TotalClients[client] delete(stream.TotalClients, client) close(client) log.Printf("Removed client. %d - %s registered clients", len(stream.TotalClients), ip) // Broadcast message to client case eventMsg := <-stream.Message: for clientMessageChan := range stream.TotalClients { clientMessageChan <- eventMsg log.Printf("Message %+v sent to %s", eventMsg, stream.TotalClients[clientMessageChan]) } } } } func HeadersMiddleware() gin.HandlerFunc { return func(c *gin.Context) { c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") c.Writer.Header().Set("Transfer-Encoding", "chunked") c.Next() } } func (stream *SseStream) serveHTTP() gin.HandlerFunc { return func(c *gin.Context) { // Initialize client channel clientChan := make(ClientChan) cli := Client{ Chan: clientChan, IP: IPAddress(c.ClientIP()), Mode: Priority, } // Send new connection to event server stream.NewClients <- cli defer func() { // Send closed connection to event server stream.ClosedClients <- clientChan }() c.Set("clientChan", clientChan) c.Next() } } func (stream *SseStream) retvalSSE() gin.HandlerFunc { return func(c *gin.Context) { if len(stream.StatusPriority) > 0 { stream.SendPriority(stream.StatusPriority) log.Printf("update priority %+v", stream.StatusPriority) } v, ok := c.Get("clientChan") if !ok { return } clientChan, ok := v.(ClientChan) if !ok { return } c.Stream(func(w io.Writer) bool { // Stream message to client from message channel if msg, ok := <-clientChan; ok { c.SSEvent("message", msg) return true } return false }) } } func (stream *SseStream) SendPriority(pri []string) { stream.Message <- Message{ Priority: pri, Mode: Priority.String(), } } func (stream *SseStream) timer() { for { if stream.Start { timer := time.NewTimer(stream.Duration) select { case <-timer.C: stream.Start = false msg := Message{ Msg: "stop", Mode: Stop.String(), } stream.Message <- msg log.Printf("stop timer %+v", stream.Duration) stream.Heat.Status = "ended" continue default: if len(stream.TotalClients) > 0 { if stream.Duration >= 0 { currentTimer := fmt.Sprintf("%v", formatTime(stream.Duration)) msg := Message{ Duration: currentTimer, Mode: Time.String(), } // Send current time to clients message channel stream.Message <- msg stream.Duration = stream.Duration - time.Second time.Sleep(time.Second * 1) } else { timer.Stop() } } } } } } func formatTime(d time.Duration) string { minutes := int(d.Minutes()) % 60 seconds := int(d.Seconds()) % 60 return fmt.Sprintf("%02d:%02d", minutes, seconds) }