Skip to content

Commit 8cc8ad6

Browse files
Merge pull request #33 from Naganathan05/main
Migration to Redis List from RabbitMQ
2 parents 8fc0b2f + 1b1bf83 commit 8cc8ad6

File tree

8 files changed

+73
-93
lines changed

8 files changed

+73
-93
lines changed

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ Go implementation of the Runner Controller.
99
1. GoLang - Install GoLang.
1010
2. MinIO - Start a MinIO server instance.
1111
3. CockroachDB - Start a CockroachDB instance.
12+
4. Redis - Start a Redis instance.
13+
14+
(or) Use Docker Compose to start the services.
15+
16+
```sh
17+
docker compose up -d
18+
```
1219

1320
### Installation
1421

@@ -26,11 +33,11 @@ export DATABASE_URL=<cockroachdb_url>
2633
export MINIO_ENDPOINT=<minio_endpoint>
2734
export MINIO_ACCESS_KEY_ID=<minio_access_key>
2835
export MINIO_SECRET_KEY=<minio_secret_key>
29-
export RABBITMQ_URL=<rabbitmq_url>
3036
export FRONTEND_URL=<frontend_url>
3137
export HTTP_PORT=<http_port>
3238
export AUTH_GRPC_ADDRESS=<auth_grpc_address>
3339
export REDIS_URL=<redis_url>
40+
export REDIS_QUEUE_NAME=<redis_queue_name>
3441
```
3542

3643
3. Run the following command to start the server.

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ services:
1212
MINIO_ENDPOINT : host.docker.internal:9000
1313
MINIO_ACCESS_KEY_ID : minioadmin
1414
MINIO_SECRET_KEY : minioadmin
15-
RABBITMQ_URL : amqp://user:[email protected]:5672/
1615
FRONTEND_URL : http://localhost:3000
1716
HTTP_PORT : 5002
1817
AUTH_GRPC_ADDRESS : host.docker.internal:5001
19-
REDIS_URL: redis://host.docker.internal:6379/0
18+
REDIS_URL: redis://host.docker.internal:6379/0
19+
REDIS_QUEUE_NAME: task_queue

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ toolchain go1.24.2
77
require (
88
github.com/jackc/pgx/v5 v5.7.4
99
github.com/minio/minio-go/v7 v7.0.91
10-
github.com/rabbitmq/amqp091-go v1.10.0
1110
github.com/redis/go-redis/v9 v9.8.0
1211
github.com/rs/cors v1.11.1
1312
google.golang.org/grpc v1.72.0

go.sum

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ github.com/minio/minio-go/v7 v7.0.91 h1:tWLZnEfo3OZl5PoXQwcwTAPNNrjyWwOh6cbZitW5
4646
github.com/minio/minio-go/v7 v7.0.91/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go=
4747
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4848
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
49-
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
50-
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
5149
github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI=
5250
github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
5351
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
@@ -71,8 +69,6 @@ go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce
7169
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
7270
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
7371
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
74-
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
75-
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
7672
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
7773
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
7874
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=

main.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func main() {
3535

3636
var logger = util.NewLogger()
3737

38-
redisClient, err := sse.GetRedisClient(*logger)
38+
err := util.InitRedisClient(*logger)
3939
if err != nil {
4040
logger.Error(fmt.Sprintf("Failed to initialize Redis client: %v. Exiting.", err))
4141
os.Exit(1)
@@ -58,7 +58,7 @@ func main() {
5858
mux.HandleFunc(routes.SHARE_RUN, controller.ShareRun)
5959
mux.HandleFunc(routes.RUN, controller.UserRun)
6060

61-
sseHandler := sse.GetSSEHandler(*logger, redisClient)
61+
sseHandler := sse.GetSSEHandler(*logger)
6262
mux.HandleFunc(routes.LOGS, sseHandler)
6363
logger.Info(fmt.Sprintf("SSE endpoint registered at %s using Redis Pub/Sub", routes.LOGS))
6464

@@ -110,12 +110,7 @@ func main() {
110110
}
111111

112112
// Close Redis Client.
113-
logger.Info("Shutting down Redis client...")
114-
if redisErr := redisClient.Close(); redisErr != nil {
115-
logger.Error(fmt.Sprintf("Redis client shutdown error: %v", redisErr))
116-
} else {
117-
logger.Info("Redis client shutdown complete.")
118-
}
113+
util.ShutDownRedisClient(*logger)
119114

120115
logger.Info("Server exiting.")
121116
}

modules/sse/server.go

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"evolve/util"
88
"fmt"
99
"net/http"
10-
"os"
1110
"strings"
1211
"time"
1312

@@ -33,41 +32,17 @@ type redisLogPayload struct {
3332
RunID string `json:"runId"` // From EOF message.
3433
}
3534

36-
// GetRedisClient initializes a Redis client.
37-
func GetRedisClient(logger util.Logger) (*redis.Client, error) {
38-
redisURL := os.Getenv("REDIS_URL")
39-
if redisURL == "" {
40-
redisURL = "redis://localhost:6379/0"
41-
logger.Warn(fmt.Sprintf("REDIS_URL not set, using default: %s", redisURL))
42-
}
43-
opts, err := redis.ParseURL(redisURL)
44-
if err != nil {
45-
logger.Error(fmt.Sprintf("Failed to parse REDIS_URL '%s': %v", redisURL, err))
46-
return nil, err
47-
}
48-
rdb := redis.NewClient(opts)
49-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
50-
defer cancel()
51-
_, err = rdb.Ping(ctx).Result()
52-
if err != nil {
53-
logger.Error(fmt.Sprintf("Failed to connect to Redis at %s: %v", opts.Addr, err))
54-
return nil, err
55-
}
56-
logger.Info(fmt.Sprintf("Successfully connected to Redis at %s", opts.Addr))
57-
return rdb, nil
58-
}
59-
6035
// GetSSEHandler returns an HTTP handler
6136
// for Server-Sent Events (SSE) using Redis Streams.
62-
func GetSSEHandler(logger util.Logger, redisClient *redis.Client) http.HandlerFunc {
63-
if redisClient == nil {
37+
func GetSSEHandler(logger util.Logger) http.HandlerFunc {
38+
if util.RedisClient == nil {
6439
logger.Error("GetSSEHandler requires a non-nil Redis client")
6540
return func(w http.ResponseWriter, r *http.Request) {
6641
http.Error(w, "Internal Server Error: Redis client not configured", http.StatusInternalServerError)
6742
}
6843
}
6944
return func(w http.ResponseWriter, r *http.Request) {
70-
serveSSEWithStream(logger, redisClient, w, r) // Call the new function
45+
serveSSEWithStream(logger, w, r) // Call the new function
7146
}
7247
}
7348

@@ -93,7 +68,7 @@ func sendSSEData(w http.ResponseWriter, rc *http.ResponseController, payload str
9368
}
9469

9570
// serveSSEWithStream handles the SSE stream for a given run ID.
96-
func serveSSEWithStream(logger util.Logger, redisClient *redis.Client, w http.ResponseWriter, r *http.Request) {
71+
func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Request) {
9772
ctx := r.Context()
9873
logger.Info("[SSE Stream Handler] Entered serveSSEWithStream")
9974

@@ -174,7 +149,7 @@ func serveSSEWithStream(logger util.Logger, redisClient *redis.Client, w http.Re
174149
for {
175150
// Use XRead to get batches of historical data
176151
// We don't block here, just read what's available.
177-
cmd := redisClient.XRead(ctx, &redis.XReadArgs{
152+
cmd := util.RedisClient.XRead(ctx, &redis.XReadArgs{
178153
Streams: []string{redisStreamName, lastProcessedID},
179154
Count: streamReadCount,
180155
})
@@ -251,7 +226,7 @@ func serveSSEWithStream(logger util.Logger, redisClient *redis.Client, w http.Re
251226
}
252227

253228
// logger.Info(fmt.Sprintf("[SSE Stream Handler] Blocking read on stream '%s' from ID: %s", redisStreamName, lastProcessedID))
254-
cmd := redisClient.XRead(ctx, &redis.XReadArgs{
229+
cmd := util.RedisClient.XRead(ctx, &redis.XReadArgs{
255230
Streams: []string{redisStreamName, lastProcessedID},
256231
Count: streamReadCount,
257232
Block: blockTimeout,

util/enqueue.go

Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"fmt"
77
"os"
88
"time"
9-
10-
amqp "github.com/rabbitmq/amqp091-go"
119
)
1210

1311
func EnqueueRunRequest(ctx context.Context, runID string, fileName string, extension string) error {
@@ -21,39 +19,11 @@ func EnqueueRunRequest(ctx context.Context, runID string, fileName string, exten
2119
Timestamp time.Time `json:"timestamp"`
2220
}
2321

24-
// Get RabbitMQ connection string from environment variable or use default
25-
rabbitMQURL := os.Getenv("RABBITMQ_URL")
26-
if rabbitMQURL == "" {
27-
rabbitMQURL = "amqp://guest:guest@localhost:5672/"
28-
}
29-
30-
// Connect to RabbitMQ server
31-
conn, err := amqp.Dial(rabbitMQURL)
32-
if err != nil {
33-
logger.Error(fmt.Sprintf("Failed to connect to RabbitMQ: %v", err))
34-
}
35-
defer conn.Close()
36-
37-
// Create a channel
38-
ch, err := conn.Channel()
39-
if err != nil {
40-
logger.Error(fmt.Sprintf("Failed to open a channel: %v", err))
41-
}
42-
defer ch.Close()
43-
4422
// Declare a queue
45-
queueName := "task_queue"
46-
q, err := ch.QueueDeclare(
47-
queueName, // name
48-
true, // durable
49-
false, // delete when unused
50-
false, // exclusive
51-
false, // no-wait
52-
nil, // arguments
53-
)
54-
55-
if err != nil {
56-
logger.Error(fmt.Sprintf("Failed to declare a queue: %v", err))
23+
queueName := os.Getenv("REDIS_QUEUE_NAME")
24+
if queueName == "" {
25+
queueName = "task_queue"
26+
logger.Warn(fmt.Sprintf("REDIS_QUEUE_NAME not set, using default: %s", queueName))
5727
}
5828

5929
// Create a new message
@@ -68,20 +38,11 @@ func EnqueueRunRequest(ctx context.Context, runID string, fileName string, exten
6838
body, err := json.Marshal(msg)
6939
if err != nil {
7040
logger.Error(fmt.Sprintf("Error marshaling message: %v", err))
41+
return err
7142
}
7243

73-
// Publish message
74-
err = ch.PublishWithContext(
75-
ctx,
76-
"", // exchange
77-
q.Name, // routing key
78-
false, // mandatory
79-
false, // immediate
80-
amqp.Publishing{
81-
DeliveryMode: amqp.Persistent,
82-
Body: body,
83-
},
84-
)
44+
// Push message to Redis List (LPUSH = enqueue at head)
45+
err = RedisClient.LPush(ctx, queueName, string(body)).Err();
8546

8647
if err != nil {
8748
logger.Error(fmt.Sprintf("Failed to publish message: %v", err))

util/redis.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package util
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9"
10+
)
11+
12+
var RedisClient *redis.Client
13+
14+
// InitRedisClient initializes a Redis client.
15+
func InitRedisClient(logger Logger) (error) {
16+
redisURL := os.Getenv("REDIS_URL")
17+
if redisURL == "" {
18+
redisURL = "redis://localhost:6379/0"
19+
logger.Warn(fmt.Sprintf("REDIS_URL not set, using default: %s", redisURL))
20+
}
21+
opts, err := redis.ParseURL(redisURL)
22+
if err != nil {
23+
logger.Error(fmt.Sprintf("Failed to parse REDIS_URL '%s': %v", redisURL, err))
24+
return err
25+
}
26+
rdb := redis.NewClient(opts)
27+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
28+
defer cancel()
29+
_, err = rdb.Ping(ctx).Result()
30+
if err != nil {
31+
logger.Error(fmt.Sprintf("Failed to connect to Redis at %s: %v", opts.Addr, err))
32+
return err
33+
}
34+
logger.Info(fmt.Sprintf("Successfully connected to Redis at %s", opts.Addr))
35+
RedisClient = rdb
36+
return nil
37+
}
38+
39+
// ShutDownRedisClient shuts down the Redis client
40+
func ShutDownRedisClient(logger Logger) {
41+
logger.Info("Shutting down Redis client...")
42+
if redisErr := RedisClient.Close(); redisErr != nil {
43+
logger.Error(fmt.Sprintf("Redis client shutdown error: %v", redisErr))
44+
} else {
45+
logger.Info("Redis client shutdown complete.")
46+
}
47+
}

0 commit comments

Comments
 (0)