Priority/backend/ssestream.go

127 lines
2.6 KiB
Go
Raw Normal View History

2023-12-21 10:52:56 +00:00
package main
import (
"io"
"log"
2024-01-07 14:18:05 +00:00
"net/http"
2024-01-05 13:31:33 +00:00
"slices"
"strconv"
2024-01-07 14:18:05 +00:00
"time"
2023-12-21 10:52:56 +00:00
"github.com/gin-gonic/gin"
)
2024-01-07 14:18:05 +00:00
var events = []string{"message", "timer", "priority", "cmd"}
2023-12-21 10:52:56 +00:00
type Message struct {
Event string `json:"event"`
Data string `json:"data"`
Cmd string `json:"cmd"`
Id string `json:"id"`
}
type ClientChan chan Message
type Client struct {
Id string `json:"id"`
Ip string `json:"ip"`
Chan ClientChan `json:"chan"`
Events []string `json:"events"`
}
type SseStream struct {
2024-01-07 14:18:05 +00:00
Clients []Client `json:"clients"`
MsgId map[string]int `json:"msgid"`
Events []string `json:"events"`
HeatRunning bool `json:"heat_running"`
HeatTimer time.Duration `json:"heat_timer"`
2023-12-21 10:52:56 +00:00
}
func InitSse() *SseStream {
2024-01-07 14:18:05 +00:00
sse := &SseStream{
Clients: make([]Client, 0),
MsgId: map[string]int{},
Events: events,
HeatRunning: false,
HeatTimer: 0,
2023-12-21 10:52:56 +00:00
}
2024-01-07 14:18:05 +00:00
for i := range events {
sse.MsgId[events[i]] = 0
}
return sse
2023-12-21 10:52:56 +00:00
}
func (sse *SseStream) SseHeadersMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
c.Next()
}
}
func (sse *SseStream) Stream(c *gin.Context) {
client := &Client{
Ip: c.Request.RemoteAddr,
Chan: make(ClientChan),
2024-01-06 20:59:21 +00:00
Events: c.QueryArray("event"),
2023-12-21 10:52:56 +00:00
}
2024-01-04 15:47:44 +00:00
log.Printf("events: %+v", client.Events)
2023-12-21 10:52:56 +00:00
client.Id = generateClientId(client.Ip, client.Events, 8)
sse.AddClient(*client)
defer func() {
sse.RemoveClient(client)
close(client.Chan)
log.Printf("Client %s disconnected", client.Ip)
}()
log.Printf("Client %s connected", client.Ip)
c.Stream(func(w io.Writer) bool {
msg, ok := <-client.Chan
if !ok {
return false
}
c.SSEvent(msg.Event, msg)
return true
})
}
func (sse *SseStream) AddClient(client Client) {
sse.Clients = append(sse.Clients, client)
}
func (sse *SseStream) RemoveClient(client *Client) {
for i, c := range sse.Clients {
if c.Id == client.Id {
sse.Clients = append(sse.Clients[:i], sse.Clients[i+1:]...)
return
}
}
}
2024-01-05 13:31:33 +00:00
func (sse *SseStream) Send(msg Message) {
sse.MsgId[msg.Event]++
msg.Id = strconv.Itoa(sse.MsgId[msg.Event])
for _, client := range sse.Clients {
if slices.Contains(client.Events, msg.Event) || slices.Contains(client.Events, "*") {
client.Chan <- msg
2024-01-07 14:18:05 +00:00
log.Printf("sent: %+v -> %+v", msg, client.Ip)
2024-01-05 13:31:33 +00:00
}
}
}
2024-01-07 14:18:05 +00:00
func (sse *SseStream) SendMsg(c *gin.Context) {
var msg Message
c.ShouldBind(&msg)
sse.Send(msg)
c.JSON(http.StatusOK, gin.H{"status": "msg"})
}