Skip to content

Commit 93e1254

Browse files
Make code more readable.
1 parent c22df18 commit 93e1254

File tree

6 files changed

+338
-313
lines changed

6 files changed

+338
-313
lines changed

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ require (
88
github.com/coder/websocket v1.8.13
99
github.com/hpcloud/tail v1.0.0
1010
github.com/jackc/pgx/v5 v5.7.4
11-
github.com/minio/minio-go/v7 v7.0.90
11+
github.com/minio/minio-go/v7 v7.0.91
1212
github.com/rabbitmq/amqp091-go v1.10.0
1313
github.com/rs/cors v1.11.1
14-
google.golang.org/grpc v1.71.1
14+
google.golang.org/grpc v1.72.0
1515
google.golang.org/protobuf v1.36.6
1616
)
1717

@@ -34,7 +34,7 @@ require (
3434
golang.org/x/sync v0.13.0 // indirect
3535
golang.org/x/sys v0.32.0 // indirect
3636
golang.org/x/text v0.24.0 // indirect
37-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect
37+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
3838
gopkg.in/fsnotify.v1 v1.4.7 // indirect
3939
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
4040
)

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ github.com/minio/crc64nvme v1.0.1 h1:DHQPrYPdqK7jQG/Ls5CTBZWeex/2FMS3G5XGkycuFrY
4040
github.com/minio/crc64nvme v1.0.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg=
4141
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
4242
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
43-
github.com/minio/minio-go/v7 v7.0.90 h1:TmSj1083wtAD0kEYTx7a5pFsv3iRYMsOJ6A4crjA1lE=
44-
github.com/minio/minio-go/v7 v7.0.90/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go=
43+
github.com/minio/minio-go/v7 v7.0.91 h1:tWLZnEfo3OZl5PoXQwcwTAPNNrjyWwOh6cbZitW5JQc=
44+
github.com/minio/minio-go/v7 v7.0.91/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go=
4545
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4646
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
4747
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
@@ -79,10 +79,10 @@ golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
7979
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
8080
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
8181
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
82-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e h1:ztQaXfzEXTmCBvbtWYRhJxW+0iJcz2qXfd38/e9l7bA=
83-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
84-
google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI=
85-
google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
82+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs=
83+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
84+
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
85+
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
8686
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
8787
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
8888
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

main.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ func main() {
3636

3737
var logger = util.NewLogger()
3838

39-
// ---- Context and Graceful Shutdown Setup ----
39+
// Context and Graceful Shutdown Setup.
4040
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
4141
defer stop()
4242

43-
// ---- Start WebSocket Server ----
43+
// Start WebSocket Server.
4444
wsHub := ws.StartServer(ctx, *logger)
4545
logger.Info("WebSocket Hub started.")
4646

47-
// ---- Register HTTP Routes ----
47+
// Register HTTP Routes.
4848
mux := http.DefaultServeMux
4949

5050
mux.HandleFunc(routes.TEST, controller.Test)
@@ -56,38 +56,38 @@ func main() {
5656
mux.HandleFunc(routes.SHARE_RUN, controller.ShareRun)
5757
mux.HandleFunc(routes.RUN, controller.UserRun)
5858

59-
// WebSocket Route
59+
// WebSocket Route.
6060
wsHandler := ws.GetHandler(ctx, wsHub, *logger)
6161
mux.HandleFunc(routes.LIVE, wsHandler)
6262
logger.Info("WebSocket endpoint registered at /live/")
6363

64-
// ---- SSE Route ----
64+
// SSE Route.
6565
sseHandler := sse.GetSSEHandler(*logger)
6666
mux.HandleFunc(routes.LOGS, sseHandler)
6767
logger.Info("SSE endpoint registered at /runs/logs/")
6868

69-
// ---- Log the test endpoint URL ----
69+
// Log the test endpoint URL.
7070
logger.Info(fmt.Sprintf("Test http server on http://localhost%s/api/test", PORT))
7171

72-
// ---- CORS Configuration ----
72+
// CORS Configuration.
7373
corsHandler := cors.New(cors.Options{
7474
AllowedOrigins: []string{FRONTEND_URL},
75-
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, // Ensure GET is allowed for SSE
76-
AllowedHeaders: []string{"*"}, // Allow X-RUN-ID header
75+
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
76+
AllowedHeaders: []string{"*"},
7777
AllowCredentials: true,
7878
}).Handler(mux)
7979

80-
// ---- HTTP Server Setup and Start ----
80+
// HTTP Server Setup and Start.
8181
server := &http.Server{
8282
Addr: PORT,
8383
Handler: corsHandler,
8484
ReadTimeout: 10 * time.Second,
85-
WriteTimeout: 0, // Set WriteTimeout to 0 for long-lived SSE connections
86-
IdleTimeout: 0, // Set IdleTimeout to 0 for long-lived SSE connections
85+
WriteTimeout: 0, // Set WriteTimeout to 0 for long-lived SSE connections.
86+
IdleTimeout: 0, // Set IdleTimeout to 0 for long-lived SSE connections.
8787
BaseContext: func(_ net.Listener) context.Context { return ctx },
8888
}
8989

90-
// Start server in a goroutine
90+
// Start server in a goroutine.
9191
go func() {
9292
logger.Info(fmt.Sprintf("HTTP server starting on %s (Frontend: %s)", server.Addr, FRONTEND_URL))
9393
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
@@ -96,10 +96,10 @@ func main() {
9696
}
9797
}()
9898

99-
// ---- Wait for Shutdown Signal ----
99+
// Wait for Shutdown Signal.
100100
<-ctx.Done()
101101

102-
// ---- Initiate Graceful Shutdown ----
102+
// Initiate Graceful Shutdown.
103103
logger.Info("Shutdown signal received. Starting graceful shutdown...")
104104

105105
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 10*time.Second)

modules/sse/server.go

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// Package sse provides Server-Sent Events (SSE) endpoint for streaming log files.
21
package sse
32

43
import (
@@ -20,8 +19,8 @@ const (
2019
sseDoneEvent = "done" // Event name for the end of the stream.
2120
)
2221

23-
// GetSSEHandler creates and returns an http.HandlerFunc for the SSE endpoint.
24-
// It takes the application's logger as an argument.
22+
// GetSSEHandler creates and returns an
23+
// http.HandlerFunc for the SSE endpoint.
2524
func GetSSEHandler(logger util.Logger) http.HandlerFunc {
2625
return func(w http.ResponseWriter, r *http.Request) {
2726
serveSSE(logger, w, r)
@@ -30,19 +29,17 @@ func GetSSEHandler(logger util.Logger) http.HandlerFunc {
3029

3130
// serveSSE handles incoming SSE requests.
3231
func serveSSE(logger util.Logger, w http.ResponseWriter, r *http.Request) {
33-
ctx := r.Context() // Get context from the request
32+
ctx := r.Context()
3433

35-
// --- 1. Get Run ID from Header ---
34+
// Get Run ID from Header.
3635
runId := r.Header.Get(runIdHeader)
3736
if runId == "" {
3837
logger.Warn(fmt.Sprintf("[SSE Handler] Missing %s header", runIdHeader))
3938
http.Error(w, fmt.Sprintf("Missing %s header", runIdHeader), http.StatusBadRequest)
4039
return
4140
}
4241

43-
// --- Basic Sanitize Run ID (Prevent path traversal) ---
44-
// Ensure runId doesn't contain potentially harmful sequences.
45-
// A more robust solution might involve checking against a list of valid run IDs.
42+
// Basic Sanitize Run ID (Prevent path traversal)
4643
if strings.Contains(runId, "..") || strings.ContainsAny(runId, "/\\") {
4744
logger.Warn(fmt.Sprintf("[SSE Handler] Invalid characters in %s header: %s", runIdHeader, runId))
4845
http.Error(w, "Invalid Run ID format", http.StatusBadRequest)
@@ -54,16 +51,15 @@ func serveSSE(logger util.Logger, w http.ResponseWriter, r *http.Request) {
5451

5552
logger.Info(fmt.Sprintf("[SSE Handler] Request received for runId: %s (File: %s)", runId, logFilePath))
5653

57-
// --- 2. Set SSE Headers ---
54+
// Set SSE Headers.
5855
w.Header().Set("Content-Type", "text/event-stream")
5956
w.Header().Set("Cache-Control", "no-cache")
6057
w.Header().Set("Connection", "keep-alive")
6158
w.Header().Set("X-Accel-Buffering", "no")
6259

63-
// Suggest a retry interval to the client
64-
fmt.Fprintf(w, "retry: %d\n\n", retrySeconds*1000) // retry is in milliseconds
60+
// Suggest a retry interval to the client.
61+
fmt.Fprintf(w, "retry: %ds\n\n", retrySeconds)
6562

66-
// --- 3. Get Flusher ---
6763
rc := http.NewResponseController(w)
6864
if rc == nil {
6965
logger.Error(fmt.Sprintf("[SSE Handler] Failed to get ResponseController for runId: %s", runId))
@@ -77,14 +73,13 @@ func serveSSE(logger util.Logger, w http.ResponseWriter, r *http.Request) {
7773
return
7874
}
7975

80-
// --- 4. Configure and Start Tailing ---
76+
// Configure and Start Tailing.
8177
tailConfig := tail.Config{
82-
Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}, // Start from the beginning of the file
83-
ReOpen: true, // Re-open file if recreated (log rotation)
84-
MustExist: false, // Don't fail if file doesn't exist yet
85-
Poll: true, // Use polling (more reliable across FS types/network mounts than pure inotify) - tune if needed
86-
Follow: true, // Keep following the file for new lines
87-
// Logger: tail.DiscardingLogger, // Uncomment to disable tail library's internal logging
78+
Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart},
79+
ReOpen: true,
80+
MustExist: false,
81+
Poll: true,
82+
Follow: true,
8883
}
8984

9085
tailer, err := tail.TailFile(logFilePath, tailConfig)
@@ -94,70 +89,73 @@ func serveSSE(logger util.Logger, w http.ResponseWriter, r *http.Request) {
9489
return
9590
}
9691

97-
// Ensure tailer is stopped when the handler exits
92+
// Ensure tailer is stopped when the handler exits.
9893
defer func() {
9994
logger.Info(fmt.Sprintf("[SSE Handler] Stopping tailer for runId: %s", runId))
100-
// Stopping the tailer closes its internal channels
95+
// Stopping the tailer closes its internal channels.
10196
if stopErr := tailer.Stop(); stopErr != nil {
10297
logger.Error(fmt.Sprintf("[SSE Tailing] Error stopping tailer for runId %s: %v", runId, stopErr))
10398
}
10499
}()
105100

106101
logger.Info(fmt.Sprintf("[SSE Handler] Started tailing %s for runId: %s", logFilePath, runId))
107102

108-
// --- 5. Event Loop: Send lines and handle disconnect ---
103+
// Event Loop - Send lines and handle disconnect.
109104
for {
110105
select {
111-
case <-ctx.Done(): // Client disconnected
106+
case <-ctx.Done():
107+
// Client disconnected.
112108
logger.Info(fmt.Sprintf("[SSE Handler] Client disconnected for runId: %s", runId))
113-
return // Exit handler, defer will stop tailer
109+
return
114110

115-
case line, ok := <-tailer.Lines: // New line from file
111+
case line, ok := <-tailer.Lines:
116112
if !ok {
117-
// Channel closed, tailer might have stopped or encountered an error
113+
// Channel closed, tailer might
114+
// have stopped or encountered an error.
118115
tailErr := tailer.Err()
119-
if tailErr != nil && tailErr != io.EOF { // io.EOF might occur normally on stop
116+
if tailErr != nil && tailErr != io.EOF {
120117
logger.Error(fmt.Sprintf("[SSE Tailing] Error during tailing for runId %s: %v", runId, tailErr))
121118
} else {
122119
logger.Info(fmt.Sprintf("[SSE Tailing] Tailer lines channel closed for runId: %s", runId))
123120
}
124-
return // Exit handler
121+
return
125122
}
126123

127-
fmt.Printf("[SSE Handler] New line for runId %s: %s\n", runId, line.Text) // Debug log
124+
// Debug log.
125+
fmt.Printf("[SSE Handler] New line for runId %s: %s\n", runId, line.Text)
128126

129-
// ---> CHECK FOR END MARKER <---
130127
if line.Text == "__END__" {
131128
logger.Info(fmt.Sprintf("[SSE Handler] Detected END marker for run %s. Sending '%s' event and closing stream.", runId, sseDoneEvent))
132-
// Send the specific "done" event
129+
130+
// Send SSE event indicating stream end.
133131
_, writeErr := fmt.Fprintf(w, "event: %s\ndata: {\"message\": \"Stream ended.\"}\n\n", sseDoneEvent)
134132
if writeErr != nil {
135-
// Log error but still attempt to flush and return
136133
logger.Warn(fmt.Sprintf("[SSE Handler] Error writing '%s' event for runId %s: %v", sseDoneEvent, runId, writeErr))
137134
}
138-
// Attempt to flush the final event
135+
136+
// Attempt to flush the final event.
139137
if err := rc.Flush(); err != nil {
140138
logger.Error(fmt.Sprintf("[SSE Handler] Error flushing final event for runId %s: %v", runId, err))
141-
return // Exit handler, defer will stop tailer
139+
return
142140
}
143-
return // <<< EXIT HANDLER HERE after sending event
141+
return
144142
}
145143

146-
// Format and send SSE message
147-
// Use fmt.Fprintf to write directly to the ResponseWriter
148-
_, writeErr := fmt.Fprintf(w, "data: %s\n\n", line.Text) // SSE format: "data: content\n\n"
144+
// Format and send SSE message.
145+
// SSE format: "data: content\n\n"
146+
_, writeErr := fmt.Fprintf(w, "data: %s\n\n", line.Text)
149147

150148
if writeErr != nil {
151-
// Likely client disconnected or network error
152149
logger.Warn(fmt.Sprintf("[SSE Handler] Error writing to client for runId %s: %v", runId, writeErr))
153-
return // Exit handler, defer will stop tailer
150+
return
154151
}
155152

156153
if err := rc.Flush(); err != nil {
157154
logger.Error(fmt.Sprintf("[SSE Handler] Error flushing response for runId %s: %v", runId, err))
158-
return // Exit handler, defer will stop tailer
155+
return
159156
}
160157

158+
// Sleep to prevent overwhelming the client.
161159
time.Sleep(50 * time.Millisecond)
162160
}
163161
}

0 commit comments

Comments
 (0)