Skip to content

Commit 73f4735

Browse files
committed
#4 - implement MQTT, without manual acknowledgments
Signed-off-by: Lance-Drane <[email protected]>
1 parent d723217 commit 73f4735

File tree

14 files changed

+205
-35
lines changed

14 files changed

+205
-35
lines changed

README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,18 @@ Specific configuration structs are in `proxy-http-server/src/configuration.rs` a
8484

8585
## Setup
8686

87-
### Using the RabbitMQ web management UIs
87+
### Using the RabbitMQ web management UIs (AMQP)
8888

8989
These instructions assume you are using the docker compose configuration and the default `conf.yaml` configurations for each.
9090

9191
1) Make sure that you run `docker compose up -d` from the root directory, to start both brokers and their associated management UIs.
9292
2) Make sure that you have both applications started (do NOT start more than 1 of each). Each application should be connected to a separate broker.
9393
3) To login to the broker that the server instance uses, go to localhost:15672, username `intersect_username`, password `intersect_password`
9494
4) To login to the broker that the client instance uses, go to localhost:15673, username `intersect_username`, password `intersect_password`
95-
5) On each application, click on the `Exchanges` tab, and click on the `intersect-messages` exchange.
95+
5)
96+
- IF AMQP: On each application, click on the `Exchanges` tab, and click on the `intersect-messages` exchange.
97+
- IF MQTT: On each application, click on the `Queues and Streams` tab, then click on the queue (it should look like `mqtt-subscription-proxy-http-clientqos1` or `mqtt-subscription-proxy-http-serverqos1`).
98+
9699
6) Make sure that the `Publish message` dropdown is expanded, select the large text area which is labeled with `Payload:`
97100

98101
For the application on `localhost:15672`, set the payload to below (no newlines):

proxy-http-client/conf.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ broker:
88
password: intersect_password
99
host: "127.0.0.1"
1010
# note: differs from other config file (two separate brokers used)
11+
# -- IF AMQP --
1112
port: 5673
1213
protocol: "amqp"
14+
# -- IF MQTT --
15+
#port: 1884
16+
#protocol: "mqtt"
1317
# use amqp topic notation, note that the system is different from the other configuration
1418
topic_prefix: "organization.facility.system2" # CHANGE THIS PER DEPLOYMENT!!!
1519
log_level: "debug"

proxy-http-client/src/event_source.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ async fn send_message(
2020
}
2121
let (topic, data) = es_data_result.unwrap();
2222

23+
if let Err(e) = proto_handler.preverify_publish(&topic) {
24+
tracing::warn!("invalid topic name -- {e}");
25+
return Ok(());
26+
}
2327
proto_handler.publish_message(&topic, data).await
2428
}
2529

proxy-http-server/conf.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ broker:
55
password: intersect_password
66
host: "127.0.0.1"
77
# note: differs from other config file (two separate brokers used)
8+
# -- IF AMQP --
89
port: 5672
910
protocol: "amqp"
11+
# -- IF MQTT --
12+
#port: 1883
13+
#protocol: "mqtt"
1014
# use amqp topic notation
1115
topic_prefix: "organization.facility.system" # CHANGE THIS PER DEPLOYMENT!!!
1216
log_level: "debug"

shared-deps/src/protocols/amqp/publish.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use amqprs::{channel::BasicPublishArguments, BasicProperties};
22
use deadpool_amqprs::Pool;
33

4-
use crate::protocols::amqp::utils::{get_channel, is_routing_key_compliant};
4+
use crate::protocols::amqp::utils::get_channel;
5+
use crate::protocols::proxy::is_routing_key_compliant;
56
use crate::{
67
intersect_messaging::INTERSECT_MESSAGE_EXCHANGE, protocols::interfaces::PublishProtoHandler,
78
};
@@ -34,20 +35,13 @@ impl PublishProtoHandler for AmqpPublishProtoHandler {
3435
fn preverify_publish(&self, topic: &str) -> Result<(), String> {
3536
if !is_routing_key_compliant(topic) {
3637
return Err(format!(
37-
"'{topic}' does not meet the AMQP routing key specification"
38+
"'{topic}' does not meet the INTERSECT proxy-app routing key specification."
3839
));
3940
}
4041
Ok(())
4142
}
4243

4344
async fn publish_message(&self, topic: &str, data: String) -> Result<(), &str> {
44-
if !is_routing_key_compliant(topic) {
45-
tracing::warn!(
46-
"{} is not a valid AMQP topic name, will not attempt publish",
47-
topic
48-
);
49-
return Ok(());
50-
}
5145
let connection = self.pool.get().await.map_err(|e| {
5246
tracing::error!(error = ?e, "cannot connect to broker");
5347
"cannot connect to broker"

shared-deps/src/protocols/amqp/subscribe.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use crate::intersect_messaging::{make_eventsource_data, should_message_passthrou
1212
use crate::protocols::amqp::utils::{get_channel, verify_connection_pool};
1313
use crate::protocols::interfaces::{HttpBroadcast, SubscribeProtoHandler};
1414

15-
#[derive(Clone)]
1615
pub struct AmqpSubscribeProtoHandler {
1716
pool: Pool,
1817
/// application_name is used for the hardcoded queue name and for debugging purposes

shared-deps/src/protocols/amqp/utils.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{configuration::BrokerSettings, intersect_messaging::INTERSECT_MESSAG
1212
///
1313
/// NOTE: calling this function cannot fail on its own, you have to call pool.get().await for it to fail
1414
#[must_use]
15-
pub fn get_connection_pool(connection_details: &BrokerSettings) -> Pool {
15+
pub(crate) fn get_connection_pool(connection_details: &BrokerSettings) -> Pool {
1616
let mut args = OpenConnectionArguments::new(
1717
&connection_details.host,
1818
connection_details.port,
@@ -29,7 +29,10 @@ pub fn get_connection_pool(connection_details: &BrokerSettings) -> Pool {
2929
///
3030
/// # Errors
3131
/// - Errors if can't connect to server or has invalid permissions
32-
pub async fn verify_connection_pool(pool: &Pool, queue_name_src: &str) -> Result<(), String> {
32+
pub(crate) async fn verify_connection_pool(
33+
pool: &Pool,
34+
queue_name_src: &str,
35+
) -> Result<(), String> {
3336
let connection = pool
3437
.get()
3538
.await
@@ -83,7 +86,7 @@ pub async fn verify_connection_pool(pool: &Pool, queue_name_src: &str) -> Result
8386
///
8487
/// # Errors
8588
/// - Errors if failure to communicate with broker server
86-
pub async fn get_channel(connection: &Connection) -> Result<Channel, amqprs::error::Error> {
89+
pub(crate) async fn get_channel(connection: &Connection) -> Result<Channel, amqprs::error::Error> {
8790
let channel = connection.open_channel(None).await?;
8891
channel.register_callback(DefaultChannelCallback).await?;
8992
Ok(channel)
@@ -105,12 +108,3 @@ async fn make_exchange(channel: &Channel) -> Result<(), amqprs::error::Error> {
105108
)
106109
.await
107110
}
108-
109-
/// make sure that the routing key is valid for AMQP
110-
///
111-
/// we do not permit publishing on wildcards
112-
#[must_use]
113-
pub(crate) fn is_routing_key_compliant(key: &str) -> bool {
114-
!key.chars()
115-
.any(|c| !c.is_alphanumeric() && c != '-' && c != '_' && c != '.' && c != ':')
116-
}

shared-deps/src/protocols/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod amqp;
22
pub mod interfaces;
33
pub mod mqtt;
4+
pub mod proxy;

shared-deps/src/protocols/mqtt/init.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use secrecy::ExposeSecret;
55

66
use crate::{
77
configuration::BrokerSettings,
8-
protocols::mqtt::{publish::MqttPublishProtoHandler, subscribe::MqttSubscribeProtoHandler},
8+
protocols::mqtt::{
9+
publish::MqttPublishProtoHandler, subscribe::MqttSubscribeProtoHandler,
10+
utils::subscribe_all,
11+
},
912
};
1013

1114
/// Sets up the MQTT proto handlers, and verifies that we can connect to the MQTT broker.
@@ -25,7 +28,8 @@ pub async fn init_mqtt_proto_handlers(
2528
// TODO may want to handle message sizes >= 4 GiB
2629
mqtt_options.set_max_packet_size(1 << 32, 1 << 32);
2730
mqtt_options.set_clean_session(false);
28-
mqtt_options.set_manual_acks(true);
31+
// TODO - implement manual acks
32+
// mqtt_options.set_manual_acks(true);
2933
mqtt_options.set_keep_alive(Duration::from_secs(60));
3034

3135
let (mqtt_client, mut mqtt_event_loop) = AsyncClient::new(mqtt_options, 256);
@@ -58,6 +62,9 @@ pub async fn init_mqtt_proto_handlers(
5862
}
5963
}
6064

65+
// listen for every single message on the exchange, we must do this due to the way userspace messages work
66+
subscribe_all(&mqtt_client).await?;
67+
6168
let publish_handler = MqttPublishProtoHandler::new(application_name, mqtt_client.clone());
6269
let subscribe_handler =
6370
MqttSubscribeProtoHandler::new(application_name, mqtt_client, mqtt_event_loop);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod init;
22
pub mod publish;
33
pub mod subscribe;
4+
pub mod utils;

0 commit comments

Comments
 (0)