Skip to content

Commit cdbd8cf

Browse files
Merge pull request #27 from Evolutionary-Algorithms-On-Click/live-stream-logs-sse
Implement WebSocket and SSE functionality for live log streaming; Add testing SSE viewer tool.
2 parents 4af83f0 + 93e1254 commit cdbd8cf

File tree

9 files changed

+834
-30
lines changed

9 files changed

+834
-30
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ go.work.sum
2626
*.py
2727
code/
2828
input/
29+
live/
2930

3031
.DS_Store

docker-compose.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
12
services:
23
runner_controller:
3-
image: ghcr.io/evolutionary-algorithms-on-click/runner_controller_microservice:main
4-
# build:
5-
# context: .
6-
# dockerfile: Dockerfile
4+
# image: ghcr.io/evolutionary-algorithms-on-click/runner_controller_microservice:main
5+
build:
6+
context: .
7+
dockerfile: Dockerfile
78
ports:
89
- "5002:5002"
910
environment:

go.mod

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@ go 1.24.0
55
toolchain go1.24.2
66

77
require (
8+
github.com/coder/websocket v1.8.13
9+
github.com/hpcloud/tail v1.0.0
810
github.com/jackc/pgx/v5 v5.7.4
9-
github.com/minio/minio-go/v7 v7.0.90
11+
github.com/minio/minio-go/v7 v7.0.91
1012
github.com/rabbitmq/amqp091-go v1.10.0
1113
github.com/rs/cors v1.11.1
12-
google.golang.org/grpc v1.71.1
14+
google.golang.org/grpc v1.72.0
1315
google.golang.org/protobuf v1.36.6
1416
)
1517

1618
require (
1719
github.com/dustin/go-humanize v1.0.1 // indirect
20+
github.com/fsnotify/fsnotify v1.9.0 // indirect
1821
github.com/go-ini/ini v1.67.0 // indirect
1922
github.com/goccy/go-json v0.10.5 // indirect
2023
github.com/google/uuid v1.6.0 // indirect
@@ -31,5 +34,7 @@ require (
3134
golang.org/x/sync v0.13.0 // indirect
3235
golang.org/x/sys v0.32.0 // indirect
3336
golang.org/x/text v0.24.0 // indirect
34-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect
37+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
38+
gopkg.in/fsnotify.v1 v1.4.7 // indirect
39+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
3540
)

go.sum

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
2+
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
13
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
24
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
35
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
46
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
57
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
8+
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
9+
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
610
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
711
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
812
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
@@ -17,6 +21,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
1721
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1822
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
1923
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
24+
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
25+
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
2026
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
2127
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
2228
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -34,8 +40,8 @@ github.com/minio/crc64nvme v1.0.1 h1:DHQPrYPdqK7jQG/Ls5CTBZWeex/2FMS3G5XGkycuFrY
3440
github.com/minio/crc64nvme v1.0.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
3541
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
3642
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
37-
github.com/minio/minio-go/v7 v7.0.90 h1:TmSj1083wtAD0kEYTx7a5pFsv3iRYMsOJ6A4crjA1lE=
38-
github.com/minio/minio-go/v7 v7.0.90/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go=
43+
github.com/minio/minio-go/v7 v7.0.91 h1:tWLZnEfo3OZl5PoXQwcwTAPNNrjyWwOh6cbZitW5JQc=
44+
github.com/minio/minio-go/v7 v7.0.91/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go=
3945
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4046
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
4147
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
@@ -73,13 +79,17 @@ golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
7379
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
7480
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
7581
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
76-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e h1:ztQaXfzEXTmCBvbtWYRhJxW+0iJcz2qXfd38/e9l7bA=
77-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
78-
google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI=
79-
google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
82+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs=
83+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
84+
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
85+
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
8086
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
8187
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
8288
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
89+
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
90+
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
91+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
92+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
8393
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
8494
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
8595
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

main.go

Lines changed: 84 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
package main
22

33
import (
4-
// "evolve/config"
4+
"context"
5+
"errors"
56
"evolve/controller"
7+
"evolve/modules/sse"
8+
"evolve/modules/ws"
69
"evolve/routes"
710
"evolve/util"
811
"fmt"
9-
"github.com/rs/cors"
12+
"net"
1013
"net/http"
1114
"os"
15+
"os/signal"
16+
"syscall"
17+
"time"
18+
19+
"github.com/rs/cors"
1220
)
1321

1422
var (
@@ -17,32 +25,91 @@ var (
1725
)
1826

1927
func main() {
20-
2128
PORT = fmt.Sprintf(":%s", os.Getenv("HTTP_PORT"))
29+
if PORT == ":" {
30+
PORT = ":5002"
31+
}
2232
FRONTEND_URL = os.Getenv("FRONTEND_URL")
33+
if FRONTEND_URL == "" {
34+
FRONTEND_URL = "http://localhost:3000"
35+
}
2336

2437
var logger = util.NewLogger()
2538

26-
// Register routes.
27-
http.HandleFunc(routes.TEST, controller.Test)
28-
http.HandleFunc(routes.EA, controller.CreateEA)
29-
http.HandleFunc(routes.GP, controller.CreateGP)
30-
http.HandleFunc(routes.ML, controller.CreateML)
31-
http.HandleFunc(routes.PSO, controller.CreatePSO)
32-
http.HandleFunc(routes.RUNS, controller.UserRuns)
33-
http.HandleFunc(routes.SHARE_RUN, controller.ShareRun)
34-
http.HandleFunc(routes.RUN, controller.UserRun)
39+
// Context and Graceful Shutdown Setup.
40+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
41+
defer stop()
42+
43+
// Start WebSocket Server.
44+
wsHub := ws.StartServer(ctx, *logger)
45+
logger.Info("WebSocket Hub started.")
46+
47+
// Register HTTP Routes.
48+
mux := http.DefaultServeMux
49+
50+
mux.HandleFunc(routes.TEST, controller.Test)
51+
mux.HandleFunc(routes.EA, controller.CreateEA)
52+
mux.HandleFunc(routes.GP, controller.CreateGP)
53+
mux.HandleFunc(routes.ML, controller.CreateML)
54+
mux.HandleFunc(routes.PSO, controller.CreatePSO)
55+
mux.HandleFunc(routes.RUNS, controller.UserRuns)
56+
mux.HandleFunc(routes.SHARE_RUN, controller.ShareRun)
57+
mux.HandleFunc(routes.RUN, controller.UserRun)
58+
59+
// WebSocket Route.
60+
wsHandler := ws.GetHandler(ctx, wsHub, *logger)
61+
mux.HandleFunc(routes.LIVE, wsHandler)
62+
logger.Info("WebSocket endpoint registered at /live/")
3563

36-
logger.Info(fmt.Sprintf("Test http server on http://localhost%v/api/test", PORT))
64+
// SSE Route.
65+
sseHandler := sse.GetSSEHandler(*logger)
66+
mux.HandleFunc(routes.LOGS, sseHandler)
67+
logger.Info("SSE endpoint registered at /runs/logs/")
3768

69+
// Log the test endpoint URL.
70+
logger.Info(fmt.Sprintf("Test http server on http://localhost%s/api/test", PORT))
71+
72+
// CORS Configuration.
3873
corsHandler := cors.New(cors.Options{
3974
AllowedOrigins: []string{FRONTEND_URL},
4075
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
4176
AllowedHeaders: []string{"*"},
4277
AllowCredentials: true,
43-
}).Handler(http.DefaultServeMux)
44-
if err := http.ListenAndServe(PORT, corsHandler); err != nil {
45-
logger.Error(fmt.Sprintf("Failed to start server: %v", err))
46-
return
78+
}).Handler(mux)
79+
80+
// HTTP Server Setup and Start.
81+
server := &http.Server{
82+
Addr: PORT,
83+
Handler: corsHandler,
84+
ReadTimeout: 10 * time.Second,
85+
WriteTimeout: 0, // Set WriteTimeout to 0 for long-lived SSE connections.
86+
IdleTimeout: 0, // Set IdleTimeout to 0 for long-lived SSE connections.
87+
BaseContext: func(_ net.Listener) context.Context { return ctx },
4788
}
89+
90+
// Start server in a goroutine.
91+
go func() {
92+
logger.Info(fmt.Sprintf("HTTP server starting on %s (Frontend: %s)", server.Addr, FRONTEND_URL))
93+
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
94+
logger.Error(fmt.Sprintf("HTTP server ListenAndServe error: %v", err))
95+
stop()
96+
}
97+
}()
98+
99+
// Wait for Shutdown Signal.
100+
<-ctx.Done()
101+
102+
// Initiate Graceful Shutdown.
103+
logger.Info("Shutdown signal received. Starting graceful shutdown...")
104+
105+
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
106+
defer cancelShutdown()
107+
108+
if err := server.Shutdown(shutdownCtx); err != nil {
109+
logger.Error(fmt.Sprintf("HTTP server graceful shutdown failed: %v", err))
110+
} else {
111+
logger.Info("HTTP server shutdown complete.")
112+
}
113+
114+
logger.Info("Server exiting.")
48115
}

0 commit comments

Comments
 (0)