Skip to content

Commit ce7ac7a

Browse files
committed
consolidate some duplicated code
Signed-off-by: Lance-Drane <[email protected]>
1 parent b335b31 commit ce7ac7a

File tree

3 files changed

+50
-79
lines changed

3 files changed

+50
-79
lines changed

proxy-http-client/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::Arc;
22

33
use tokio::sync::oneshot;
44

5-
use intersect_ingress_proxy_common::configuration::get_configuration;
5+
use intersect_ingress_proxy_common::configuration::{get_configuration, Protocol};
66
use intersect_ingress_proxy_common::protocols::{
77
amqp::init::init_amqp_proto_handlers,
88
interfaces::{PublishProtoHandler, SubscribeProtoHandler},
@@ -71,7 +71,7 @@ async fn main() -> anyhow::Result<()> {
7171
}
7272

7373
match configuration.broker.protocol {
74-
intersect_ingress_proxy_common::configuration::Protocol::Amqp => {
74+
Protocol::Amqp => {
7575
match init_amqp_proto_handlers(&configuration.broker, APPLICATION_NAME).await {
7676
Ok((publish_proto_handler, subscribe_proto_handler)) => {
7777
begin_execution(
@@ -87,7 +87,7 @@ async fn main() -> anyhow::Result<()> {
8787
}
8888
}
8989
}
90-
intersect_ingress_proxy_common::configuration::Protocol::Mqtt => {
90+
Protocol::Mqtt => {
9191
match init_mqtt_proto_handlers(&configuration.broker, APPLICATION_NAME).await {
9292
Ok((publish_proto_handler, subscribe_proto_handler)) => {
9393
begin_execution(

proxy-http-server/src/main.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::Arc;
22

33
use tokio::sync::oneshot;
44

5-
use intersect_ingress_proxy_common::configuration::get_configuration;
5+
use intersect_ingress_proxy_common::configuration::{get_configuration, Protocol};
66
use intersect_ingress_proxy_common::protocols::{
77
amqp::init::init_amqp_proto_handlers, interfaces::SubscribeProtoHandler,
88
mqtt::init::init_mqtt_proto_handlers,
@@ -14,7 +14,7 @@ use intersect_ingress_proxy_common::telemetry::{
1414
use proxy_http_server::{
1515
broadcaster::Broadcaster,
1616
configuration::Settings,
17-
webapp_server::{AmqpWebApplication, MqttWebApplication, WebApplication},
17+
webapp_server::{build_amqp_webapp, build_mqtt_webapp, WebApplication},
1818
APPLICATION_NAME,
1919
};
2020

@@ -26,7 +26,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
2626
async fn begin_execution(
2727
configuration: Settings,
2828
subscribe_proto_handler: impl SubscribeProtoHandler,
29-
application: impl WebApplication,
29+
application: WebApplication,
3030
broadcaster: Arc<Broadcaster>,
3131
) -> anyhow::Result<()> {
3232
// How this works:
@@ -73,10 +73,10 @@ async fn main() -> anyhow::Result<()> {
7373
let broadcaster = Broadcaster::new();
7474

7575
match configuration.broker.protocol {
76-
intersect_ingress_proxy_common::configuration::Protocol::Amqp => {
76+
Protocol::Amqp => {
7777
match init_amqp_proto_handlers(&configuration.broker, APPLICATION_NAME).await {
7878
Ok((publish_proto_handler, subscribe_proto_handler)) => {
79-
let web_server = AmqpWebApplication::build(
79+
let web_server = build_amqp_webapp(
8080
&configuration,
8181
broadcaster.clone(),
8282
publish_proto_handler,
@@ -96,10 +96,10 @@ async fn main() -> anyhow::Result<()> {
9696
}
9797
}
9898
}
99-
intersect_ingress_proxy_common::configuration::Protocol::Mqtt => {
99+
Protocol::Mqtt => {
100100
match init_mqtt_proto_handlers(&configuration.broker, APPLICATION_NAME).await {
101101
Ok((publish_proto_handler, subscribe_proto_handler)) => {
102-
let web_server = MqttWebApplication::build(
102+
let web_server = build_mqtt_webapp(
103103
&configuration,
104104
broadcaster.clone(),
105105
publish_proto_handler,

proxy-http-server/src/webapp_server.rs

Lines changed: 40 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -53,26 +53,18 @@ fn get_router<S: WebApplicationState + Send + Sync + 'static>(initial_state: S)
5353
.fallback(handler_404)
5454
}
5555

56-
pub trait WebApplication {
57-
fn port(&self) -> u16;
58-
fn run_until_stopped(
59-
self,
60-
) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send;
61-
}
62-
63-
pub struct AmqpWebApplication {
56+
pub struct WebApplication {
6457
port: u16,
6558
server: WebAppServer,
6659
}
6760

68-
impl AmqpWebApplication {
61+
impl WebApplication {
6962
///
7063
/// # Errors
7164
/// - errors if unable to bind to provided TCP port
7265
pub async fn build(
7366
configuration: &Settings,
74-
broadcaster: Arc<Broadcaster>,
75-
proto_handler: AmqpPublishProtoHandler,
67+
initial_state: impl WebApplicationState + Send + Sync + 'static,
7668
) -> Result<Self, anyhow::Error> {
7769
let address = format!(
7870
"{}:{}",
@@ -85,87 +77,66 @@ impl AmqpWebApplication {
8577
);
8678
let listener = TcpListener::bind(address).await?;
8779
let port = listener.local_addr()?.port();
88-
let router = get_router(AmqpWebApplicationState {
89-
proto_handler: proto_handler.clone(),
90-
broadcaster,
91-
username: configuration.username.clone(),
92-
password: configuration.password.clone(),
93-
});
80+
81+
let router = get_router(initial_state);
9482
let server = axum::serve(listener, router);
9583

9684
tracing::info!("Web server is running on port {}", port);
9785

9886
Ok(Self { port, server })
9987
}
100-
}
10188

102-
impl WebApplication for AmqpWebApplication {
103-
fn port(&self) -> u16 {
89+
pub fn port(&self) -> u16 {
10490
self.port
10591
}
10692

10793
///
10894
/// # Errors
10995
/// - Errors if unable to initialize web server
110-
async fn run_until_stopped(self) -> Result<(), std::io::Error> {
96+
pub async fn run_until_stopped(self) -> Result<(), std::io::Error> {
11197
// the return type of "with_graceful_shutdown" is unstable, so set it up here
11298
self.server
11399
.with_graceful_shutdown(wait_for_os_signal())
114100
.await
115101
}
116102
}
117103

118-
pub struct MqttWebApplication {
119-
port: u16,
120-
server: WebAppServer,
121-
}
122-
123-
impl MqttWebApplication {
124-
///
125-
/// # Errors
126-
/// - errors if unable to bind to provided TCP port
127-
pub async fn build(
128-
configuration: &Settings,
129-
broadcaster: Arc<Broadcaster>,
130-
proto_handler: MqttPublishProtoHandler,
131-
) -> Result<Self, anyhow::Error> {
132-
let address = format!(
133-
"{}:{}",
134-
if configuration.production {
135-
"0.0.0.0"
136-
} else {
137-
"127.0.0.1"
138-
},
139-
configuration.app_port
140-
);
141-
let listener = TcpListener::bind(address).await?;
142-
let port = listener.local_addr()?.port();
143-
let router = get_router(MqttWebApplicationState {
144-
proto_handler: proto_handler.clone(),
104+
///
105+
/// # Errors
106+
/// - errors if unable to bind to provided TCP port in configuration
107+
pub async fn build_amqp_webapp(
108+
configuration: &Settings,
109+
broadcaster: Arc<Broadcaster>,
110+
proto_handler: AmqpPublishProtoHandler,
111+
) -> Result<WebApplication, anyhow::Error> {
112+
WebApplication::build(
113+
configuration,
114+
AmqpWebApplicationState {
115+
proto_handler,
145116
broadcaster,
146117
username: configuration.username.clone(),
147118
password: configuration.password.clone(),
148-
});
149-
let server = axum::serve(listener, router);
150-
151-
tracing::info!("Web server is running on port {}", port);
152-
153-
Ok(Self { port, server })
154-
}
119+
},
120+
)
121+
.await
155122
}
156123

157-
impl WebApplication for MqttWebApplication {
158-
fn port(&self) -> u16 {
159-
self.port
160-
}
161-
162-
///
163-
/// # Errors
164-
/// - Errors if unable to initialize web server
165-
async fn run_until_stopped(self) -> Result<(), std::io::Error> {
166-
// the return type of "with_graceful_shutdown" is unstable, so set it up here
167-
self.server
168-
.with_graceful_shutdown(wait_for_os_signal())
169-
.await
170-
}
124+
///
125+
/// # Errors
126+
/// - errors if unable to bind to provided TCP port in configuration
127+
pub async fn build_mqtt_webapp(
128+
configuration: &Settings,
129+
broadcaster: Arc<Broadcaster>,
130+
proto_handler: MqttPublishProtoHandler,
131+
) -> Result<WebApplication, anyhow::Error> {
132+
WebApplication::build(
133+
configuration,
134+
MqttWebApplicationState {
135+
proto_handler,
136+
broadcaster,
137+
username: configuration.username.clone(),
138+
password: configuration.password.clone(),
139+
},
140+
)
141+
.await
171142
}

0 commit comments

Comments
 (0)