Priority/backend.old/sse.go
2023-12-20 16:49:33 +01:00

216 lines
4.7 KiB
Go

package main
import (
"fmt"
"io"
"time"
"github.com/gin-gonic/gin"
"log"
)
// SurferLive groups one surfer's name, color, score and priority. Every field
// is a string and is serialized under the lowercase JSON key given in its tag.
type SurferLive struct {
// Name is serialized as "name".
Name string `json:"name"`
// Color is serialized as "color".
Color string `json:"color"`
// Score is kept as text rather than a number; serialized as "score".
Score string `json:"score"`
// Priority is kept as text; serialized as "priority".
Priority string `json:"priority"`
}
// Message is the value carried on the stream's channels. The listen loop copies
// each Message to every registered client channel, and retvalSSE passes it to
// c.SSEvent under the event name "message".
type Message struct {
// Surfers is not populated by any code in this section.
Surfers []SurferLive `json:"surfers"`
// Priority is the list supplied to SendPriority.
Priority []string `json:"priority"`
// Heat is held by value; the Heat type is declared outside this section.
Heat Heat `json:"heat"`
// Duration is the remaining time as text; timer fills it with formatTime's output.
Duration string `json:"duration"`
// Source is not populated by any code in this section.
Source string `json:"source"`
// Msg is free text; timer sets it to "stop" when the countdown expires.
Msg string `json:"msg"`
// Mode names the kind of message. The senders in this section fill it from
// Priority.String(), Time.String() or Stop.String().
Mode string `json:"mode"`
}
// Client describes one connected SSE consumer. serveHTTP builds a Client per
// request and hands it to the stream through the NewClients channel.
type Client struct {
// Chan is the channel on which this client receives broadcast Messages.
Chan ClientChan
// IP is the client's address; serveHTTP takes it from c.ClientIP().
IP IPAddress
// Mode is set to Priority by serveHTTP; the listen loop does not read it.
Mode Mode
}
// ClientChan is the per-client delivery channel for Messages. It is also the
// key type of SseStream.TotalClients, so each channel identifies one client.
type ClientChan chan Message
// IPAddress is a client address kept as a plain string; it is used for the
// value side of SseStream.TotalClients and in log output.
type IPAddress string
// SseStream is the hub that connects message producers to SSE clients. The
// listen goroutine owns the client table and fans Messages out; the timer
// goroutine produces countdown Messages while Start is true.
type SseStream struct {
// Events are pushed to this channel by the main events-gathering routine
Message chan Message
// New client connections
NewClients chan Client //chan string
// Closed client connections
ClosedClients chan ClientChan
// Total client connections
TotalClients map[ClientChan]IPAddress //bool
// StatusPriority is the priority list that retvalSSE re-broadcasts when a
// request arrives and the list is non-empty. Nothing in this section writes it.
StatusPriority []string
// Duration is the remaining countdown time; timer lowers it by one second
// after each time Message it sends.
Duration time.Duration
// Start enables the countdown; timer sets it back to false on expiry.
Start bool
// Heat points at the current heat; timer sets its Status to "ended" on expiry.
Heat *Heat
}
// NewServer creates an SseStream and starts processing requests.
//
// The returned stream has unbuffered Message, NewClients and ClosedClients
// channels, an empty client table and a zero-valued Heat. Start is false, so
// the countdown stays idle until something sets it. Before returning, the
// listen and timer loops are each launched in their own goroutine.
func NewServer() (sse *SseStream) {
	sse = new(SseStream)

	sse.Message = make(chan Message)
	sse.NewClients = make(chan Client)
	sse.ClosedClients = make(chan ClientChan)
	sse.TotalClients = make(map[ClientChan]IPAddress)
	sse.Start = false
	sse.Heat = &Heat{}

	go sse.listen()
	go sse.timer()

	return sse
}
// listen is the stream's event loop; NewServer runs it in its own goroutine.
// Each pass through the loop handles exactly one of three events: a client
// registering, a client leaving, or a Message to copy to every registered
// client. The loop never returns.
func (stream *SseStream) listen() {
	for {
		select {
		case joined := <-stream.NewClients:
			// Register the newcomer, keyed by its delivery channel.
			stream.TotalClients[joined.Chan] = joined.IP
			log.Printf("Client added. %d - %s registered clients", len(stream.TotalClients), joined.IP)

		case gone := <-stream.ClosedClients:
			// Read the address before the entry is deleted so it can still be logged.
			addr := stream.TotalClients[gone]
			delete(stream.TotalClients, gone)
			close(gone)
			log.Printf("Removed client. %d - %s registered clients", len(stream.TotalClients), addr)

		case outgoing := <-stream.Message:
			// Deliver to one client at a time; each send waits for that client
			// to receive before the next client is served.
			for ch, addr := range stream.TotalClients {
				ch <- outgoing
				log.Printf("Message %+v sent to %s", outgoing, addr)
			}
		}
	}
}
// HeadersMiddleware returns a gin handler that sets the response headers used
// for an event stream (Content-Type, Cache-Control, Connection and
// Transfer-Encoding) and then passes control to the next handler in the chain.
func HeadersMiddleware() gin.HandlerFunc {
	// Name/value pairs, applied in this order.
	sseHeaders := [...][2]string{
		{"Content-Type", "text/event-stream"},
		{"Cache-Control", "no-cache"},
		{"Connection", "keep-alive"},
		{"Transfer-Encoding", "chunked"},
	}

	return func(c *gin.Context) {
		hdr := c.Writer.Header()
		for _, kv := range sseHeaders {
			hdr.Set(kv[0], kv[1])
		}
		c.Next()
	}
}
// serveHTTP returns the gin middleware that ties one HTTP request to the
// stream. For each request it creates an unbuffered ClientChan, registers it
// (with the caller's IP and Mode Priority) through NewClients, stores the
// channel in the gin context under the key "clientChan" for later handlers,
// and runs the rest of the chain. When the chain returns, the channel is
// deregistered through ClosedClients.
func (stream *SseStream) serveHTTP() gin.HandlerFunc {
	return func(c *gin.Context) {
		// Initialize client channel
		clientChan := make(ClientChan)
		cli := Client{
			Chan: clientChan,
			IP:   IPAddress(c.ClientIP()),
			Mode: Priority,
		}

		// Send new connection to event server
		stream.NewClients <- cli

		defer func() {
			// Once this handler stops reading, the listen loop may still be in
			// the middle of a broadcast and try to send on clientChan. With
			// nobody receiving, that send would block forever, and so would
			// the ClosedClients send below, because the same loop has to
			// receive it. Drain the channel in the background; the goroutine
			// ends when the listen loop closes clientChan after removing it.
			go func() {
				for range clientChan {
				}
			}()

			// Send closed connection to event server
			stream.ClosedClients <- clientChan
		}()

		c.Set("clientChan", clientChan)
		c.Next()
	}
}
// retvalSSE returns the gin handler that streams Messages to one client.
//
// On each request it first re-broadcasts the stored StatusPriority list (when
// that list is non-empty) and logs it. It then fetches the channel stored in
// the gin context under "clientChan"; if the key is absent or the value is not
// a ClientChan, the handler returns without streaming. Otherwise every Message
// received from the channel is written as an SSE event named "message", and
// streaming stops once the channel is closed.
func (stream *SseStream) retvalSSE() gin.HandlerFunc {
	return func(c *gin.Context) {
		if current := stream.StatusPriority; len(current) > 0 {
			stream.SendPriority(current)
			log.Printf("update priority %+v", stream.StatusPriority)
		}

		// A missing key yields a nil value, which fails the type assertion
		// just like a value of the wrong type does.
		stored, _ := c.Get("clientChan")
		inbox, isChan := stored.(ClientChan)
		if !isChan {
			return
		}

		c.Stream(func(_ io.Writer) bool {
			// Stream message to client from message channel
			next, open := <-inbox
			if !open {
				return false
			}
			c.SSEvent("message", next)
			return true
		})
	}
}
// SendPriority pushes a Message carrying pri as its Priority list, tagged with
// the Priority mode name, onto the stream's Message channel. The call blocks
// until that send is received; on a stream built by NewServer the channel is
// unbuffered and the listen loop is the receiver.
func (stream *SseStream) SendPriority(pri []string) {
	update := Message{
		Mode:     Priority.String(),
		Priority: pri,
	}
	stream.Message <- update
}
// timer drives the countdown; NewServer runs it in its own goroutine and it
// never returns.
//
// While Start is true and at least one client is registered, it broadcasts the
// remaining Duration (formatted by formatTime, Mode Time) once per second and
// then lowers Duration by one second. The countdown is paused while no client
// is registered. Once Duration has dropped below zero (or has reached zero
// with nobody connected to receive the final value) it clears Start,
// broadcasts a "stop" Message (Mode Stop) and marks the Heat as "ended".
//
// While there is nothing to do the loop sleeps briefly instead of spinning, so
// a change to Start or a new client is picked up within idlePoll.
//
// NOTE(review): Start, Duration and TotalClients are read here without
// synchronization although other goroutines write them (TotalClients is
// mutated by the listen loop). That was already the case before this rewrite
// and needs a mutex or channel hand-off on SseStream to fix properly.
func (stream *SseStream) timer() {
	const (
		tick     = time.Second
		idlePoll = 100 * time.Millisecond
	)

	for {
		if !stream.Start {
			time.Sleep(idlePoll)
			continue
		}

		clients := len(stream.TotalClients)

		// Expired: the last value (00:00) has already gone out, or there is no
		// one to send it to.
		if stream.Duration < 0 || (stream.Duration == 0 && clients == 0) {
			stream.Start = false
			msg := Message{
				Msg:  "stop",
				Mode: Stop.String(),
			}
			stream.Message <- msg
			log.Printf("stop timer %+v", stream.Duration)
			stream.Heat.Status = "ended"
			continue
		}

		// Nobody is listening: hold the countdown where it is.
		if clients == 0 {
			time.Sleep(idlePoll)
			continue
		}

		// Send current time to clients message channel
		stream.Message <- Message{
			Duration: fmt.Sprintf("%v", formatTime(stream.Duration)),
			Mode:     Time.String(),
		}
		stream.Duration -= tick
		time.Sleep(tick)
	}
}
// formatTime renders d as "MM:SS", truncating any fraction of a second.
//
// Minutes are the total number of whole minutes in d, so durations of an hour
// or more keep their full length (90 minutes is "90:00") instead of wrapping
// around at 60. Both fields are zero-padded to at least two digits. A negative
// d is rendered as "00:00".
func formatTime(d time.Duration) string {
	if d < 0 {
		d = 0
	}
	totalSeconds := int64(d / time.Second)
	return fmt.Sprintf("%02d:%02d", totalSeconds/60, totalSeconds%60)
}