Skip to content

Commit 8a0e046

Browse files
committed
feat: implement rabbitmq msg queue
1 parent 3396c29 commit 8a0e046

File tree

13 files changed

+259
-58
lines changed

13 files changed

+259
-58
lines changed

apps/execution-service/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
github.com/google/uuid v1.6.0 // indirect
3232
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
3333
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
34+
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
3435
go.opencensus.io v0.24.0 // indirect
3536
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
3637
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect

apps/execution-service/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwA
8585
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
8686
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
8787
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
88+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
89+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
8890
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
8991
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
9092
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=

apps/execution-service/handlers/submit.go

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
package handlers
22

33
import (
4-
"bytes"
54
"encoding/json"
65
"execution-service/constants"
6+
"execution-service/messagequeue"
77
"execution-service/models"
88
"execution-service/utils"
99
"fmt"
10+
"net/http"
11+
1012
"github.com/go-chi/chi/v5"
1113
"google.golang.org/api/iterator"
12-
"net/http"
13-
"os"
1414
)
1515

1616
func (s *Service) ExecuteVisibleAndHiddenTestsAndSubmit(w http.ResponseWriter, r *http.Request) {
@@ -84,33 +84,39 @@ func (s *Service) ExecuteVisibleAndHiddenTestsAndSubmit(w http.ResponseWriter, r
8484
return
8585
}
8686

87-
// get history-service url from os env
88-
historyServiceUrl := os.Getenv("HISTORY_SERVICE_URL")
89-
if historyServiceUrl == "" {
90-
http.Error(w, "HISTORY_SERVICE_URL is not set", http.StatusInternalServerError)
91-
return
92-
}
93-
94-
req, err := http.NewRequest(http.MethodPost, historyServiceUrl+"histories",
95-
bytes.NewBuffer(jsonData))
96-
if err != nil {
97-
http.Error(w, err.Error(), http.StatusInternalServerError)
98-
return
99-
}
100-
101-
req.Header.Set("Content-Type", "application/json")
102-
103-
client := &http.Client{}
104-
resp, err := client.Do(req)
87+
err = messagequeue.PublishSubmissionMessage(jsonData)
10588
if err != nil {
106-
http.Error(w, err.Error(), http.StatusInternalServerError)
89+
http.Error(w, fmt.Sprintf("Failed to save submission history: %v", err), http.StatusInternalServerError)
10790
return
10891
}
109-
defer resp.Body.Close()
11092

111-
if resp.StatusCode != http.StatusOK {
112-
http.Error(w, "Failed to save submission history", http.StatusInternalServerError)
113-
}
93+
// get history-service url from os env
94+
// historyServiceUrl := os.Getenv("HISTORY_SERVICE_URL")
95+
// if historyServiceUrl == "" {
96+
// http.Error(w, "HISTORY_SERVICE_URL is not set", http.StatusInternalServerError)
97+
// return
98+
// }
99+
100+
// req, err := http.NewRequest(http.MethodPost, historyServiceUrl+"histories",
101+
// bytes.NewBuffer(jsonData))
102+
// if err != nil {
103+
// http.Error(w, err.Error(), http.StatusInternalServerError)
104+
// return
105+
// }
106+
107+
// req.Header.Set("Content-Type", "application/json")
108+
109+
// client := &http.Client{}
110+
// resp, err := client.Do(req)
111+
// if err != nil {
112+
// http.Error(w, err.Error(), http.StatusInternalServerError)
113+
// return
114+
// }
115+
// defer resp.Body.Close()
116+
117+
// if resp.StatusCode != http.StatusOK {
118+
// http.Error(w, "Failed to save submission history", http.StatusInternalServerError)
119+
// }
114120

115121
w.Header().Set("Content-Type", "application/json")
116122
w.WriteHeader(http.StatusOK)

apps/execution-service/main.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package main
33
import (
44
"context"
55
"execution-service/handlers"
6+
"execution-service/messagequeue"
7+
"execution-service/utils"
68
"fmt"
79
"log"
810
"net/http"
@@ -21,16 +23,15 @@ import (
2123
func main() {
2224
// Load .env file
2325
err := godotenv.Load()
24-
if err != nil {
25-
log.Fatal("Error loading .env file")
26-
}
26+
utils.FailOnError(err, "Error loading .env file")
27+
28+
amqpChannel := messagequeue.InitRabbitMQServer()
29+
defer amqpChannel.Close()
2730

2831
// Initialize Firestore client
2932
ctx := context.Background()
3033
client, err := initFirestore(ctx)
31-
if err != nil {
32-
log.Fatalf("Failed to initialize Firestore client: %v", err)
33-
}
34+
utils.FailOnError(err, "Failed to initialize Firestore client")
3435
defer client.Close()
3536

3637
service := &handlers.Service{Client: client}
@@ -107,7 +108,5 @@ func initRestServer(r *chi.Mux) {
107108
// Start the server
108109
log.Printf("Starting REST server on http://localhost:%s", port)
109110
err := http.ListenAndServe(fmt.Sprintf(":%s", port), r)
110-
if err != nil {
111-
log.Fatalf("Failed to start server: %v", err)
112-
}
111+
utils.FailOnError(err, "Failed to start REST server")
113112
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package messagequeue
2+
3+
import (
4+
"execution-service/utils"
5+
"fmt"
6+
"log"
7+
"os"
8+
9+
amqp "github.com/rabbitmq/amqp091-go"
10+
)
11+
12+
const CODE_SUBMISSION_QUEUE_KEY = "code-submission"
13+
14+
var (
15+
codeSubmissionQueue amqp.Queue
16+
rabbitMQChannel *amqp.Channel
17+
)
18+
19+
func InitRabbitMQServer() *amqp.Channel {
20+
// Connect to RabbitMQ server
21+
rabbitMQURL := os.Getenv("RABBITMQ_URL")
22+
conn, err := amqp.Dial(rabbitMQURL)
23+
utils.FailOnError(err, "Failed to connect to RabbitMQ")
24+
defer conn.Close()
25+
26+
// Create a channel
27+
ch, err := conn.Channel()
28+
utils.FailOnError(err, "Failed to open a channel")
29+
rabbitMQChannel = ch
30+
31+
// Declare a queue
32+
q, err := ch.QueueDeclare(
33+
CODE_SUBMISSION_QUEUE_KEY, // name
34+
false, // durable
35+
false, // delete when unused
36+
false, // exclusive
37+
false, // no-wait
38+
nil, // arguments
39+
)
40+
utils.FailOnError(err, "Failed to declare a queue")
41+
codeSubmissionQueue = q
42+
43+
return ch
44+
}
45+
46+
func PublishSubmissionMessage(submission []byte) error {
47+
err := rabbitMQChannel.Publish(
48+
"", // exchange
49+
codeSubmissionQueue.Name, // routing key
50+
false, // mandatory
51+
false, // immediate
52+
amqp.Publishing{
53+
ContentType: "application/json",
54+
Body: submission,
55+
})
56+
if err != nil {
57+
return fmt.Errorf("Failed to publish a message: %v", err)
58+
}
59+
log.Printf("RabbitMQ: [x] Sent %s", submission)
60+
return nil
61+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package utils
2+
3+
import "log"
4+
5+
func FailOnError(err error, msg string) {
6+
if err != nil {
7+
log.Fatalf("%s: %s", msg, err)
8+
}
9+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package databases
2+
3+
import (
4+
"context"
5+
"history-service/models"
6+
7+
"cloud.google.com/go/firestore"
8+
)
9+
10+
func CreateHistory(client *firestore.Client, ctx context.Context, submissionHistory models.SubmissionHistory) (*firestore.DocumentRef, error) {
11+
// Document reference ID in firestore mapped to the match ID in model
12+
collection := client.Collection("collaboration-history")
13+
14+
docRef, _, err := collection.Add(ctx, map[string]interface{}{
15+
"title": submissionHistory.Title,
16+
"code": submissionHistory.Code,
17+
"language": submissionHistory.Language,
18+
"user": submissionHistory.User,
19+
"matchedUser": submissionHistory.MatchedUser,
20+
"matchedTopics": submissionHistory.MatchedTopics,
21+
"questionDocRefId": submissionHistory.QuestionDocRefID,
22+
"questionDifficulty": submissionHistory.QuestionDifficulty,
23+
"questionTopics": submissionHistory.QuestionTopics,
24+
"status": submissionHistory.Status,
25+
"createdAt": firestore.ServerTimestamp,
26+
"updatedAt": firestore.ServerTimestamp,
27+
})
28+
if err != nil {
29+
return nil, err
30+
}
31+
return docRef, nil
32+
}

apps/history-service/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
github.com/google/uuid v1.6.0 // indirect
3232
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
3333
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
34+
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
3435
go.opencensus.io v0.24.0 // indirect
3536
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
3637
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect

apps/history-service/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwA
8585
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
8686
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
8787
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
88+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
89+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
8890
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
8991
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
9092
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=

apps/history-service/handlers/create.go

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package handlers
22

33
import (
44
"encoding/json"
5+
"history-service/databases"
56
"history-service/models"
67
"history-service/utils"
78
"net/http"
89

9-
"cloud.google.com/go/firestore"
1010
"google.golang.org/api/iterator"
1111
)
1212

@@ -22,22 +22,7 @@ func (s *Service) CreateHistory(w http.ResponseWriter, r *http.Request) {
2222
}
2323

2424
// Document reference ID in firestore mapped to the match ID in model
25-
collection := s.Client.Collection("collaboration-history")
26-
27-
docRef, _, err := collection.Add(ctx, map[string]interface{}{
28-
"title": submissionHistory.Title,
29-
"code": submissionHistory.Code,
30-
"language": submissionHistory.Language,
31-
"user": submissionHistory.User,
32-
"matchedUser": submissionHistory.MatchedUser,
33-
"matchedTopics": submissionHistory.MatchedTopics,
34-
"questionDocRefId": submissionHistory.QuestionDocRefID,
35-
"questionDifficulty": submissionHistory.QuestionDifficulty,
36-
"questionTopics": submissionHistory.QuestionTopics,
37-
"status": submissionHistory.Status,
38-
"createdAt": firestore.ServerTimestamp,
39-
"updatedAt": firestore.ServerTimestamp,
40-
})
25+
docRef, err := databases.CreateHistory(s.Client, ctx, submissionHistory)
4126
if err != nil {
4227
http.Error(w, err.Error(), http.StatusInternalServerError)
4328
return

0 commit comments

Comments
 (0)