Skip to content

Commit b7ae05b

Browse files
committed
Make bot consume messages in a go routine
1 parent 2644d7b commit b7ae05b

File tree

13 files changed

+435
-62
lines changed

13 files changed

+435
-62
lines changed

Dockerfile

Lines changed: 0 additions & 18 deletions
This file was deleted.

Dockerfile.server

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
FROM golang:1.25 AS builder
2+
WORKDIR /app
3+
4+
COPY go.mod go.sum ./
5+
RUN go mod download
6+
7+
COPY . .
8+
RUN CGO_ENABLED=0 GOOS=linux go build -o gochatty ./cmd/server
9+
10+
FROM debian:bookworm-slim
11+
WORKDIR /app
12+
13+
COPY --from=builder /app/gochatty .
14+
15+
EXPOSE 8080
16+
17+
CMD ["./gochatty"]

api.http

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,13 @@ Content-Type: application/json
3131

3232
### Get Last 50 Messages
3333
GET {{baseUrl}}/messages
34-
Authorization: Bearer {{token}}
34+
Authorization: Bearer {{token}}
35+
36+
### Post Stock Quote Command Message
37+
POST {{baseUrl}}/messages
38+
Authorization: Bearer {{token}}
39+
Content-Type: application/json
40+
41+
{
42+
"content": "/stock=AAPL.US"
43+
}

cmd/client/main.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"log"
7+
"os"
8+
9+
"github.com/gorilla/websocket"
10+
)
11+
12+
func main() {
13+
url := "ws://localhost:8080/ws"
14+
15+
c, _, err := websocket.DefaultDialer.Dial(url, nil)
16+
if err != nil {
17+
log.Fatal("dial:", err)
18+
}
19+
defer c.Close()
20+
21+
fmt.Println("Connected to WebSocket server at", url)
22+
23+
done := make(chan struct{})
24+
25+
go func() {
26+
defer close(done)
27+
for {
28+
_, message, err := c.ReadMessage()
29+
if err != nil {
30+
log.Println("read:", err)
31+
return
32+
}
33+
fmt.Printf("Received: %s\n", message)
34+
}
35+
}()
36+
37+
scanner := bufio.NewScanner(os.Stdin)
38+
for {
39+
fmt.Print("Enter 'exit' to terminate: ")
40+
if !scanner.Scan() {
41+
break
42+
}
43+
text := scanner.Text()
44+
if text == "exit" {
45+
break
46+
}
47+
}
48+
49+
c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
50+
<-done
51+
}

cmd/server/main.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"encoding/csv"
6+
"encoding/json"
7+
"fmt"
8+
"gochatty/internal/auth"
9+
"gochatty/internal/chat"
10+
"gochatty/internal/db"
11+
"gochatty/internal/models"
12+
"gochatty/internal/mq"
13+
"gochatty/internal/websocket"
14+
"io"
15+
"log"
16+
"net/http"
17+
"os"
18+
"strings"
19+
"time"
20+
21+
"github.com/gin-gonic/gin"
22+
)
23+
24+
const BOT_ID = 1
25+
26+
type StockCommand struct {
27+
UserID uint `json:"user_id"`
28+
StockCode string `json:"stock_code"`
29+
}
30+
31+
func fetchStockQuote(stockCode string) (string, error) {
32+
url := fmt.Sprintf("https://stooq.com/q/l/?s=%s&f=sd2t2ohlcv&h&e=csv", stockCode)
33+
resp, err := http.Get(url)
34+
if err != nil {
35+
return "", err
36+
}
37+
defer resp.Body.Close()
38+
39+
reader := csv.NewReader(bufio.NewReader(resp.Body))
40+
_, err = reader.Read()
41+
if err != nil {
42+
return "", err
43+
}
44+
45+
record, err := reader.Read()
46+
if err != nil {
47+
if err == io.EOF {
48+
return "", fmt.Errorf("stock quote not found")
49+
}
50+
return "", err
51+
}
52+
53+
closePrice := record[6]
54+
if closePrice == "N/D" {
55+
return "", fmt.Errorf("no data for stock %s", stockCode)
56+
}
57+
58+
return fmt.Sprintf("%s quote is $%s per share", strings.ToUpper(stockCode), closePrice), nil
59+
}
60+
61+
func runBot(cmdClient *mq.RabbitMQ, msgClient *mq.RabbitMQ) {
62+
msgs, err := cmdClient.Consume(false) // manual ack
63+
if err != nil {
64+
log.Fatalf("Bot: Failed to consume command queue: %v", err)
65+
}
66+
log.Println("Bot: Started consuming stock_commands")
67+
68+
for d := range msgs {
69+
var cmd StockCommand
70+
err := json.Unmarshal(d.Body, &cmd)
71+
if err != nil {
72+
log.Printf("Bot: Invalid command message: %v", err)
73+
d.Ack(false)
74+
continue
75+
}
76+
77+
quote, err := fetchStockQuote(cmd.StockCode)
78+
if err != nil {
79+
quote = fmt.Sprintf("Error fetching quote for %s: %v", cmd.StockCode, err)
80+
}
81+
82+
botMessage := chat.PostMessageRequest{
83+
Content: quote,
84+
}
85+
86+
body, err := json.Marshal(botMessage)
87+
if err != nil {
88+
log.Printf("Bot: Failed to marshal botMessage: %v", err)
89+
d.Ack(false)
90+
continue
91+
}
92+
93+
err = msgClient.Publish(body)
94+
if err != nil {
95+
log.Printf("Bot: Failed to publish botMessage: %v", err)
96+
d.Ack(false)
97+
continue
98+
}
99+
100+
d.Ack(false)
101+
log.Println("Bot: Message acknowledged")
102+
}
103+
}
104+
105+
func main() {
106+
dsn := os.Getenv("DATABASE_URL")
107+
db.Init(dsn)
108+
109+
rabbitMQURL := os.Getenv("RABBITMQ_URL")
110+
if rabbitMQURL == "" {
111+
rabbitMQURL = "amqp://guest:guest@localhost:5672/"
112+
}
113+
114+
rabbitCmd, err := mq.NewRabbitMQ(rabbitMQURL, "stock_commands")
115+
if err != nil {
116+
log.Fatalf("Failed to connect to RabbitMQ for commands: %v", err)
117+
}
118+
defer rabbitCmd.Close()
119+
120+
rabbitMsgs, err := mq.NewRabbitMQ(rabbitMQURL, "chat_messages")
121+
if err != nil {
122+
log.Fatalf("Failed to connect to RabbitMQ for messages: %v", err)
123+
}
124+
defer rabbitMsgs.Close()
125+
126+
chat.InitRabbitMQClient(rabbitCmd, rabbitMsgs)
127+
128+
go runBot(rabbitCmd, rabbitMsgs)
129+
130+
r := gin.Default()
131+
132+
r.POST("/register", auth.Register)
133+
r.POST("/login", auth.Login)
134+
135+
authorized := r.Group("/")
136+
authorized.Use(auth.JWTAuthMiddleware())
137+
138+
authorized.POST("/messages", chat.PostMessage)
139+
authorized.GET("/messages", chat.GetMessages)
140+
141+
go websocket.HandleMessages()
142+
r.GET("/ws", websocket.HandleConnections)
143+
144+
go func() {
145+
msgs, err := rabbitMsgs.Consume(false)
146+
if err != nil {
147+
log.Fatalf("Failed to consume chat messages: %v", err)
148+
}
149+
150+
log.Println("Started consuming chat_messages")
151+
152+
for d := range msgs {
153+
var msg chat.PostMessageRequest
154+
err := json.Unmarshal(d.Body, &msg)
155+
if err != nil {
156+
log.Printf("Failed to unmarshal chat post message: %v", err)
157+
d.Ack(false)
158+
continue
159+
}
160+
161+
message := models.Message{
162+
UserID: BOT_ID,
163+
Content: msg.Content,
164+
CreatedAt: time.Now(),
165+
}
166+
167+
log.Println("Saving bot message")
168+
chat.SaveMessage(message)
169+
170+
chat.BroadcastMessageToClients(msg.Content, "bot", message.CreatedAt)
171+
d.Ack(false)
172+
}
173+
}()
174+
175+
log.Println("Server running on port 8080")
176+
log.Fatal(r.Run(":8080"))
177+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ require (
2727
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
2828
github.com/quic-go/qpack v0.5.1 // indirect
2929
github.com/quic-go/quic-go v0.54.0 // indirect
30+
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
3031
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
3132
github.com/ugorji/go/codec v1.3.0 // indirect
3233
go.uber.org/mock v0.5.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
5353
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
5454
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
5555
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
56+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
57+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
5658
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5759
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
5860
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=

0 commit comments

Comments
 (0)