Skip to content

Commit 67bee1e

Browse files
committed
Add runRequest to RabbitMQ.
1 parent 3a43c98 commit 67bee1e

File tree

8 files changed

+114
-1
lines changed

8 files changed

+114
-1
lines changed

controller/ea.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,5 +123,11 @@ func CreateEA(res http.ResponseWriter, req *http.Request) {
123123

124124
// TODO: Schedule the run.
125125

126+
if err := util.EnqueueRunRequest(req.Context(), runID, "code", "py"); err != nil {
127+
util.JSONResponse(res, http.StatusInternalServerError, err.Error(), nil)
128+
return
129+
}
130+
126131
util.JSONResponse(res, http.StatusOK, "It works! 👍🏻", data)
132+
127133
}

controller/gp.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ func CreateGP(res http.ResponseWriter, req *http.Request) {
122122
}
123123

124124
// TODO: Schedule the run.
125+
if err := util.EnqueueRunRequest(req.Context(), runID, "code", "py"); err != nil {
126+
util.JSONResponse(res, http.StatusInternalServerError, err.Error(), nil)
127+
return
128+
}
125129

126130
util.JSONResponse(res, http.StatusOK, "It works! 👍🏻", data)
127131
}

controller/ml.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ func CreateML(res http.ResponseWriter, req *http.Request) {
122122
}
123123

124124
// TODO: Schedule the run.
125+
if err := util.EnqueueRunRequest(req.Context(), runID, "code", "py"); err != nil {
126+
util.JSONResponse(res, http.StatusInternalServerError, err.Error(), nil)
127+
return
128+
}
125129

126130
util.JSONResponse(res, http.StatusOK, "It works! 👍🏻", data)
127131
}

controller/pso.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) {
122122
}
123123

124124
// TODO: Schedule the run.
125+
if err := util.EnqueueRunRequest(req.Context(), runID, "code", "py"); err != nil {
126+
util.JSONResponse(res, http.StatusInternalServerError, err.Error(), nil)
127+
return
128+
}
125129

126130
util.JSONResponse(res, http.StatusOK, "It works! 👍🏻", data)
127131
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
2323
github.com/minio/crc64nvme v1.0.1 // indirect
2424
github.com/minio/md5-simd v1.1.2 // indirect
25+
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
2526
github.com/rs/xid v1.6.0 // indirect
2627
golang.org/x/crypto v0.33.0 // indirect
2728
golang.org/x/net v0.35.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ github.com/minio/minio-go/v7 v7.0.87 h1:nkr9x0u53PespfxfUqxP3UYWiE2a41gaofgNnC4Y
3838
github.com/minio/minio-go/v7 v7.0.87/go.mod h1:33+O8h0tO7pCeCWwBVa07RhVVfB/3vS4kEX7rwYKmIg=
3939
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4040
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
41+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
42+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
4143
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
4244
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
4345
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=

util/enqueue.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package util
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"os"
8+
"time"
9+
amqp "github.com/rabbitmq/amqp091-go"
10+
)
11+
12+
func EnqueueRunRequest(ctx context.Context, runID string, fileName string, extension string) error {
13+
14+
// Message represents the structure of our message
15+
type Message struct {
16+
RunId string `json:"runId"`
17+
FileName string `json:"fileName"`
18+
Extension string `json:"extension"`
19+
Timestamp time.Time `json:"timestamp"`
20+
}
21+
22+
// Get RabbitMQ connection string from environment variable or use default
23+
rabbitMQURL := os.Getenv("RABBITMQ_URL")
24+
if rabbitMQURL == "" {
25+
rabbitMQURL = "amqp://guest:guest@localhost:5672/"
26+
}
27+
28+
// Connect to RabbitMQ server
29+
conn, err := amqp.Dial(rabbitMQURL)
30+
if err != nil {
31+
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
32+
}
33+
defer conn.Close()
34+
35+
// Create a channel
36+
ch, err := conn.Channel()
37+
if err != nil {
38+
log.Fatalf("Failed to open a channel: %v", err)
39+
}
40+
defer ch.Close()
41+
42+
// Declare a queue
43+
queueName := "task_queue"
44+
q, err := ch.QueueDeclare(
45+
queueName, // name
46+
true, // durable
47+
false, // delete when unused
48+
false, // exclusive
49+
false, // no-wait
50+
nil, // arguments
51+
)
52+
53+
if err != nil {
54+
log.Fatalf("Failed to declare a queue: %v", err)
55+
}
56+
57+
58+
// Create a new message
59+
msg := Message{
60+
RunId: runID,
61+
FileName: fileName,
62+
Extension: extension,
63+
Timestamp: time.Now(),
64+
}
65+
66+
// Convert message to JSON
67+
body, err := json.Marshal(msg)
68+
if err != nil {
69+
log.Printf("Error marshaling message: %v", err)
70+
}
71+
72+
// Publish message
73+
err = ch.PublishWithContext(
74+
ctx,
75+
"", // exchange
76+
q.Name, // routing key
77+
false, // mandatory
78+
false, // immediate
79+
amqp.Publishing{
80+
DeliveryMode: amqp.Persistent,
81+
Body: body,
82+
},
83+
)
84+
85+
if err != nil {
86+
log.Printf("Failed to publish a message: %v", err)
87+
}
88+
89+
log.Printf("Published message: %s", msg.RunId)
90+
91+
92+
return nil
93+
}

util/minio.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"os"
7-
87
"github.com/minio/minio-go/v7"
98
"github.com/minio/minio-go/v7/pkg/credentials"
109
)

0 commit comments

Comments
 (0)