Skip to content

Commit fd71a5f

Browse files
committed
catch SIGHUP/SIGTERM, optimize memory usage
Signed-off-by: Lance-Drane <[email protected]>
1 parent 0f0c60e commit fd71a5f

File tree

17 files changed

+160
-101
lines changed

17 files changed

+160
-101
lines changed

proxy-http-client/src/configuration.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/// FOR DEVOPS USERS:
22
/// 1) The root struct is "Settings", follow logic from there
33
/// 2) integers can be provided as a string in config files or environment variables
4-
/// 3) if using environment variables, see comment in "get_configuration()" as an example of how nesting works
5-
/// 4) if using ONLY a file variable, this is determined from the APP_CONFIG_FILE environment variable (environment variables have higher precedence)
4+
/// 3) if using environment variables, see comment in `get_configuration()` as an example of how nesting works
5+
/// 4) if using ONLY a file variable, this is determined from the `APP_CONFIG_FILE` environment variable (environment variables have higher precedence)
66
/// 5) Additional logic can be found in shared-deps/src/configuration.rs
77
use secrecy::SecretString;
88

@@ -21,7 +21,7 @@ pub struct ExternalProxy {
2121
pub password: SecretString,
2222
}
2323

24-
#[derive(serde::Deserialize, Clone)]
24+
#[derive(serde::Deserialize, Clone, Debug)]
2525
pub struct Settings {
2626
/// configuration for the broker, which our applications are listening to
2727
pub broker: BrokerSettings, // TODO make this a Vec<BrokerSettings>

proxy-http-client/src/event_source.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use intersect_ingress_proxy_common::protocols::amqp::{
1010
use intersect_ingress_proxy_common::server_paths::SUBSCRIBE_URL;
1111
use intersect_ingress_proxy_common::signals::wait_for_os_signal;
1212

13-
use crate::configuration::Settings;
13+
use crate::configuration::ExternalProxy;
1414

1515
/// Return Err only if we weren't able to publish a correct message to the broker, invalid messages are ignored
16-
async fn send_message(message: String, connection_pool: Pool) -> Result<(), String> {
16+
async fn send_message(message: String, connection_pool: &Pool) -> Result<(), String> {
1717
let es_data_result = extract_eventsource_data(&message);
1818
if es_data_result.is_err() {
1919
return Ok(());
@@ -39,28 +39,28 @@ async fn send_message(message: String, connection_pool: Pool) -> Result<(), Stri
3939
})?;
4040

4141
match amqp_publish_message(channel, &topic, data).await {
42-
Ok(_) => Ok(()),
43-
Err(_) => Err(
42+
Ok(()) => Ok(()),
43+
Err(()) => Err(
4444
"WARNING: message received from other proxy was NOT published on our own broker."
4545
.into(),
4646
),
4747
}
4848
}
4949

5050
/// Return value - exit code to use
51-
pub async fn event_source_loop(configuration: &Settings, connection_pool: Pool) -> i32 {
51+
///
52+
/// # Panics
53+
/// - Inner API could potentially panic but is currently not expected to do so
54+
pub async fn event_source_loop(other_proxy: ExternalProxy, connection_pool: Pool) -> i32 {
5255
let mut es = EventSource::new(
5356
reqwest::Client::new()
54-
.get(format!(
55-
"{}{}",
56-
&configuration.other_proxy.url, SUBSCRIBE_URL
57-
))
57+
.get(format!("{}{}", &other_proxy.url, SUBSCRIBE_URL))
5858
.basic_auth(
59-
&configuration.other_proxy.username,
60-
Some(configuration.other_proxy.password.expose_secret()),
59+
&other_proxy.username,
60+
Some(&other_proxy.password.expose_secret()),
6161
),
6262
)
63-
.unwrap();
63+
.expect("The event source request body was somehow a stream?");
6464
let mut rc = 0;
6565
loop {
6666
tokio::select! {
@@ -76,10 +76,10 @@ pub async fn event_source_loop(configuration: &Settings, connection_pool: Pool)
7676
Some(event) => {
7777
match event {
7878
Ok(Event::Open) => {
79-
tracing::info!("connected to {}", &configuration.other_proxy.url);
79+
tracing::info!("connected to {}", &other_proxy.url);
8080
},
8181
Ok(Event::Message(message)) => {
82-
if let Err(e) = send_message(message.data, connection_pool.clone()).await {
82+
if let Err(e) = send_message(message.data, &connection_pool).await {
8383
tracing::error!(e);
8484
};
8585
},
@@ -94,7 +94,7 @@ pub async fn event_source_loop(configuration: &Settings, connection_pool: Pool)
9494
}
9595
},
9696
// OS kill signal
97-
_ = wait_for_os_signal() => {
97+
() = wait_for_os_signal() => {
9898
break;
9999
},
100100
};

proxy-http-client/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub async fn main() -> anyhow::Result<()> {
3232
}
3333

3434
// set up broker connection pool
35-
let pool = get_connection_pool(&configuration.broker).await;
35+
let pool = get_connection_pool(&configuration.broker);
3636
if let Err(msg) = verify_connection_pool(&pool, APPLICATION_NAME).await {
3737
tracing::error!(msg);
3838
std::process::exit(1);
@@ -54,8 +54,11 @@ pub async fn main() -> anyhow::Result<()> {
5454
rx,
5555
);
5656

57+
let other_proxy = configuration.other_proxy.clone();
58+
drop(configuration);
59+
5760
// this will run until we get an event source error or we catch an OS signal
58-
let rc = event_source_loop(&configuration, pool.clone()).await;
61+
let rc = event_source_loop(other_proxy, pool).await;
5962

6063
tracing::info!("Attempting graceful shutdown: No longer listening for events over HTTP");
6164
drop(tx);

proxy-http-client/src/poster.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub struct Poster {
1010
}
1111

1212
impl Poster {
13+
#[must_use]
1314
pub fn new(proxy: &ExternalProxy) -> Self {
1415
let http_client = reqwest::Client::new()
1516
.post(format!("{}{}", proxy.url, PUBLISH_URL))

proxy-http-server/src/broadcaster.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ pub struct Broadcaster {
1818
impl Broadcaster {
1919
/// Create the broadcaster. Note that it automatically wraps it in an Arc.
2020
/// The broadcaster manages its producer but does not manage its consumers
21+
#[must_use]
2122
pub fn new() -> Arc<Self> {
2223
// use a fairly large channel capacity to account for potential receiver lags
2324
let (tx, _) = broadcast::channel(256);
2425
Arc::new(Broadcaster { fanout: tx })
2526
}
2627

2728
/// Add a broadcaster consumer - the calling function is responsible for cleaning up the consumer
29+
#[must_use]
2830
pub fn add_client(&self) -> broadcast::Receiver<Event> {
2931
self.fanout.subscribe()
3032
}
@@ -41,6 +43,7 @@ impl Broadcaster {
4143
/// 3) Somehow transfer these messages over to the other message broker, make the messages their responsibility.
4244
///
4345
/// Once the messages are on the other message broker, proxy-http-server and proxy-http-client don't need to care, handling them will be the SDK's job.
46+
#[must_use]
4447
pub fn broadcast(&self, event: &str) -> usize {
4548
self.fanout.send(Event::default().data(event)).unwrap_or(0)
4649
}

proxy-http-server/src/configuration.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/// FOR DEVOPS USERS:
22
/// 1) The root struct is "Settings", follow logic from there
33
/// 2) integers can be provided as a string in config files or environment variables
4-
/// 3) if using environment variables, see comment in "get_configuration()" as an example of how nesting works
5-
/// 4) if using ONLY a file variable, this is determined from the APP_CONFIG_FILE environment variable (environment variables have higher precedence)
4+
/// 3) if using environment variables, see comment in `get_configuration()` as an example of how nesting works
5+
/// 4) if using ONLY a file variable, this is determined from the `APP_CONFIG_FILE` environment variable (environment variables have higher precedence)
66
/// 5) Additional logic can be found in shared-deps/src/configuration.rs
77
use secrecy::SecretString;
88
use serde_aux::field_attributes::deserialize_number_from_string;
@@ -11,7 +11,7 @@ use intersect_ingress_proxy_common::configuration::{
1111
deserialize_enforce_topic_prefixes, BrokerSettings, LogLevel,
1212
};
1313

14-
#[derive(serde::Deserialize, Clone)]
14+
#[derive(serde::Deserialize, Clone, Debug)]
1515
pub struct Settings {
1616
/// configuration for the broker, which our applications are listening to
1717
pub broker: BrokerSettings, // TODO make this a Vec<BrokerSettings>

proxy-http-server/src/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
3333
}
3434

3535
// set up broker connection pool
36-
let pool = get_connection_pool(&configuration.broker).await;
36+
let pool = get_connection_pool(&configuration.broker);
3737
if let Err(msg) = verify_connection_pool(&pool, APPLICATION_NAME).await {
3838
tracing::error!(msg);
3939
std::process::exit(1);
@@ -54,10 +54,12 @@ async fn main() -> anyhow::Result<()> {
5454
pool,
5555
configuration.topic_prefix.clone(),
5656
APPLICATION_NAME.into(),
57-
broadcaster.clone(),
57+
broadcaster,
5858
rx,
5959
);
6060

61+
drop(configuration);
62+
6163
application.run_until_stopped().await?;
6264

6365
tracing::info!("Application shutting down, please wait for cleanups...");

proxy-http-server/src/routes/publish.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ use intersect_ingress_proxy_common::protocols::amqp::{
1616
use crate::webapp::WebApplicationState;
1717

1818
/// HTTP POST endpoint which will publish a message meeting the INTERSECT specification
19+
///
20+
/// # Errors
21+
/// - Sends back a 401 if authentication is incorrect
22+
/// - Sends back a 400 if the message body is improperly formatted
23+
/// - Sends back a 500 if the server was unable to send the message
1924
pub async fn publish_message(
2025
State(app_state): State<Arc<WebApplicationState>>,
2126
TypedHeader(authorization): TypedHeader<Authorization<Basic>>,
@@ -50,7 +55,7 @@ pub async fn publish_message(
5055
);
5156
return Err((
5257
StatusCode::BAD_REQUEST,
53-
format!("{} is not a valid AMQP topic name", topic),
58+
format!("{topic} is not a valid AMQP topic name"),
5459
));
5560
}
5661
tracing::debug!("Publishing message with topic: {}", &topic);
@@ -72,7 +77,7 @@ pub async fn publish_message(
7277
})?;
7378
amqp_publish_message(channel, &topic, data)
7479
.await
75-
.map_err(|_| {
80+
.map_err(|()| {
7681
(
7782
StatusCode::INTERNAL_SERVER_ERROR,
7883
"server fault, message not published".to_string(),

proxy-http-server/src/routes/subscribe.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use std::sync::Arc;
1414
use crate::webapp::WebApplicationState;
1515
use intersect_ingress_proxy_common::signals::wait_for_os_signal;
1616

17+
#[allow(clippy::needless_pass_by_value)]
1718
fn sse_response(
1819
app_state: Arc<WebApplicationState>,
1920
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
@@ -23,7 +24,7 @@ fn sse_response(
2324
loop {
2425
tokio::select! {
2526
// if we catch an OS signal, disconnect the client
26-
_ = wait_for_os_signal() => {
27+
() = wait_for_os_signal() => {
2728
break;
2829
},
2930
// send the broadcast message to the client, and continue listening for more messages
@@ -36,25 +37,25 @@ fn sse_response(
3637
Err(e) => {
3738
match e {
3839
tokio::sync::broadcast::error::RecvError::Closed => {
39-
tracing::error!(error = ?e, "Broadcasting pipeline to SSE somehow closed, should not see this message!")
40+
tracing::error!(error = ?e, "Broadcasting pipeline to SSE somehow closed, should not see this message!");
4041
},
4142
tokio::sync::broadcast::error::RecvError::Lagged(lag_count) => {
42-
tracing::error!(error = ?e, "SSE has missed {} messages from broadcaster", lag_count)
43+
tracing::error!(error = ?e, "SSE has missed {} messages from broadcaster", lag_count);
4344
},
44-
};
45+
}
4546
},
4647
}
4748
},
48-
};
49-
};
49+
}
50+
}
5051
};
5152

5253
Sse::new(stream).keep_alive(KeepAlive::default())
5354
}
5455

5556
/// Resources:
56-
/// https://github.com/tokio-rs/axum/discussions/1670
57-
/// https://github.com/tokio-rs/axum/discussions/2264
57+
/// `<https://github.com/tokio-rs/axum/discussions/1670>`
58+
/// `<https://github.com/tokio-rs/axum/discussions/2264>`
5859
pub async fn sse_handler(
5960
State(app_state): State<Arc<WebApplicationState>>,
6061
TypedHeader(authorization): TypedHeader<Authorization<Basic>>,

proxy-http-server/src/webapp.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ pub struct WebApplication {
4242
}
4343

4444
impl WebApplication {
45+
///
46+
/// # Errors
47+
/// - errors if unable to bind to provided TCP port
4548
pub async fn build(
4649
configuration: &Settings,
4750
broadcaster: Arc<Broadcaster>,
@@ -57,8 +60,8 @@ impl WebApplication {
5760
configuration.app_port
5861
);
5962
let listener = TcpListener::bind(address).await?;
60-
let port = listener.local_addr().unwrap().port();
61-
let server = run(listener, configuration, broadcaster, amqp_pool).await?;
63+
let port = listener.local_addr()?.port();
64+
let server = create_axum(listener, configuration, broadcaster, amqp_pool);
6265

6366
tracing::info!("Web server is running on port {}", port);
6467

@@ -69,6 +72,9 @@ impl WebApplication {
6972
self.port
7073
}
7174

75+
///
76+
/// # Errors
77+
/// - Errors if unable to initialize web server
7278
pub async fn run_until_stopped(self) -> Result<(), std::io::Error> {
7379
// the return type of "with_graceful_shutdown" is unstable, so set it up here
7480
self.server
@@ -77,12 +83,12 @@ impl WebApplication {
7783
}
7884
}
7985

80-
async fn run(
86+
fn create_axum(
8187
listener: TcpListener,
8288
configuration: &Settings,
8389
broadcaster: Arc<Broadcaster>,
8490
amqp_pool: Pool,
85-
) -> Result<WebAppServer, anyhow::Error> {
91+
) -> WebAppServer {
8692
let middleware = ServiceBuilder::new()
8793
.set_x_request_id(MakeRequestUuid)
8894
.layer(
@@ -111,6 +117,5 @@ async fn run(
111117
.route("/healthcheck", get(health_check))
112118
.fallback(handler_404);
113119

114-
let server = axum::serve(listener, app);
115-
Ok(server)
120+
axum::serve(listener, app)
116121
}

0 commit comments

Comments
 (0)