=sse: events

This commit is contained in:
Miki 2023-12-20 13:06:41 +01:00
parent f4462e5df1
commit 1f96ed0930
28 changed files with 209 additions and 138 deletions

View file

@ -1,9 +1,12 @@
package main
import (
"encoding/json"
"crypto/sha256"
"fmt"
"io"
"log"
"strconv"
"strings"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
@ -15,14 +18,20 @@ type Message struct {
Id string `json:"id"`
}
type Event struct {
Message chan Message
NewClient chan chan Message
ClosedClient chan chan Message
TotalClients map[chan Message]bool
type ClientChan chan Message
type Client struct {
Id string `json:"id"`
Ip string `json:"ip"`
Chan ClientChan `json:"chan"`
Events []string `json:"events"`
}
type ClientChan chan Message
type Event struct {
Clients []Client `json:"clients"`
Id int `json:"id"`
Events []string `json:"events"`
}
func main() {
@ -32,10 +41,12 @@ func main() {
event := NewServer()
api := r.Group("/api")
api.GET("/sse", HeadersMiddleware(), event.serveHTTP(), event.retvalSSE())
api.GET("/msg", event.SendNewMsg())
api.GET("/ping", event.SendPing())
api.GET("/pong", event.SendPong())
// api.GET("/sse", HeadersMiddleware(), event.serveHTTP(), event.retvalSSE())
api.GET("/sse", HeadersMiddleware(), event.Stream)
api.GET("/msg", event.SendMsg)
api.GET("/message", event.SendMessage)
api.GET("/ping", event.SendPing)
api.GET("/pong", event.SendPong)
sse := r.Group("/sse")
sse.Static("/", "./static/sse")
@ -56,16 +67,16 @@ func main() {
}
func NewServer() *Event {
event := &Event{
Message: make(chan Message),
NewClient: make(chan chan string),
ClosedClient: make(chan chan string),
TotalClients: make(map[chan string]bool),
return &Event{
Clients: []Client{},
Id: 0,
Events: []string{
"ping",
"pong",
"msg",
"message",
},
}
go event.Broadcast()
return event
}
func HeadersMiddleware() gin.HandlerFunc {
@ -79,98 +90,146 @@ func HeadersMiddleware() gin.HandlerFunc {
}
}
func (e *Event) serveHTTP() gin.HandlerFunc {
return func(c *gin.Context) {
// Initialize client channel
clientChan := make(ClientChan)
func (e *Event) Stream(c *gin.Context) {
client := &Client{
Ip: c.ClientIP(),
Chan: make(ClientChan),
Events: c.QueryArray("events"),
}
// Send new connection to event server
e.NewClient <- clientChan
client.Id = Id(client.Ip, client.Events, 8)
defer func() {
// Send closed connection to event server
e.ClosedClient <- clientChan
}()
log.Printf("Client: %+v", client)
c.Set("clientChan", clientChan)
e.Clients = append(e.Clients, *client)
c.Next()
defer func() {
e.Clients = remove(e.Clients, client)
close(client.Chan)
log.Printf("Client %s disconnected - tot: %d", client.Ip, len(e.Clients))
}()
log.Printf("Client %s connected - tot: %d", client.Ip, len(e.Clients))
c.Stream(func(w io.Writer) bool {
msg, ok := <-client.Chan
if ok {
c.SSEvent(msg.Event, msg)
}
return ok
})
}
func (e *Event) Send(msg Message) {
e.Id++
msg.Id = strconv.Itoa(e.Id)
for _, client := range e.Clients {
client.Chan <- msg
}
}
func (e *Event) retvalSSE() gin.HandlerFunc {
return func(c *gin.Context) {
v, ok := c.Get("clientChan")
if !ok {
return
}
clientChan, ok := v.(ClientChan)
if !ok {
return
}
c.Stream(func(w io.Writer) bool {
msg, ok := <-clientChan
if ok {
c.SSEvent("message", msg)
}
return ok
})
}
func (e *Event) SendMessage(c *gin.Context) {
e.Send(Message{
Event: "message",
Data: "Ciao",
})
}
func (e *Event) Broadcast() {
id := 0
for {
select {
case client := <-e.NewClient:
e.TotalClients[client] = true
case client := <-e.ClosedClient:
delete(e.TotalClients, client)
close(client)
case message := <-e.Message:
id++
message.Id = strconv.Itoa(id)
for client := range e.TotalClients {
// msg, _ := json.Marshal(message)
// client <- string(msg)
client <- message
}
func (e *Event) SendMsg(c *gin.Context) {
e.Send(Message{
Event: "msg",
Data: "hello",
})
}
func (e *Event) SendPing(c *gin.Context) {
e.Send(Message{
Event: "ping",
Data: "Ping",
})
}
func (e *Event) SendPong(c *gin.Context) {
e.Send(Message{
Event: "pong",
Data: "Pong",
})
}
func remove(slice []Client, s *Client) []Client {
for i, v := range slice {
if v.Id == s.Id {
return append(slice[:i], slice[i+1:]...)
}
}
return slice
}
func (e *Event) SendNewMsg() gin.HandlerFunc {
return func(c *gin.Context) {
e.SendMsg(Message{
Event: "message",
Data: "hello",
Id: "1",
})
}
func Id(ip string, events []string, len int) string {
str := ip + strings.Join(events, "|")
id := sha256.Sum256([]byte(str))
return fmt.Sprintf("%X", id)[0:len]
}
func (e *Event) SendPing() gin.HandlerFunc {
return func(c *gin.Context) {
e.SendMsg(Message{
Event: "ping",
Data: "hello",
Id: "1",
})
}
}
// func (e *Event) serveHTTP() gin.HandlerFunc {
// return func(c *gin.Context) {
// // Initialize client channel
// clientChan := make(ClientChan)
func (e *Event) SendPong() gin.HandlerFunc {
return func(c *gin.Context) {
e.SendMsg(Message{
Event: "pong",
Data: "hello",
Id: "1",
})
}
}
// // Send new connection to event server
// e.NewClient <- clientChan
func (e *Event) SendMsg(msg Message) {
e.Message <- msg
}
// defer func() {
// // Send closed connection to event server
// e.ClosedClient <- clientChan
// }()
// c.Set("clientChan", clientChan)
// c.Next()
// }
// }
// func (e *Event) retvalSSE() gin.HandlerFunc {
// return func(c *gin.Context) {
// v, ok := c.Get("clientChan")
// if !ok {
// return
// }
// clientChan, ok := v.(ClientChan)
// if !ok {
// return
// }
// c.Stream(func(w io.Writer) bool {
// msg, ok := <-clientChan
// if ok {
// c.SSEvent("message", msg)
// }
// return ok
// })
// }
// }
// func (e *Event) Broadcast() {
// id := 0
// for {
// select {
// case client := <-e.NewClient:
// e.TotalClients[client] = true
// case client := <-e.ClosedClient:
// delete(e.TotalClients, client)
// close(client)
// case message := <-e.Message:
// id++
// for client := range e.TotalClients {
// // msg, _ := json.Marshal(message)
// // client <- string(msg)
// client <- message
// }
// }
// }
// }