Skip to content

Commit 8fc0b2f

Browse files
authored
Merge pull request #28 from Evolutionary-Algorithms-On-Click/migrate-live-stream-to-redis-optimize
Integrate Redis for live logs via SSE
2 parents cdbd8cf + 82b9cd6 commit 8fc0b2f

File tree

7 files changed

+298
-412
lines changed

7 files changed

+298
-412
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Runner Controller Microservice
2+
23
Go implementation of the Runner Controller.
34

45
## Setup
@@ -12,19 +13,28 @@ Go implementation of the Runner Controller.
1213
### Installation
1314

1415
1. Install the protobuf-grpc compiler.
16+
1517
```sh
1618
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
1719
export PATH="$PATH:$(go env GOPATH)/bin"
1820
```
21+
1922
2. Export the following environment variables.
23+
2024
```sh
2125
export DATABASE_URL=<cockroachdb_url>
2226
export MINIO_ENDPOINT=<minio_endpoint>
2327
export MINIO_ACCESS_KEY_ID=<minio_access_key>
2428
export MINIO_SECRET_KEY=<minio_secret_key>
2529
export RABBITMQ_URL=<rabbitmq_url>
30+
export FRONTEND_URL=<frontend_url>
31+
export HTTP_PORT=<http_port>
32+
export AUTH_GRPC_ADDRESS=<auth_grpc_address>
33+
export REDIS_URL=<redis_url>
2634
```
35+
2736
3. Run the following command to start the server.
37+
2838
```sh
2939
go run main.go
3040
```
@@ -33,6 +43,7 @@ go run main.go
3343

3444
1. Install protoc compiler
3545
2. Run the following command to generate the go files from the proto files.
46+
3647
```sh
3748
protoc --go_out=./ --go_opt=paths=source_relative \
3849
--go-grpc_out=./ --go-grpc_opt=paths=source_relative \

docker-compose.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ services:
1515
RABBITMQ_URL : amqp://user:[email protected]:5672/
1616
FRONTEND_URL : http://localhost:3000
1717
HTTP_PORT : 5002
18-
AUTH_GRPC_ADDRESS : host.docker.internal:5001
18+
AUTH_GRPC_ADDRESS : host.docker.internal:5001
19+
REDIS_URL: redis://host.docker.internal:6379/0

go.mod

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,19 @@ go 1.24.0
55
toolchain go1.24.2
66

77
require (
8-
github.com/coder/websocket v1.8.13
9-
github.com/hpcloud/tail v1.0.0
108
github.com/jackc/pgx/v5 v5.7.4
119
github.com/minio/minio-go/v7 v7.0.91
1210
github.com/rabbitmq/amqp091-go v1.10.0
11+
github.com/redis/go-redis/v9 v9.8.0
1312
github.com/rs/cors v1.11.1
1413
google.golang.org/grpc v1.72.0
1514
google.golang.org/protobuf v1.36.6
1615
)
1716

1817
require (
18+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
19+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
1920
github.com/dustin/go-humanize v1.0.1 // indirect
20-
github.com/fsnotify/fsnotify v1.9.0 // indirect
2121
github.com/go-ini/ini v1.67.0 // indirect
2222
github.com/goccy/go-json v0.10.5 // indirect
2323
github.com/google/uuid v1.6.0 // indirect
@@ -35,6 +35,4 @@ require (
3535
golang.org/x/sys v0.32.0 // indirect
3636
golang.org/x/text v0.24.0 // indirect
3737
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
38-
gopkg.in/fsnotify.v1 v1.4.7 // indirect
39-
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
4038
)

go.sum

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
2-
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
1+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
2+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
3+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
4+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
5+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
6+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
37
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
48
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
59
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
10+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
11+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
612
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
713
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
8-
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
9-
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
1014
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
1115
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
1216
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
@@ -21,8 +25,6 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
2125
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
2226
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
2327
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
24-
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
25-
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
2628
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
2729
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
2830
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -46,6 +48,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
4648
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
4749
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
4850
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
51+
github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI=
52+
github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
4953
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
5054
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
5155
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
@@ -86,10 +90,6 @@ google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3i
8690
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
8791
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
8892
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
89-
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
90-
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
91-
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
92-
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
9393
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
9494
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
9595
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

main.go

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"evolve/controller"
77
"evolve/modules/sse"
8-
"evolve/modules/ws"
98
"evolve/routes"
109
"evolve/util"
1110
"fmt"
@@ -36,16 +35,19 @@ func main() {
3635

3736
var logger = util.NewLogger()
3837

39-
// Context and Graceful Shutdown Setup.
38+
redisClient, err := sse.GetRedisClient(*logger)
39+
if err != nil {
40+
logger.Error(fmt.Sprintf("Failed to initialize Redis client: %v. Exiting.", err))
41+
os.Exit(1)
42+
}
43+
logger.Info("Redis client initialized successfully.")
44+
45+
// Use context for cancellation signal propagation.
4046
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
4147
defer stop()
4248

43-
// Start WebSocket Server.
44-
wsHub := ws.StartServer(ctx, *logger)
45-
logger.Info("WebSocket Hub started.")
46-
47-
// Register HTTP Routes.
48-
mux := http.DefaultServeMux
49+
// Register HTTP Routes
50+
mux := http.NewServeMux()
4951

5052
mux.HandleFunc(routes.TEST, controller.Test)
5153
mux.HandleFunc(routes.EA, controller.CreateEA)
@@ -56,40 +58,35 @@ func main() {
5658
mux.HandleFunc(routes.SHARE_RUN, controller.ShareRun)
5759
mux.HandleFunc(routes.RUN, controller.UserRun)
5860

59-
// WebSocket Route.
60-
wsHandler := ws.GetHandler(ctx, wsHub, *logger)
61-
mux.HandleFunc(routes.LIVE, wsHandler)
62-
logger.Info("WebSocket endpoint registered at /live/")
63-
64-
// SSE Route.
65-
sseHandler := sse.GetSSEHandler(*logger)
61+
sseHandler := sse.GetSSEHandler(*logger, redisClient)
6662
mux.HandleFunc(routes.LOGS, sseHandler)
67-
logger.Info("SSE endpoint registered at /runs/logs/")
63+
logger.Info(fmt.Sprintf("SSE endpoint registered at %s using Redis Pub/Sub", routes.LOGS))
6864

69-
// Log the test endpoint URL.
7065
logger.Info(fmt.Sprintf("Test http server on http://localhost%s/api/test", PORT))
7166

7267
// CORS Configuration.
7368
corsHandler := cors.New(cors.Options{
7469
AllowedOrigins: []string{FRONTEND_URL},
7570
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
76-
AllowedHeaders: []string{"*"},
71+
AllowedHeaders: []string{"*", "X-RUN-ID"},
72+
ExposedHeaders: []string{},
7773
AllowCredentials: true,
7874
}).Handler(mux)
7975

8076
// HTTP Server Setup and Start.
8177
server := &http.Server{
8278
Addr: PORT,
8379
Handler: corsHandler,
84-
ReadTimeout: 10 * time.Second,
85-
WriteTimeout: 0, // Set WriteTimeout to 0 for long-lived SSE connections.
86-
IdleTimeout: 0, // Set IdleTimeout to 0 for long-lived SSE connections.
80+
ReadTimeout: 15 * time.Second,
81+
WriteTimeout: 0,
82+
IdleTimeout: 0,
8783
BaseContext: func(_ net.Listener) context.Context { return ctx },
8884
}
8985

90-
// Start server in a goroutine.
86+
// Start server in a goroutine so
87+
// it doesn't block the graceful shutdown handling.
9188
go func() {
92-
logger.Info(fmt.Sprintf("HTTP server starting on %s (Frontend: %s)", server.Addr, FRONTEND_URL))
89+
logger.Info(fmt.Sprintf("HTTP server starting on %s (Allowed Frontend Origin: %s)", server.Addr, FRONTEND_URL))
9390
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
9491
logger.Error(fmt.Sprintf("HTTP server ListenAndServe error: %v", err))
9592
stop()
@@ -99,17 +96,26 @@ func main() {
9996
// Wait for Shutdown Signal.
10097
<-ctx.Done()
10198

102-
// Initiate Graceful Shutdown.
10399
logger.Info("Shutdown signal received. Starting graceful shutdown...")
104100

105-
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)
101+
// Create a context with a timeout for the shutdown process.
102+
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 15*time.Second)
106103
defer cancelShutdown()
107104

105+
// Attempt to gracefully shut down the HTTP server.
108106
if err := server.Shutdown(shutdownCtx); err != nil {
109107
logger.Error(fmt.Sprintf("HTTP server graceful shutdown failed: %v", err))
110108
} else {
111109
logger.Info("HTTP server shutdown complete.")
112110
}
113111

112+
// Close Redis Client.
113+
logger.Info("Shutting down Redis client...")
114+
if redisErr := redisClient.Close(); redisErr != nil {
115+
logger.Error(fmt.Sprintf("Redis client shutdown error: %v", redisErr))
116+
} else {
117+
logger.Info("Redis client shutdown complete.")
118+
}
119+
114120
logger.Info("Server exiting.")
115121
}

0 commit comments

Comments
 (0)