@@ -3,34 +3,34 @@ package messagequeue
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+ "fmt"
6
7
"history-service/models"
7
8
"history-service/utils"
8
9
"log"
9
10
"os"
11
+ "time"
10
12
11
13
"cloud.google.com/go/firestore"
12
14
amqp "github.com/rabbitmq/amqp091-go"
13
15
)
14
16
15
- const CODE_SUBMISSION_QUEUE_KEY = "code-submission"
17
+ const (
18
+ CODE_SUBMISSION_QUEUE_KEY = "code-submission"
19
+ NUM_RETRIES = 10
20
+ )
16
21
17
22
var (
18
23
codeSubmissionQueue amqp.Queue
19
24
rabbitMQChannel * amqp.Channel
20
25
)
21
26
22
- func InitRabbitMQServer () * amqp.Channel {
23
- // Connect to RabbitMQ server
24
- rabbitMQURL := os .Getenv ("RABBITMQ_URL" )
25
- conn , err := amqp .Dial (rabbitMQURL )
26
- utils .FailOnError (err , "Failed to connect to RabbitMQ" )
27
- defer conn .Close ()
27
+ func InitRabbitMQServer () (* amqp.Connection , * amqp.Channel ) {
28
+ conn := connectToRabbitMQ ()
28
29
29
30
// Create a channel
30
31
ch , err := conn .Channel ()
31
32
utils .FailOnError (err , "Failed to open a channel" )
32
33
rabbitMQChannel = ch
33
- defer ch .Close ()
34
34
35
35
// Declare a queue
36
36
q , err := ch .QueueDeclare (
@@ -44,7 +44,24 @@ func InitRabbitMQServer() *amqp.Channel {
44
44
utils .FailOnError (err , "Failed to declare a queue" )
45
45
codeSubmissionQueue = q
46
46
47
- return ch
47
+ return conn , ch
48
+ }
49
+
50
+ func connectToRabbitMQ () * amqp.Connection {
51
+ var conn * amqp.Connection
52
+ var err error
53
+ rabbitMQURL := os .Getenv ("RABBITMQ_URL" )
54
+ for i := 0 ; i < NUM_RETRIES ; i ++ { // Retry up to 10 times
55
+ conn , err = amqp .Dial (rabbitMQURL )
56
+ if err == nil {
57
+ log .Println ("Connected to RabbitMQ" )
58
+ return conn
59
+ }
60
+ log .Printf ("Failed to connect to RabbitMQ, retrying in 5 seconds... (Attempt %d/%d)" , i + 1 , NUM_RETRIES )
61
+ time .Sleep (5 * time .Second )
62
+ }
63
+ utils .FailOnError (err , fmt .Sprintf ("Failed to connect to RabbitMQ after %d attempts" , NUM_RETRIES ))
64
+ return nil
48
65
}
49
66
50
67
func ConsumeSubmissionMessages (client * firestore.Client , createSubmission func (
@@ -62,7 +79,7 @@ func ConsumeSubmissionMessages(client *firestore.Client, createSubmission func(
62
79
false , // no-wait
63
80
nil , // args
64
81
)
65
- utils .FailOnError (err , "Failed to register a consumer" )
82
+ utils .FailOnError (err , "RabbitMQ: Failed to register a consumer" )
66
83
67
84
// Create a channel to block indefinitely
68
85
forever := make (chan bool )
0 commit comments