Skip to content

Commit c3f6087

Browse files
authored
RabbitMQ documentation (#5)
* - Added more info regarding publisher and consumer - Provide information about the flow, and implementation of rabbitMQ * better picture
1 parent 6282929 commit c3f6087

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed
108 KB
Loading
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
---
2+
title: RabbitMQ
3+
description: Documentation of Rabbitmq
4+
---
5+
6+
## Definition
7+
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.
8+
For more information:
9+
- https://www.rabbitmq.com/tutorials
10+
11+
## Usage for GCS
12+
- RabbitMQ serves, as the central message broker in GCS(Ground Control Station) architectures, enabling efficient communication between multiple vehicles using different queues within one channel and one connection to the same server.
13+
### Architecture Components:
14+
- Publishers: Vehicle systems that published the data from the vehicle
15+
- Subscribers: Backend systems that receive and process messages
16+
- Queues: Separate message channels for different vehicle types or functions
17+
- Exchange: Routes messages to appropriate vehicle queues
18+
19+
![alt text](image2.png)
20+
21+
### Benefits:
22+
- Each vehicle has its own data queue,no mixing data
23+
- If backend is temporarily down, messages wait in the queue
24+
- Easy to add new vehicles without changing existing code
25+
- Data flows continuously from vehicles to display
26+
## Code example
27+
### Set up server
28+
> ```rust
29+
> use rabbitmq_client::RabbitMQPublisher;
30+
> let publisher = RabbitMQPublisher::new("amqp://admin:admin@localhost:5672/%2f").await?;
31+
>```
32+
33+
### Vehicle data publishing
34+
>```rust
35+
>impl RabbitMQPublisher {
36+
> pub async fn new(addr: &str) -> LapinResult<Self> {
37+
> let connection = Connection::connect(addr, ConnectionProperties::default()).await?;
38+
> let channel = connection.create_channel().await?;
39+
> Ok(Self { channel })
40+
> }>
41+
>
42+
> pub async fn publish_telemetry(
43+
> &self,
44+
> name_of_vehicle: &str,
45+
> telemetry: TelemetryData,
46+
> ) -> LapinResult<()> {
47+
> let queue_name = format!("telemetry_{}", name_of_vehicle);
48+
> self.channel
49+
> .queue_declare(
50+
> &queue_name,
51+
> QueueDeclareOptions {
52+
> durable: true,
53+
> auto_delete: false,
54+
> exclusive: false,
55+
> ..Default::default()
56+
> },
57+
> FieldTable::default(),
58+
> )
59+
> .await?;
60+
>
61+
> let payload = serde_json::to_vec(&telemetry)
62+
> .map_err(|e| lapin::Error::from(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
63+
>
64+
> self.channel
65+
> .basic_publish(
66+
> "",
67+
> &queue_name,
68+
> BasicPublishOptions::default(),
69+
> &payload,
70+
> BasicProperties::default()
71+
> .with_content_type("application/json".into())
72+
> .with_delivery_mode(2),
73+
> )
74+
> .await?;
75+
>
76+
> println!("Published telemetry data for vehicle {}", name_of_vehicle);
77+
> Ok(())
78+
> }
79+
>}
80+
81+
- The publisher connects to the server initialized
82+
- Creates a channel where the data is going to be passed
83+
- Creates queues for each vehicle
84+
- Passes the information based on their respective name
85+
86+
87+
88+
### Vehicle data consumer
89+
>```rust
90+
> pub async fn init_consumers(&self) -> LapinResult<()> {
91+
> // Start heartbeat monitor
92+
> self.start_heartbeat_monitor().await;
93+
> for vehicle_id in VALID_VEHICLE_IDS.iter() {
94+
> let queue_name = format!("telemetry_{}", vehicle_id);
95+
> println!("Initializing consumer for queue: {}", queue_name);
96+
>
97+
> // Declare queue first
98+
> self.queue_declare(&queue_name).await?;
99+
>
100+
> tokio::spawn({
101+
> let consumer = self.clone();
102+
> let queue = queue_name.clone();
103+
> async move {
104+
> if let Err(e) = consumer.start_consuming(&queue).await {
105+
> eprintln!("Failed to consume from queue {}: {}", queue, e);
106+
> }
107+
> }
108+
> });
109+
>}
110+
>
111+
> Ok(())
112+
113+
- Sets up consumers for all valid vehicle IDs (eru, mea, mra, fra)
114+
- Starts heartbeat monitoring to track vehicle connections
115+
- Creates separate background tasks for each vehicle queue
116+
- Each consumer watches its vehicle's queue for new messages
117+
- When data arrives, it gets processed into objects for the frontend
118+
- Updates the display screen with current vehicle status and telemetry
119+
- Stores data in database for historical tracking
120+
121+
122+
### Data Flow Summary:
123+
#### VehicleRabbitMQ QueueBackend ConsumerFrontend Display + Database Storage

0 commit comments

Comments
 (0)