216 lines
4.7 KiB
Go
216 lines
4.7 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
|
|
"log"
|
|
)
|
|
|
|
type SurferLive struct {
|
|
Name string `json:"name"`
|
|
Color string `json:"color"`
|
|
Score string `json:"score"`
|
|
Priority string `json:"priority"`
|
|
}
|
|
|
|
type Message struct {
|
|
Surfers []SurferLive `json:"surfers"`
|
|
Priority []string `json:"priority"`
|
|
Heat Heat `json:"heat"`
|
|
Duration string `json:"duration"`
|
|
Source string `json:"source"`
|
|
Msg string `json:"msg"`
|
|
Mode string `json:"mode"`
|
|
}
|
|
|
|
type Client struct {
|
|
Chan ClientChan
|
|
IP IPAddress
|
|
Mode Mode
|
|
}
|
|
|
|
type ClientChan chan Message
|
|
|
|
type IPAddress string
|
|
|
|
type SseStream struct {
|
|
// Events are pushed to this channel by the main events-gathering routine
|
|
Message chan Message
|
|
|
|
// New client connections
|
|
NewClients chan Client //chan string
|
|
|
|
// Closed client connections
|
|
ClosedClients chan ClientChan
|
|
|
|
// Total client connections
|
|
TotalClients map[ClientChan]IPAddress //bool
|
|
|
|
StatusPriority []string
|
|
Duration time.Duration
|
|
Start bool
|
|
Heat *Heat
|
|
}
|
|
|
|
// Initialize event and Start procnteessing requests
|
|
func NewServer() (sse *SseStream) {
|
|
sse = &SseStream{
|
|
Message: make(chan Message),
|
|
NewClients: make(chan Client),
|
|
ClosedClients: make(chan ClientChan),
|
|
TotalClients: make(map[ClientChan]IPAddress),
|
|
Start: false,
|
|
Heat: &Heat{},
|
|
}
|
|
|
|
go sse.listen()
|
|
|
|
go sse.timer()
|
|
|
|
return
|
|
}
|
|
|
|
// It Listens all incoming requests from clients.
|
|
// Handles addition and removal of clients and broadcast messages to clients.
|
|
func (stream *SseStream) listen() {
|
|
for {
|
|
select {
|
|
// Add new available client
|
|
case client := <-stream.NewClients:
|
|
stream.TotalClients[client.Chan] = client.IP
|
|
log.Printf("Client added. %d - %s registered clients", len(stream.TotalClients), client.IP)
|
|
|
|
// Remove closed client
|
|
case client := <-stream.ClosedClients:
|
|
ip := stream.TotalClients[client]
|
|
delete(stream.TotalClients, client)
|
|
close(client)
|
|
log.Printf("Removed client. %d - %s registered clients", len(stream.TotalClients), ip)
|
|
|
|
// Broadcast message to client
|
|
case eventMsg := <-stream.Message:
|
|
for clientMessageChan := range stream.TotalClients {
|
|
clientMessageChan <- eventMsg
|
|
log.Printf("Message %+v sent to %s", eventMsg, stream.TotalClients[clientMessageChan])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func HeadersMiddleware() gin.HandlerFunc {
|
|
return func(c *gin.Context) {
|
|
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
|
c.Writer.Header().Set("Cache-Control", "no-cache")
|
|
c.Writer.Header().Set("Connection", "keep-alive")
|
|
c.Writer.Header().Set("Transfer-Encoding", "chunked")
|
|
c.Next()
|
|
}
|
|
}
|
|
|
|
func (stream *SseStream) serveHTTP() gin.HandlerFunc {
|
|
return func(c *gin.Context) {
|
|
// Initialize client channel
|
|
clientChan := make(ClientChan)
|
|
|
|
cli := Client{
|
|
Chan: clientChan,
|
|
IP: IPAddress(c.ClientIP()),
|
|
Mode: Priority,
|
|
}
|
|
|
|
// Send new connection to event server
|
|
stream.NewClients <- cli
|
|
|
|
defer func() {
|
|
// Send closed connection to event server
|
|
stream.ClosedClients <- clientChan
|
|
}()
|
|
|
|
c.Set("clientChan", clientChan)
|
|
|
|
c.Next()
|
|
}
|
|
}
|
|
|
|
func (stream *SseStream) retvalSSE() gin.HandlerFunc {
|
|
return func(c *gin.Context) {
|
|
|
|
if len(stream.StatusPriority) > 0 {
|
|
stream.SendPriority(stream.StatusPriority)
|
|
log.Printf("update priority %+v", stream.StatusPriority)
|
|
}
|
|
|
|
v, ok := c.Get("clientChan")
|
|
if !ok {
|
|
return
|
|
}
|
|
clientChan, ok := v.(ClientChan)
|
|
if !ok {
|
|
return
|
|
}
|
|
c.Stream(func(w io.Writer) bool {
|
|
// Stream message to client from message channel
|
|
if msg, ok := <-clientChan; ok {
|
|
c.SSEvent("message", msg)
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
}
|
|
}
|
|
|
|
func (stream *SseStream) SendPriority(pri []string) {
|
|
stream.Message <- Message{
|
|
Priority: pri,
|
|
Mode: Priority.String(),
|
|
}
|
|
}
|
|
|
|
func (stream *SseStream) timer() {
|
|
for {
|
|
if stream.Start {
|
|
timer := time.NewTimer(stream.Duration)
|
|
|
|
select {
|
|
case <-timer.C:
|
|
stream.Start = false
|
|
msg := Message{
|
|
Msg: "stop",
|
|
Mode: Stop.String(),
|
|
}
|
|
stream.Message <- msg
|
|
log.Printf("stop timer %+v", stream.Duration)
|
|
stream.Heat.Status = "ended"
|
|
continue
|
|
default:
|
|
if len(stream.TotalClients) > 0 {
|
|
if stream.Duration >= 0 {
|
|
currentTimer := fmt.Sprintf("%v", formatTime(stream.Duration))
|
|
|
|
msg := Message{
|
|
Duration: currentTimer,
|
|
Mode: Time.String(),
|
|
}
|
|
|
|
// Send current time to clients message channel
|
|
stream.Message <- msg
|
|
stream.Duration = stream.Duration - time.Second
|
|
time.Sleep(time.Second * 1)
|
|
} else {
|
|
timer.Stop()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func formatTime(d time.Duration) string {
|
|
minutes := int(d.Minutes()) % 60
|
|
seconds := int(d.Seconds()) % 60
|
|
return fmt.Sprintf("%02d:%02d", minutes, seconds)
|
|
}
|