Skip to content

Commit a62f447

Browse files
Implement WebSocket and SSE functionality for live log streaming; Add testing SSE viewer tool.
1 parent 4af83f0 commit a62f447

File tree

9 files changed

+790
-23
lines changed

9 files changed

+790
-23
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ go.work.sum
2626
*.py
2727
code/
2828
input/
29+
live/
2930

3031
.DS_Store

docker-compose.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
12
services:
23
runner_controller:
3-
image: ghcr.io/evolutionary-algorithms-on-click/runner_controller_microservice:main
4-
# build:
5-
# context: .
6-
# dockerfile: Dockerfile
4+
# image: ghcr.io/evolutionary-algorithms-on-click/runner_controller_microservice:main
5+
build:
6+
context: .
7+
dockerfile: Dockerfile
78
ports:
89
- "5002:5002"
910
environment:

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ go 1.24.0
55
toolchain go1.24.2
66

77
require (
8+
github.com/coder/websocket v1.8.13
9+
github.com/hpcloud/tail v1.0.0
810
github.com/jackc/pgx/v5 v5.7.4
911
github.com/minio/minio-go/v7 v7.0.90
1012
github.com/rabbitmq/amqp091-go v1.10.0
@@ -15,6 +17,7 @@ require (
1517

1618
require (
1719
github.com/dustin/go-humanize v1.0.1 // indirect
20+
github.com/fsnotify/fsnotify v1.9.0 // indirect
1821
github.com/go-ini/ini v1.67.0 // indirect
1922
github.com/goccy/go-json v0.10.5 // indirect
2023
github.com/google/uuid v1.6.0 // indirect
@@ -32,4 +35,6 @@ require (
3235
golang.org/x/sys v0.32.0 // indirect
3336
golang.org/x/text v0.24.0 // indirect
3437
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect
38+
gopkg.in/fsnotify.v1 v1.4.7 // indirect
39+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
3540
)

go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
2+
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
13
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
24
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
35
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
46
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
57
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
8+
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
9+
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
610
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
711
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
812
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
@@ -17,6 +21,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
1721
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1822
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
1923
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
24+
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
25+
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
2026
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
2127
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
2228
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -80,6 +86,10 @@ google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd
8086
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
8187
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
8288
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
89+
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
90+
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
91+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
92+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
8393
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
8494
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
8595
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

main.go

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
package main
22

33
import (
4-
// "evolve/config"
4+
"context"
5+
"errors"
56
"evolve/controller"
7+
"evolve/modules/sse"
8+
"evolve/modules/ws"
69
"evolve/routes"
710
"evolve/util"
811
"fmt"
9-
"github.com/rs/cors"
12+
"net"
1013
"net/http"
1114
"os"
15+
"os/signal"
16+
"syscall"
17+
"time"
18+
19+
"github.com/rs/cors"
1220
)
1321

1422
var (
@@ -17,32 +25,91 @@ var (
1725
)
1826

1927
func main() {
20-
2128
PORT = fmt.Sprintf(":%s", os.Getenv("HTTP_PORT"))
29+
if PORT == ":" {
30+
PORT = ":5002"
31+
}
2232
FRONTEND_URL = os.Getenv("FRONTEND_URL")
33+
if FRONTEND_URL == "" {
34+
FRONTEND_URL = "http://localhost:3000"
35+
}
2336

2437
var logger = util.NewLogger()
2538

26-
// Register routes.
27-
http.HandleFunc(routes.TEST, controller.Test)
28-
http.HandleFunc(routes.EA, controller.CreateEA)
29-
http.HandleFunc(routes.GP, controller.CreateGP)
30-
http.HandleFunc(routes.ML, controller.CreateML)
31-
http.HandleFunc(routes.PSO, controller.CreatePSO)
32-
http.HandleFunc(routes.RUNS, controller.UserRuns)
33-
http.HandleFunc(routes.SHARE_RUN, controller.ShareRun)
34-
http.HandleFunc(routes.RUN, controller.UserRun)
39+
// ---- Context and Graceful Shutdown Setup ----
40+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
41+
defer stop()
42+
43+
// ---- Start WebSocket Server ----
44+
wsHub := ws.StartServer(ctx, *logger)
45+
logger.Info("WebSocket Hub started.")
46+
47+
// ---- Register HTTP Routes ----
48+
mux := http.DefaultServeMux
49+
50+
mux.HandleFunc(routes.TEST, controller.Test)
51+
mux.HandleFunc(routes.EA, controller.CreateEA)
52+
mux.HandleFunc(routes.GP, controller.CreateGP)
53+
mux.HandleFunc(routes.ML, controller.CreateML)
54+
mux.HandleFunc(routes.PSO, controller.CreatePSO)
55+
mux.HandleFunc(routes.RUNS, controller.UserRuns)
56+
mux.HandleFunc(routes.SHARE_RUN, controller.ShareRun)
57+
mux.HandleFunc(routes.RUN, controller.UserRun)
58+
59+
// WebSocket Route
60+
wsHandler := ws.GetHandler(ctx, wsHub, *logger)
61+
mux.HandleFunc(routes.LIVE, wsHandler)
62+
logger.Info("WebSocket endpoint registered at /live/")
3563

36-
logger.Info(fmt.Sprintf("Test http server on http://localhost%v/api/test", PORT))
64+
// ---- SSE Route ----
65+
sseHandler := sse.GetSSEHandler(*logger)
66+
mux.HandleFunc(routes.LOGS, sseHandler)
67+
logger.Info("SSE endpoint registered at /runs/logs/")
3768

69+
// ---- Log the test endpoint URL ----
70+
logger.Info(fmt.Sprintf("Test http server on http://localhost%s/api/test", PORT))
71+
72+
// ---- CORS Configuration ----
3873
corsHandler := cors.New(cors.Options{
3974
AllowedOrigins: []string{FRONTEND_URL},
40-
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
41-
AllowedHeaders: []string{"*"},
75+
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, // Ensure GET is allowed for SSE
76+
AllowedHeaders: []string{"*"}, // Allow X-RUN-ID header
4277
AllowCredentials: true,
43-
}).Handler(http.DefaultServeMux)
44-
if err := http.ListenAndServe(PORT, corsHandler); err != nil {
45-
logger.Error(fmt.Sprintf("Failed to start server: %v", err))
46-
return
78+
}).Handler(mux)
79+
80+
// ---- HTTP Server Setup and Start ----
81+
server := &http.Server{
82+
Addr: PORT,
83+
Handler: corsHandler,
84+
ReadTimeout: 10 * time.Second,
85+
WriteTimeout: 0, // Set WriteTimeout to 0 for long-lived SSE connections
86+
IdleTimeout: 0, // Set IdleTimeout to 0 for long-lived SSE connections
87+
BaseContext: func(_ net.Listener) context.Context { return ctx },
4788
}
89+
90+
// Start server in a goroutine
91+
go func() {
92+
logger.Info(fmt.Sprintf("HTTP server starting on %s (Frontend: %s)", server.Addr, FRONTEND_URL))
93+
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
94+
logger.Error(fmt.Sprintf("HTTP server ListenAndServe error: %v", err))
95+
stop()
96+
}
97+
}()
98+
99+
// ---- Wait for Shutdown Signal ----
100+
<-ctx.Done()
101+
102+
// ---- Initiate Graceful Shutdown ----
103+
logger.Info("Shutdown signal received. Starting graceful shutdown...")
104+
105+
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
106+
defer cancelShutdown()
107+
108+
if err := server.Shutdown(shutdownCtx); err != nil {
109+
logger.Error(fmt.Sprintf("HTTP server graceful shutdown failed: %v", err))
110+
} else {
111+
logger.Info("HTTP server shutdown complete.")
112+
}
113+
114+
logger.Info("Server exiting.")
48115
}

modules/sse/server.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// Package sse provides Server-Sent Events (SSE) endpoint for streaming log files.
2+
package sse
3+
4+
import (
5+
"evolve/util"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"path/filepath"
10+
"strings"
11+
"time"
12+
13+
"github.com/hpcloud/tail"
14+
)
15+
16+
const (
17+
logDir = "live" // Directory where <runId>.str files are stored.
18+
runIdHeader = "X-RUN-ID" // Header key for the run ID.
19+
retrySeconds = 3 // SSE retry interval suggestion for clients.
20+
)
21+
22+
// GetSSEHandler creates and returns an http.HandlerFunc for the SSE endpoint.
23+
// It takes the application's logger as an argument.
24+
func GetSSEHandler(logger util.Logger) http.HandlerFunc {
25+
return func(w http.ResponseWriter, r *http.Request) {
26+
serveSSE(logger, w, r)
27+
}
28+
}
29+
30+
// serveSSE handles incoming SSE requests.
31+
func serveSSE(logger util.Logger, w http.ResponseWriter, r *http.Request) {
32+
ctx := r.Context() // Get context from the request
33+
34+
// --- 1. Get Run ID from Header ---
35+
runId := r.Header.Get(runIdHeader)
36+
if runId == "" {
37+
logger.Warn(fmt.Sprintf("[SSE Handler] Missing %s header", runIdHeader))
38+
http.Error(w, fmt.Sprintf("Missing %s header", runIdHeader), http.StatusBadRequest)
39+
return
40+
}
41+
42+
// --- Basic Sanitize Run ID (Prevent path traversal) ---
43+
// Ensure runId doesn't contain potentially harmful sequences.
44+
// A more robust solution might involve checking against a list of valid run IDs.
45+
if strings.Contains(runId, "..") || strings.ContainsAny(runId, "/\\") {
46+
logger.Warn(fmt.Sprintf("[SSE Handler] Invalid characters in %s header: %s", runIdHeader, runId))
47+
http.Error(w, "Invalid Run ID format", http.StatusBadRequest)
48+
return
49+
}
50+
51+
logFileName := fmt.Sprintf("%s.str", runId)
52+
logFilePath := filepath.Join(logDir, logFileName)
53+
54+
logger.Info(fmt.Sprintf("[SSE Handler] Request received for runId: %s (File: %s)", runId, logFilePath))
55+
56+
// --- 2. Set SSE Headers ---
57+
w.Header().Set("Content-Type", "text/event-stream")
58+
w.Header().Set("Cache-Control", "no-cache")
59+
w.Header().Set("Connection", "keep-alive")
60+
w.Header().Set("X-Accel-Buffering", "no")
61+
62+
// Suggest a retry interval to the client
63+
fmt.Fprintf(w, "retry: %d\n\n", retrySeconds*1000) // retry is in milliseconds
64+
65+
// --- 3. Get Flusher ---
66+
rc := http.NewResponseController(w)
67+
if rc == nil {
68+
logger.Error(fmt.Sprintf("[SSE Handler] Failed to get ResponseController for runId: %s", runId))
69+
http.Error(w, "Failed to get ResponseController", http.StatusInternalServerError)
70+
return
71+
}
72+
73+
if err := rc.Flush(); err != nil {
74+
logger.Error(fmt.Sprintf("[SSE Handler] Error flushing response for runId %s: %v", runId, err))
75+
http.Error(w, "Error flushing response", http.StatusInternalServerError)
76+
return
77+
}
78+
79+
// --- 4. Configure and Start Tailing ---
80+
tailConfig := tail.Config{
81+
Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}, // Start from the beginning of the file
82+
ReOpen: true, // Re-open file if recreated (log rotation)
83+
MustExist: false, // Don't fail if file doesn't exist yet
84+
Poll: true, // Use polling (more reliable across FS types/network mounts than pure inotify) - tune if needed
85+
Follow: true, // Keep following the file for new lines
86+
// Logger: tail.DiscardingLogger, // Uncomment to disable tail library's internal logging
87+
}
88+
89+
tailer, err := tail.TailFile(logFilePath, tailConfig)
90+
if err != nil {
91+
logger.Error(fmt.Sprintf("[SSE Tailing] Failed to start tailing file %s for runId %s: %v", logFilePath, runId, err))
92+
// Don't send HTTP error here as headers are already sent. Client will retry or disconnect.
93+
return
94+
}
95+
96+
// Ensure tailer is stopped when the handler exits
97+
defer func() {
98+
logger.Info(fmt.Sprintf("[SSE Handler] Stopping tailer for runId: %s", runId))
99+
// Stopping the tailer closes its internal channels
100+
if stopErr := tailer.Stop(); stopErr != nil {
101+
logger.Error(fmt.Sprintf("[SSE Tailing] Error stopping tailer for runId %s: %v", runId, stopErr))
102+
}
103+
}()
104+
105+
logger.Info(fmt.Sprintf("[SSE Handler] Started tailing %s for runId: %s", logFilePath, runId))
106+
107+
// --- 5. Event Loop: Send lines and handle disconnect ---
108+
for {
109+
select {
110+
case <-ctx.Done(): // Client disconnected
111+
logger.Info(fmt.Sprintf("[SSE Handler] Client disconnected for runId: %s", runId))
112+
return // Exit handler, defer will stop tailer
113+
114+
case line, ok := <-tailer.Lines: // New line from file
115+
if !ok {
116+
// Channel closed, tailer might have stopped or encountered an error
117+
tailErr := tailer.Err()
118+
if tailErr != nil && tailErr != io.EOF { // io.EOF might occur normally on stop
119+
logger.Error(fmt.Sprintf("[SSE Tailing] Error during tailing for runId %s: %v", runId, tailErr))
120+
} else {
121+
logger.Info(fmt.Sprintf("[SSE Tailing] Tailer lines channel closed for runId: %s", runId))
122+
}
123+
return // Exit handler
124+
}
125+
126+
fmt.Printf("[SSE Handler] New line for runId %s: %s\n", runId, line.Text) // Debug log
127+
128+
// ---> CHECK FOR END MARKER <---
129+
if line.Text == "__END__" {
130+
logger.Info(fmt.Sprintf("[SSE Handler] Detected END marker for run %s. Closing stream.", runId))
131+
return
132+
}
133+
134+
// Format and send SSE message
135+
// Use fmt.Fprintf to write directly to the ResponseWriter
136+
_, writeErr := fmt.Fprintf(w, "data: %s\n\n", line.Text) // SSE format: "data: content\n\n"
137+
138+
if writeErr != nil {
139+
// Likely client disconnected or network error
140+
logger.Warn(fmt.Sprintf("[SSE Handler] Error writing to client for runId %s: %v", runId, writeErr))
141+
return // Exit handler, defer will stop tailer
142+
}
143+
144+
if err := rc.Flush(); err != nil {
145+
logger.Error(fmt.Sprintf("[SSE Handler] Error flushing response for runId %s: %v", runId, err))
146+
return // Exit handler, defer will stop tailer
147+
}
148+
149+
time.Sleep(50 * time.Millisecond)
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)