✨ feat(config): add RabbitMQ configuration support#10
✨ feat(config): add RabbitMQ configuration support#10
Conversation
- introduce RabbitMQ configuration in config struct - add default URL for RabbitMQ connection ✨ feat(service): implement RabbitMQ message consumption - create a method to start consuming messages from RabbitMQ - handle different request types for train routes, carriages, and station searches 📝 docs(README): update gRPC run command instruction - modify command to include configuration file path for server startup 💄 style(.gitignore): clean up ignored files - add .DS_Store to ignored files - ensure vendor directory is ignored ✅ test(queue): create test queue for RabbitMQ messages - implement a test program to send messages to RabbitMQ queue - include test cases for different request types and payloads
WalkthroughThis pull request updates configuration and service initialization while adding asynchronous RabbitMQ message handling. The changes modify the Changes
Sequence Diagram(s)sequenceDiagram
participant T as TestQueue
participant R as RabbitMQ
participant S as Service
T->>R: Publish message (JSON with requestType)
R->>S: Deliver message from "rzd_queue"
S->>S: Parse JSON and determine request type
alt requestType = getTrainRoutes
S->>S: Process train routes request
else requestType = getTrainCarriages
S->>S: Process train carriages request
else requestType = searchStation
S->>S: Process station search request
end
S->>S: Log/send constructed response
Tip ⚡💬 Agentic Chat (Pro Plan, General Availability)
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (7)
config.yml (2)
14-15: RabbitMQ configuration looks appropriate.
Explicitly specifying the RabbitMQ URL here is clear, but confirm that sensitive credentials are not exposed in production. Using environment variables or a secret management mechanism is often safer for production deployments.🧰 Tools
🪛 YAMLlint (1.35.1)
[error] 15-15: no new line character at the end of file
(new-line-at-end-of-file)
15-15: Missing newline at the end of the file.
Some tools and editors may flag the lack of a trailing newline. Consider adding a newline to align with YAML best practices.Apply this diff to add a newline at EOF:
RabbitMQ: URL: "amqp://guest:guest@localhost:5672/" +🧰 Tools
🪛 YAMLlint (1.35.1)
[error] 15-15: no new line character at the end of file
(new-line-at-end-of-file)
pkg/config/config.go (1)
34-37: New RabbitMQ struct introduced.
The default URL and environment variable mapping look correct. Consider externalizing credentials if this service is deployed in production or if multiple environments need different credentials.test_queue.go (1)
1-95: Consider graceful exits and durability.
This file effectively demonstrates sending test messages to RabbitMQ with a durable queue and priority. However, note the following potential improvements:
- Replace
log.Fatalfwith gentler error handling if you anticipate partial failures and want the program to proceed in certain situations.- Use a context or signal handling to gracefully shut down the connection instead of abruptly terminating after a
log.Fatalf.- Evaluate whether you need additional acknowledgment flags or publisher confirm modes to ensure message delivery.
internal/service/service.go (3)
43-113: Add reconnection and shutdown logic instartConsuming.
Currently,select {}keeps the consumer running indefinitely. If RabbitMQ goes down or a network fault occurs, the channel won’t reconnect. A robust production approach often includes:
- Reconnection attempts after channel/connection failures.
- Cancelation or shutdown signals that can close channels and connections gracefully.
163-181: Optional validation forhandleSearchStationfields.
Likewise, ensuring the presence and correct type ofqueryandcompactModecan safeguard against malformed messages.
183-186: Design a real return path for responses.
sendResponsecurrently logs the response. If you need further downstream processing (e.g., replying to another queue), now is the time to outline that approach.Would you like help implementing a full reply queue mechanism or another delivery option?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
go.sumis excluded by!**/*.sum
📒 Files selected for processing (8)
.gitignore(1 hunks)README.md(1 hunks)cmd/rzd-scraper/main.go(1 hunks)config.yml(2 hunks)go.mod(1 hunks)internal/service/service.go(3 hunks)pkg/config/config.go(2 hunks)test_queue.go(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
internal/service/service.go (4)
pkg/config/config.go (2)
Config(11-15)RabbitMQ(35-37)internal/service/interface.go (1)
Service(11-18)internal/domain/models.go (4)
Service(68-72)GetTrainRoutesParams(80-88)GetTrainCarriagesParams(91-97)SearchStationParams(100-103)internal/domain/enums.go (3)
Direction(5-5)TrainType(34-34)TrainSearchType(13-13)
cmd/rzd-scraper/main.go (3)
pkg/config/config.go (2)
RabbitMQ(35-37)RZD(18-27)internal/infrastructure/rzd/rzd_client.go (1)
NewRzdClient(38-77)internal/service/service.go (1)
New(23-27)
🪛 YAMLlint (1.35.1)
config.yml
[error] 15-15: no new line character at the end of file
(new-line-at-end-of-file)
🔇 Additional comments (8)
.gitignore (2)
111-111: Good practice for dependency managementUncommenting the
vendordirectory in.gitignoreis a good practice for Go projects using modules, as it prevents the vendor directory from being committed to the repository.
113-113: Proper exclusion of system filesAdding
.DS_Storeto the gitignore is a good practice to prevent macOS system files from being committed to the repository.README.md (1)
53-53: Documentation updated correctlyThe README has been properly updated to include the
-config=config.ymlflag, which matches the command-line flag support added in the application.cmd/rzd-scraper/main.go (1)
50-51: Correctly updated service initializationThe service initialization has been properly updated to pass the configuration object, which is required for the new RabbitMQ functionality. This change correctly enables the service to access RabbitMQ configuration parameters.
config.yml (2)
2-3: Validate the updated timeout value carefully.
Changing theTIMEOUTfrom a small integer to2000(presumably milliseconds) is a substantial increase. Verify that this value reflects the desired behavior for your requests and does not unintentionally cause extremely delayed timeouts.
12-12: Port string usage is acceptable.
Converting the port into a quoted string is fine since YAML can interpret numeric or string values for ports. This change does not introduce any functional issues.pkg/config/config.go (1)
12-14: Configuration structure expanded with RabbitMQ field.
Adding aRabbitMQfield is consistent with your YAML configuration. Ensure all references tocfg.RabbitMQare handled gracefully throughout the codebase.internal/service/service.go (1)
6-14: New imports for RabbitMQ and JSON handling.
These additions are appropriate for message consuming and payload unmarshalling. No issues here.
| github.com/spf13/viper v1.12.0 // indirect | ||
| github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect | ||
| github.com/stbenjam/no-sprintf-host-port v0.2.0 // indirect | ||
| github.com/streadway/amqp v1.1.0 // indirect |
There was a problem hiding this comment.
💡 Verification agent
🧩 Analysis chain
Verify the RabbitMQ client library choice
The addition of the github.com/streadway/amqp package is appropriate for implementing RabbitMQ functionality. However, it's worth noting that this library is no longer actively maintained.
🌐 Web query:
Is github.com/streadway/amqp still actively maintained? What are the current recommended Go libraries for RabbitMQ?
💡 Result:
The github.com/streadway/amqp library is no longer actively maintained[1][4]. The repository explicitly warns users to consider alternatives like rabbitmq/amqp091-go, which is now the official successor maintained by the RabbitMQ core team[1][3][7].
Recommended Go Libraries for RabbitMQ
-
rabbitmq/amqp091-go- Official fork of
streadway/amqp, maintained by the RabbitMQ team[3][7]. - Retains most API elements from the original but open to reasonable breaking changes[3].
- Supports modern Go versions and RabbitMQ server releases[7][10].
- Example usage:
import amqp "github.com/rabbitmq/amqp091-go"
- Official fork of
-
wagslane/go-rabbitmq- A higher-level wrapper around
amqp091-gowith built-in reconnection logic and easier abstractions[2]. - Ideal for applications requiring automatic recovery from network failures[2].
- Example consumer setup:
consumer, err := rabbitmq.NewConsumer( conn, "my_queue", rabbitmq.WithConsumerOptionsExchangeDeclare, )
- A higher-level wrapper around
Key Differences Between Libraries
| Feature | streadway/amqp |
amqp091-go |
go-rabbitmq |
|---|---|---|---|
| Maintenance Status | Unmaintained[1][4] | Actively maintained[3] | Actively maintained[2] |
| Reconnection Logic | Manual implementation | Manual implementation | Built-in[2] |
| API Stability | Frozen | Evolving[3][7] | Wrapper with defaults |
| Recommended Use Case | Legacy systems | New projects | Simplified workflows |
For most new projects, rabbitmq/amqp091-go is the baseline recommendation[3][5], while go-rabbitmq offers convenience for common patterns like pub/sub with resilience[2].
Citations:
- 1: https://pkg.go.dev/github.com/streadway/amqp
- 2: https://github.com/wagslane/go-rabbitmq
- 3: https://github.com/rabbitmq/amqp091-go
- 4: https://github.com/streadway/amqp
- 5: https://www.rabbitmq.com/client-libraries/devtools
- 6: go modules support streadway/amqp#388
- 7: https://pkg.go.dev/github.com/rabbitmq/amqp091-go
- 8: https://www.rabbitmq.com/client-libraries/amqp-client-libraries
- 9: https://github.com/rabbitmq/amqp091-go/blob/main/integration_test.go
- 10: https://lavinmq.com/documentation/go-sample-code
Action Required: Update RabbitMQ Client Library
- Location:
go.mod(Line 191) - Current Dependency:
github.com/streadway/amqp v1.1.0(indirect) - Issue: This library is no longer actively maintained.
- Recommendation: Consider migrating to the officially supported
rabbitmq/amqp091-go, which is maintained by the RabbitMQ team. Alternatively, if higher-level abstractions or built-in reconnection support are desired, explore options likewagslane/go-rabbitmq.
Please update the dependency accordingly to ensure long-term maintenance and compatibility.
There was a problem hiding this comment.
🧩 Analysis chain
I'll search for where the AMQP library is actually used in the code:
Let me also check the test_queue.go file which seems to be using RabbitMQ:
flowchart TB
A(["Migrate to rabbitmq/amqp091-go"])
A --o B["Update go.mod dependency"]
A --o C["Update import in service.go"]
A --o D["Update import in test_queue.go (if exists)"]
B --o E["Remove streadway/amqp"]
B --o F["Add rabbitmq/amqp091-go"]
C --o G["Change import path"]
C --o H["Verify API compatibility"]
D --o I["Locate test_queue.go"]
D --o J["Change import path"]
@Fascinat0r Я подготовил план для перехода на библиотеку rabbitmq/amqp091-go. Миграция должна быть довольно простой, поскольку API обеих библиотек почти идентичны. Основные изменения:
-
Обновите зависимость в
go.mod:- github.com/streadway/amqp v1.1.0 // indirect + github.com/rabbitmq/amqp091-go v1.8.1 // indirect
-
Измените импорт в
internal/service/service.go:- import "github.com/streadway/amqp" + import "github.com/rabbitmq/amqp091-go"
Или с псевдонимом для сохранения совместимости с существующим кодом:
- import "github.com/streadway/amqp" + import amqp "github.com/rabbitmq/amqp091-go"
-
Если файл
test_queue.goсуществует, обновите импорт аналогичным образом.
После этих изменений ваш код должен работать без дополнительных модификаций, так как rabbitmq/amqp091-go — это официальный форк streadway/amqp с сохранением API.
🏁 Script executed:
#!/bin/bash
# Search for imports of streadway/amqp in the codebase
rg "import.*streadway/amqp" -A 3
Length of output: 34
🏁 Script executed:
#!/bin/bash
# Search for "amqp" usage in the codebase
rg "amqp\." -A 3
Length of output: 1682
🏁 Script executed:
#!/bin/bash
# Check the content of test_queue.go
cat test_queue.go
Length of output: 2725
🏁 Script executed:
#!/bin/bash
# Search for RabbitMQ related code
rg -i "rabbit|amqp" --no-filename
Length of output: 1796
🏁 Script executed:
#!/bin/bash
# Check the internal/service/service.go file mentioned in the AI summary
cat internal/service/service.go
Length of output: 1263
🏁 Script executed:
#!/bin/bash
# Check if test_queue.go exists in the root directory
find . -name "test_queue.go"
Length of output: 44
🏁 Script executed:
#!/bin/bash
# Let's look at the AI summary for test_queue.go again
cat test_queue.go 2>/dev/null || echo "File not found, checking in other directories"
find . -type f -exec grep -l "streadway/amqp" {} \;
Length of output: 2865
🏁 Script executed:
# Search for test_queue.go file in the repository
find . -type f -name "test_queue.go" -o -name "*test*queue*.go"
Length of output: 79
🌐 Web query:
Are there API differences between streadway/amqp and rabbitmq/amqp091-go?
💡 Result:
The streadway/amqp and rabbitmq/amqp091-go libraries are both Go clients for AMQP 0.9.1, but they differ in maintenance, API evolution philosophy, and minor implementation details. Here are the key differences:
Summary of Differences
- Maintenance Status:
streadway/amqpis unmaintained, whilerabbitmq/amqp091-gois actively developed by the RabbitMQ team. - API Stability:
rabbitmq/amqp091-goallows breaking changes for improvements, unlikestreadway/amqp, which prioritizes strict API stability. - Package Name: The import path differs, requiring code adjustments during migration.
Key API and Usage Differences
1. Package Name and Import
streadway/amqp:import "github.com/streadway/amqp"
rabbitmq/amqp091-go:amqp "github.com/rabbitmq/amqp091-go" // Aliasing for easier migration
2. Context-Aware Methods
rabbitmq/amqp091-go introduces context support in methods like PublishWithContext[3][6], while streadway/amqp uses older conventions:
// rabbitmq/amqp091-go
err = ch.PublishWithContext(ctx, exchange, routingKey, mandatory, immediate, amqp.Publishing{...})
// streadway/amqp (no context)
err = ch.Publish(exchange, routingKey, mandatory, immediate, amqp.Publishing{...})3. Public API Evolution
rabbitmq/amqp091-gois open to breaking changes for long-term improvements, such as optimizations or ergonomic adjustments[1][6].streadway/amqpstrictly avoids breaking changes, even at the cost of stagnation[4][8].
4. Error Handling and Features
Both libraries share core functionality (e.g., queue declaration, message publishing), but differences include:
- Authentication Mechanisms:
rabbitmq/amqp091-goexplicitly restricts support toPLAINandEXTERNALauth[1][6], whilestreadway/amqpleaves room for community extensions[8]. - Protocol Events:
rabbitmq/amqp091-goacknowledges limitations in ordering guarantees for events likebasic.returnandbasic.ackdue to Go channel semantics[1][6].
Migration Considerations
- Use an import alias to minimize code changes[1][6].
- Update method calls to use context-aware variants where applicable.
- Review error-handling logic, as reconnection strategies remain the caller’s responsibility in both libraries[1][4].
Recommendation
For new projects, rabbitmq/amqp091-go is the preferred choice due to active maintenance and modern API practices. Existing projects using streadway/amqp should migrate for long-term support[4][8].
Citations:
- 1: https://github.com/rabbitmq/amqp091-go
- 2: https://go.libhunt.com/amqp091-go-alternatives
- 3: https://www.rabbitmq.com/tutorials/tutorial-one-go
- 4: https://github.com/streadway/amqp
- 5: https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
- 6: https://pkg.go.dev/github.com/rabbitmq/amqp091-go
- 7: https://mehdihadeli.github.io/awesome-go-education/messaging/rabbitmq/
- 8: https://pkg.go.dev/github.com/streadway/amqp
@Fascinat0r По результатам анализа кода, я обнаружил, что библиотека streadway/amqp используется в следующих файлах:
test_queue.go(в корневой директории)internal/infrastructure/rzd/rabbitmq/consumer.go
Вот подробный план миграции на rabbitmq/amqp091-go:
1. Обновите зависимость в go.mod
- github.com/streadway/amqp v1.1.0 // indirect
+ github.com/rabbitmq/amqp091-go v1.8.1 // indirect2. Обновите импорт в test_queue.go
import (
"log"
"time"
- "github.com/streadway/amqp"
+ amqp "github.com/rabbitmq/amqp091-go"
"encoding/json"
)3. Обновите импорт в internal/infrastructure/rzd/rabbitmq/consumer.go
import (
"log"
- "github.com/streadway/amqp"
+ amqp "github.com/rabbitmq/amqp091-go"
)Код должен продолжать работать без дополнительных изменений, так как API практически идентичен. В будущем вы можете рассмотреть возможность использования новых возможностей rabbitmq/amqp091-go, таких как методы с поддержкой контекста (например, PublishWithContext вместо Publish).
После выполнения этих изменений выполните go mod tidy для обновления go.sum и загрузки новой зависимости.
✏️ Learnings added
Learnt from: Fascinat0r
PR: Chaika-Team/ChaikaRzdScraper#10
File: go.mod:191-191
Timestamp: 2025-04-13T16:07:28.193Z
Learning: Следует использовать библиотеку `github.com/rabbitmq/amqp091-go` вместо `github.com/streadway/amqp` для работы с RabbitMQ, так как последняя больше не поддерживается.
Learnt from: Fascinat0r
PR: Chaika-Team/ChaikaRzdScraper#10
File: go.mod:191-191
Timestamp: 2025-04-13T16:07:28.193Z
Learning: При работе с RabbitMQ в Go следует использовать библиотеку `github.com/rabbitmq/amqp091-go` вместо устаревшей `github.com/streadway/amqp`, так как первая активно поддерживается командой RabbitMQ и является официальным преемником.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
⏳ Preparing changes...
There was a problem hiding this comment.
A summary of the edits CodeRabbit can apply:
We will update the go.mod file to replace the dependency on github.com/streadway/amqp v1.1.0 with github.com/rabbitmq/amqp091-go v1.8.1.
We will update the import statement in internal/service/service.go to use the officially supported RabbitMQ library.
We will update the import in test_queue.go to replace the deprecated RabbitMQ library with the new one.
The dependency in go.mod will be updated to replace github.com/streadway/amqp with github.com/rabbitmq/amqp091-go.
The import in internal/service/service.go will be updated to use github.com/rabbitmq/amqp091-go instead of github.com/streadway/amqp.
If the file test_queue.go exists, its import will be updated to use github.com/rabbitmq/amqp091-go.
The import in internal/infrastructure/rzd/rabbitmq/consumer.go will be updated to use github.com/rabbitmq/amqp091-go.
This change will update the RabbitMQ client dependency in go.mod to use rabbitmq/amqp091-go for long-term maintenance.
This update will modify the RabbitMQ client import in internal/service/service.go to use rabbitmq/amqp091-go instead of streadway/amqp.
This update will modify test_queue.go to change the RabbitMQ client import to use rabbitmq/amqp091-go, ensuring consistent dependency usage across the codebase.
We will update the go.mod file to replace the unmaintained dependency
github.com/streadway/amqp v1.1.0 // indirectwith the maintained
github.com/rabbitmq/amqp091-go v1.8.1 // indirect.
We will update the import statements in
internal/service/service.goto use
github.com/rabbitmq/amqp091-goinstead of the deprecated
github.com/streadway/amqp.
We will update the import in
test_queue.goto use
github.com/rabbitmq/amqp091-goinstead of
github.com/streadway/amqp.
A plan of each step that CodeRabbit will take in order to execute the edits for each file (with inline rough draft snippets):
✏️ go.mod
Locate the dependency declaration:
github.com/streadway/amqp v1.1.0 // indirect
Replace it with:
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
Run
go mod tidy
to update and clean the module dependencies.
✏️ internal/service/service.go
Locate the current import:
"github.com/streadway/amqp"
Replace it with the new import:
"github.com/rabbitmq/amqp091-go"
(Optionally, you can alias it as
amqp
by writing
amqp "github.com/rabbitmq/amqp091-go"
).
Verify that all usages of the AMQP library are compatible with the new package.
✏️ test_queue.go
Locate the existing import:
import "github.com/streadway/amqp"
Replace it with:
import amqp "github.com/rabbitmq/amqp091-go"
Confirm that the rest of the file uses the RabbitMQ API correctly and that the tests pass.
✏️ go.mod
Replace the line:
- github.com/streadway/amqp v1.1.0 // indirect
with:
+ github.com/rabbitmq/amqp091-go v1.8.1 // indirect
Run
go mod tidy
to update go.sum and ensure all dependencies are correctly resolved.
✏️ internal/service/service.go
Locate the import line:
- "github.com/streadway/amqp"
and replace it with:
+ "github.com/rabbitmq/amqp091-go"
Verify that the alias "amqp" remains used consistently in the file.
✏️ test_queue.go
Find the import statement:
- import "github.com/streadway/amqp"
or
- import amqp "github.com/streadway/amqp"
and replace it with:
+ import "github.com/rabbitmq/amqp091-go"
or with an alias:
+ import amqp "github.com/rabbitmq/amqp091-go"
✏️ internal/infrastructure/rzd/rabbitmq/consumer.go
Replace the import:
"github.com/streadway/amqp"
with:
"github.com/rabbitmq/amqp091-go"
Confirm that the types (e.g., amqp.Connection, amqp.Channel) are still used correctly and require no additional changes.
✏️ go.mod
Locate the dependency line for
github.com/streadway/amqp
and replace it with
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
.
After modifying go.mod, run
go mod tidy
to update the dependency graph.
✏️ internal/service/service.go
Replace the line
"github.com/streadway/amqp"
with
"github.com/rabbitmq/amqp091-go"
. Optionally, use an alias for compatibility, for example:
import amqp "github.com/rabbitmq/amqp091-go"
.
Verify that all subsequent usages of the
amqp
package remain valid.
✏️ test_queue.go
Change the import line from
"github.com/streadway/amqp"
to
"github.com/rabbitmq/amqp091-go"
. Optionally, use an alias if required, for example:
import amqp "github.com/rabbitmq/amqp091-go"
.
✏️ go.mod
Replace
github.com/streadway/amqp v1.1.0 // indirect
with
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
.
Run
go mod tidy
to update the dependency graph.
✏️ internal/service/service.go
Change the import line:
- "github.com/streadway/amqp"
+ import amqp "github.com/rabbitmq/amqp091-go"
Verify that all AMQP API usages in the file remain compatible with the new package.
✏️ test_queue.go
Replace the import line:
- import "github.com/streadway/amqp"
+ import amqp "github.com/rabbitmq/amqp091-go"
Confirm that any usage of the AMQP functions in this file is valid with the new dependency.
A summary of the context that CodeRabbit is considering across the codebase:
The RabbitMQ client library "github.com/streadway/amqp" is deprecated and unmaintained.
We need to update our dependency in go.mod and in all code files that import it so that we use the maintained
"github.com/rabbitmq/amqp091-go" library instead. This change ensures long-term support and compatibility.
This update is required because the streadway/amqp package is no longer maintained. Migrating to rabbitmq/amqp091-go ensures long-term support and compatibility.
This change ensures that the code in internal/service/service.go uses the actively maintained RabbitMQ client library, thereby reducing future technical debt.
Updating test_queue.go ensures that tests and other RabbitMQ interactions in the codebase utilize the new, supported library, guaranteeing consistency.
The shell search results confirmed that the deprecated dependency
github.com/streadway/amqp v1.1.0 // indirectis referenced in the repository
(in both go.mod, internal/infrastructure/rzd/rabbitmq/consumer.go, and test_queue.go).
We will update all these references to the maintained version
github.com/rabbitmq/amqp091-go v1.8.1 // indirect.
-
✅ Create PR with these edits
-
📋 Get copyable edits
-
📌 Create commit in current branch
-
🔄 Try a different approach
| log.Printf("RabbitMQ URL: %s", cfg.RabbitMQ.URL) | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion
Consider sensitive information exposure in logs
While logging the RabbitMQ URL is helpful for debugging, it may expose sensitive credentials if they're embedded in the URL. Consider masking passwords in logs.
-log.Printf("RabbitMQ URL: %s", cfg.RabbitMQ.URL)
+// Create a sanitized URL for logging (hide credentials)
+parsedURL, err := url.Parse(cfg.RabbitMQ.URL)
+if err == nil && parsedURL.User != nil {
+ parsedURL.User = url.UserPassword(parsedURL.User.Username(), "***")
+ log.Printf("RabbitMQ URL: %s", parsedURL.String())
+} else {
+ log.Printf("RabbitMQ enabled (URL parsing failed)")
+}Don't forget to import "net/url" if you implement this change.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| log.Printf("RabbitMQ URL: %s", cfg.RabbitMQ.URL) | |
| // Create a sanitized URL for logging (hide credentials) | |
| parsedURL, err := url.Parse(cfg.RabbitMQ.URL) | |
| if err == nil && parsedURL.User != nil { | |
| parsedURL.User = url.UserPassword(parsedURL.User.Username(), "***") | |
| log.Printf("RabbitMQ URL: %s", parsedURL.String()) | |
| } else { | |
| log.Printf("RabbitMQ enabled (URL parsing failed)") | |
| } |
- integrate RabbitMQ for message processing - implement consumer and message handling logic - support multiple concurrent workers for efficient processing ♻️ refactor(service): remove deprecated RabbitMQ logic - eliminate old RabbitMQ handling from service - prepare service for new RabbitMQ integration
| github.com/spf13/viper v1.12.0 // indirect | ||
| github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect | ||
| github.com/stbenjam/no-sprintf-host-port v0.2.0 // indirect | ||
| github.com/streadway/amqp v1.1.0 // indirect |
| @@ -0,0 +1,54 @@ | |||
| package rabbitmq | |||
There was a problem hiding this comment.
тут папка rzd нужна для тех кто приходит от ржд, лучше просто в инфре реббит держать
| func New(rzdClient *rzd.Client, cfg *config.Config) Service { | ||
| svc := &mainService{rzdClient: rzdClient} | ||
| // Передаем URL из конфигурации | ||
| return svc |
| log.Println("Waiting for messages...") | ||
|
|
||
| // Количество горутин в зависимости от числа ядер процессора | ||
| numWorkers := runtime.NumCPU() * 2 |
There was a problem hiding this comment.
Их лучше указывать в конфигурации. Я посмотрю на этот сервис когда он будет запущен на 0.25 ядра по конфигу, вот будет умора, ноль воркеров.
| } | ||
| defer consumer.Close() | ||
|
|
||
| msgs, err := consumer.Consume("rzd_queue") |
| "github.com/streadway/amqp" | ||
| ) | ||
|
|
||
| type Consumer struct { |
There was a problem hiding this comment.
Раз это чистый консьюмер, он является входной точной, а не выходной. Так что лучше его держать там-же где ты его и используешь - в презентационном слое. Это у меня в Chaika Route Adapter он лежит в инфе, потому что у меня там Client, а не Consumer.
| } | ||
|
|
||
| params := domain.GetTrainRoutesParams{ | ||
| FromCode: int(message["payload"].(map[string]interface{})["fromCode"].(float64)), |
There was a problem hiding this comment.
Можно выделить в мапперы, но это не критично пока
| } | ||
|
|
||
| func sendResponse(response map[string]interface{}) { | ||
| // Здесь вы можете реализовать отправку ответа обратно в очередь или другой механизм доставки |
There was a problem hiding this comment.
Можешь, конечно можешь. Я в тебя верю.
|
|
||
| // RabbitMQ содержит конфигурацию для подключения к RabbitMQ. | ||
| type RabbitMQ struct { | ||
| URL string `yaml:"URL" env:"RABBITMQ_URL,default=amqp://guest:guest@localhost:5672/"` |
There was a problem hiding this comment.
Да, отлично. Добавь сюда ещё всё то что было в ревью выше. Таймауты, название очереди, и т.д.
| @@ -0,0 +1,95 @@ | |||
| package main | |||
There was a problem hiding this comment.
Ну знаешь, есть папочка test....
There was a problem hiding this comment.
Или клади рядом с тестируемым модулем, но только не в корень пж
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
internal/transports/rabbitmq/handler.go (2)
22-25: Parameterize or configure queue name instead of hardcoding.
Hardcoding"rzd_queue"makes queue changes require code edits. Moving this to a config or environment variable is typically more flexible.
31-33: Move worker concurrency settings to configuration.
Specifying the concurrency based on runtime CPU can be surprising in certain environments. Allowing override in config or via environment variable provides more control.
🧹 Nitpick comments (7)
internal/infrastructure/rzd/rabbitmq/consumer.go (4)
1-2: Consider renaming or relocating the package to avoid nesting underrzd.
This file is currently placed ininternal/infrastructure/rzd/rabbitmq, but it doesn’t appear to reference RZD-specific logic. Placing it under something likeinternal/infrastructure/rabbitmqorinternal/transports/rabbitmqmight be more appropriate and reduce confusion.
14-32: Consider providing more contextual logging or standardized error handling.
While this code is functional, you could enhance its maintainability by:
- Standardizing your logs (e.g., using structured logging libraries) to include error metadata.
- Minimizing
log.Printfs in favor of returning well-defined error types up the chain, or standardizing them in your error handling approach.
34-44: Reconsider using auto-ack to prevent message loss.
WithautoAck = true, messages are acknowledged immediately, so any processing error will not re-trigger consumption. You might want to use manual acknowledgment and re-queue or handle failed processing attempts.-false, // auto-ack +false, // auto-ack replaced with manual ack usage
46-54: Ensure theClose()method integrates with ongoing message consumption.
If a worker is still processing a message, closing connections concurrently may lead to unexpected errors. Consider a shutdown workflow (e.g., wait groups, graceful shutdown signals) to ensure messages finish processing before teardown.internal/transports/rabbitmq/handler.go (3)
15-20: Avoid terminating the entire process withlog.Fatalf.
Usinglog.Fatalfwill kill the application immediately, denying a graceful exit strategy. Consider returning the error or using a more controlled shutdown.-if err != nil { - log.Fatalf("Failed to create RabbitMQ consumer: %v", err) -} +if err != nil { + log.Printf("Failed to create RabbitMQ consumer: %v", err) + // return or handle gracefully without abrupt exit +}
48-135: Extract request handlers into separate methods.
This switch block is readable but fairly long. Splitting each case into a dedicated function can simplify testing and improve maintainability.
144-147: Implement a real response-delivery mechanism.
The placeholder logic sends the response to logs. If you intend to respond to a RabbitMQ queue, consider implementing a publish or RPC-style correlation mechanism here.Would you like help creating a follow-up implementation to send responses back via RabbitMQ?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
cmd/rzd-scraper/main.go(3 hunks)internal/infrastructure/rzd/rabbitmq/consumer.go(1 hunks)internal/service/service.go(2 hunks)internal/transports/rabbitmq/handler.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cmd/rzd-scraper/main.go
- internal/service/service.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal/transports/rabbitmq/handler.go (3)
internal/infrastructure/rzd/rabbitmq/consumer.go (1)
NewConsumer(14-32)internal/domain/models.go (3)
GetTrainRoutesParams(80-88)GetTrainCarriagesParams(91-97)SearchStationParams(100-103)internal/domain/enums.go (3)
Direction(5-5)TrainType(34-34)TrainSearchType(13-13)
🔇 Additional comments (1)
internal/infrastructure/rzd/rabbitmq/consumer.go (1)
9-12: Struct fields look good.
TheConsumerstruct clearly holds the connection and channel. This is straightforward and maintainable.
| for i := 0; i < numWorkers; i++ { | ||
| go func(workerID int) { | ||
| for d := range msgs { | ||
| log.Printf("Worker %d: Received a message: %s", workerID, d.Body) | ||
| var message map[string]interface{} | ||
| if err := json.Unmarshal(d.Body, &message); err != nil { | ||
| log.Printf("Worker %d: Error decoding JSON: %v", workerID, err) | ||
| continue | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Consider a robust error-handling strategy for parsing failures.
Currently, a failed JSON parse simply logs and continues. Because auto-ack is on, the message is lost. If critical, consider manual acknowledgments and error-handling logic to re-queue or store the failed message for later reprocessing.
Docstrings generation was requested by @Fascinat0r. * #10 (comment) The following files were modified: * `cmd/rzd-scraper/main.go` * `internal/infrastructure/rzd/rabbitmq/consumer.go` * `internal/service/service.go` * `internal/transports/rabbitmq/handler.go` * `pkg/config/config.go`
|
Note Generated docstrings for this pull request at #11 |
✨ feat(service): implement RabbitMQ message consumption
📝 docs(README): update gRPC run command instruction
💄 style(.gitignore): clean up ignored files
✅ test(queue): create test queue for RabbitMQ messages
Summary by CodeRabbit
New Features
Chores