+sse test

This commit is contained in:
Miki 2023-12-19 16:56:10 +01:00
parent 19cf309392
commit 2289aa31fd
61 changed files with 2250 additions and 0 deletions

175
sse/be/main.go Normal file
View file

@ -0,0 +1,175 @@
package main
import (
"encoding/json"
"io"
"strconv"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
)
type Message struct {
Event string `json:"event"`
Data string `json:"data"`
Id string `json:"id"`
}
type Event struct {
Message chan Message
NewClient chan chan string
ClosedClient chan chan string
TotalClients map[chan string]bool
}
type ClientChan chan string
func main() {
r := gin.Default()
r.Use(cors.Default())
event := NewServer()
api := r.Group("/api")
api.GET("/sse", HeadersMiddleware(), event.serveHTTP(), event.retvalSSE())
api.GET("/msg", event.SendNewMsg())
api.GET("/ping", event.SendPing())
api.GET("/pong", event.SendPong())
sse := r.Group("/sse")
sse.Static("/", "./static/sse")
app := r.Group("/_app")
app.Static("/", "./static/_app")
static := r.Group("/static")
static.Static("/", "./static/static")
r.StaticFile("/", "./static/index.html")
r.StaticFile("/favicon.png", "./static/favicon.png")
r.ForwardedByClientIP = true
r.SetTrustedProxies([]string{"127.0.0.1"})
r.Run(":8888")
}
func NewServer() *Event {
event := &Event{
Message: make(chan Message),
NewClient: make(chan chan string),
ClosedClient: make(chan chan string),
TotalClients: make(map[chan string]bool),
}
go event.Broadcast()
return event
}
func HeadersMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
headers := c.Writer.Header()
headers.Set("Content-Type", "text/event-stream")
headers.Set("Cache-Control", "no-cache")
headers.Set("Connection", "keep-alive")
headers.Set("Transfer-Encoding", "chunked")
c.Next()
}
}
func (e *Event) serveHTTP() gin.HandlerFunc {
return func(c *gin.Context) {
// Initialize client channel
clientChan := make(ClientChan)
// Send new connection to event server
e.NewClient <- clientChan
defer func() {
// Send closed connection to event server
e.ClosedClient <- clientChan
}()
c.Set("clientChan", clientChan)
c.Next()
}
}
func (e *Event) retvalSSE() gin.HandlerFunc {
return func(c *gin.Context) {
v, ok := c.Get("clientChan")
if !ok {
return
}
clientChan, ok := v.(ClientChan)
if !ok {
return
}
c.Stream(func(w io.Writer) bool {
msg, ok := <-clientChan
if ok {
c.SSEvent("message", msg)
}
return ok
})
}
}
func (e *Event) Broadcast() {
id := 0
for {
select {
case client := <-e.NewClient:
e.TotalClients[client] = true
case client := <-e.ClosedClient:
delete(e.TotalClients, client)
close(client)
case message := <-e.Message:
id++
message.Id = strconv.Itoa(id)
for client := range e.TotalClients {
msg, _ := json.Marshal(message)
client <- string(msg)
}
}
}
}
func (e *Event) SendNewMsg() gin.HandlerFunc {
return func(c *gin.Context) {
e.SendMsg(Message{
Event: "message",
Data: "hello",
Id: "1",
})
}
}
func (e *Event) SendPing() gin.HandlerFunc {
return func(c *gin.Context) {
e.SendMsg(Message{
Event: "ping",
Data: "hello",
Id: "1",
})
}
}
func (e *Event) SendPong() gin.HandlerFunc {
return func(c *gin.Context) {
e.SendMsg(Message{
Event: "pong",
Data: "hello",
Id: "1",
})
}
}
func (e *Event) SendMsg(msg Message) {
e.Message <- msg
}