Skip to content

Commit c87507d

Browse files
committed
Merge remote-tracking branch 'upstream/main' into feature/network-settings
Signed-off-by: Jan Zachmann <50990105+JanZachmann@users.noreply.github.com>
2 parents 4b1e148 + 6986e22 commit c87507d

File tree

3 files changed

+45
-66
lines changed

3 files changed

+45
-66
lines changed

src/main.rs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,30 @@ const MEMORY_LIMIT_BYTES: usize = 10 * 1024 * 1024;
4949

5050
type UiApi = Api<OmnectDeviceServiceClient, KeycloakProvider>;
5151

52+
enum ShutdownReason {
53+
Restart,
54+
Shutdown,
55+
}
56+
57+
impl std::fmt::Display for ShutdownReason {
58+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59+
match self {
60+
ShutdownReason::Restart => write!(f, "restarting server"),
61+
ShutdownReason::Shutdown => write!(f, "shutting down"),
62+
}
63+
}
64+
}
65+
5266
#[actix_web::main]
5367
async fn main() {
5468
initialize();
5569

56-
let mut restart_rx = NetworkConfigService::setup_restart_receiver()
57-
.expect("failed to setup restart receiver");
70+
let mut restart_rx =
71+
NetworkConfigService::setup_restart_receiver().expect("failed to setup restart receiver");
5872

5973
let mut sigterm = signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
6074

61-
while run_until_shutdown(&mut restart_rx, &mut sigterm).await {
62-
info!("restarting server...");
63-
}
75+
while let ShutdownReason::Restart = run_until_shutdown(&mut restart_rx, &mut sigterm).await {}
6476
}
6577

6678
fn initialize() {
@@ -96,17 +108,15 @@ fn initialize() {
96108
panic!("failed to find required data directory: /data is missing");
97109
};
98110

99-
if !fs::exists(config_path!()).is_ok_and(|ok| ok) {
100-
fs::create_dir_all(config_path!()).expect("failed to create config directory");
101-
};
111+
fs::create_dir_all(config_path!()).expect("failed to create config directory");
102112

103113
common::create_frontend_config_file().expect("failed to create frontend config file");
104114
}
105115

106116
async fn run_until_shutdown(
107117
restart_rx: &mut broadcast::Receiver<()>,
108118
sigterm: &mut tokio::signal::unix::Signal,
109-
) -> bool {
119+
) -> ShutdownReason {
110120
let mut centrifugo = run_centrifugo();
111121
let service_client = OmnectDeviceServiceClientBuilder::new()
112122
.with_certificate_setup(|payload: CreateCertPayload| async move {
@@ -118,58 +128,54 @@ async fn run_until_shutdown(
118128
.expect("failed to create device service client");
119129
let (server_handle, server_task) = run_server(service_client.clone()).await;
120130

121-
let should_restart = tokio::select! {
131+
let reason = tokio::select! {
122132
_ = tokio::signal::ctrl_c() => {
123133
debug!("ctrl-c received");
124-
false
134+
ShutdownReason::Shutdown
125135
},
126136
_ = sigterm.recv() => {
127137
debug!("SIGTERM received");
128-
false
138+
ShutdownReason::Shutdown
129139
},
130140
_ = restart_rx.recv() => {
131141
debug!("server restart requested");
132-
true
142+
ShutdownReason::Restart
133143
},
134144
result = server_task => {
135145
match result {
136146
Ok(Ok(())) => debug!("server stopped normally"),
137147
Ok(Err(e)) => debug!("server stopped with error: {e}"),
138148
Err(e) => debug!("server task panicked: {e}"),
139149
}
140-
false
150+
ShutdownReason::Shutdown
141151
},
142152
_ = centrifugo.wait() => {
143153
debug!("centrifugo stopped unexpectedly");
144-
false
154+
ShutdownReason::Shutdown
145155
}
146156
};
147157

148-
// Unified cleanup sequence - ensures consistent shutdown regardless of exit reason
149-
if !should_restart {
150-
info!("shutting down...");
151-
}
158+
// Unified cleanup sequence
159+
info!("{reason}...");
152160

153-
// 1. Shutdown service client first (unregister from omnect-device-service)
161+
// 1. Shutdown service client (unregister from omnect-device-service)
154162
if let Err(e) = service_client.shutdown().await {
155163
error!("failed to shutdown service client: {e:#}");
156164
}
157165

158166
// 2. Stop the server gracefully
159167
server_handle.stop(true).await;
160-
if !should_restart {
161-
info!("server stopped");
162-
}
163168

164169
// 3. Kill centrifugo
165170
if let Err(e) = centrifugo.kill().await {
166171
error!("failed to kill centrifugo: {e:#}");
167172
}
168-
if !should_restart {
169-
info!("centrifugo stopped");
173+
174+
if matches!(reason, ShutdownReason::Shutdown) {
175+
info!("shutdown complete");
170176
}
171177

172-
should_restart
178+
reason
173179
}
174180

175181
async fn run_server(

src/network.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,4 +375,4 @@ impl NetworkConfigService {
375375

376376
Ok(())
377377
}
378-
}
378+
}

src/omnect_device_service_client.rs

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -120,59 +120,32 @@ pub struct OmnectDeviceServiceClient {
120120
has_publish_endpoint: bool,
121121
}
122122

123-
// Type aliases for the default no-op certificate setup
124-
type NoOpCertSetup = fn(CreateCertPayload) -> std::future::Ready<Result<()>>;
125-
type NoOpCertFuture = std::future::Ready<Result<()>>;
126-
127-
pub struct OmnectDeviceServiceClientBuilder<F, Fut>
128-
where
129-
F: FnOnce(CreateCertPayload) -> Fut,
130-
Fut: std::future::Future<Output = Result<()>>,
131-
{
132-
publish_endpoint: Option<PublishEndpoint>,
133-
certificate_setup: Option<F>,
134-
_phantom: std::marker::PhantomData<Fut>,
135-
}
123+
type CertSetupFuture = std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>>>>;
124+
type CertSetupFn = Box<dyn FnOnce(CreateCertPayload) -> CertSetupFuture>;
136125

137-
impl Default for OmnectDeviceServiceClientBuilder<NoOpCertSetup, NoOpCertFuture> {
138-
fn default() -> Self {
139-
Self {
140-
publish_endpoint: None,
141-
certificate_setup: None,
142-
_phantom: std::marker::PhantomData,
143-
}
144-
}
126+
#[derive(Default)]
127+
pub struct OmnectDeviceServiceClientBuilder {
128+
publish_endpoint: Option<PublishEndpoint>,
129+
certificate_setup: Option<CertSetupFn>,
145130
}
146131

147-
impl OmnectDeviceServiceClientBuilder<NoOpCertSetup, NoOpCertFuture> {
132+
impl OmnectDeviceServiceClientBuilder {
148133
pub fn new() -> Self {
149134
Self::default()
150135
}
151-
}
152136

153-
impl<F, Fut> OmnectDeviceServiceClientBuilder<F, Fut>
154-
where
155-
F: FnOnce(CreateCertPayload) -> Fut,
156-
Fut: std::future::Future<Output = Result<()>>,
157-
{
158137
pub fn with_publish_endpoint(mut self, endpoint: PublishEndpoint) -> Self {
159138
self.publish_endpoint = Some(endpoint);
160139
self
161140
}
162141

163-
pub fn with_certificate_setup<NewF, NewFut>(
164-
self,
165-
setup_fn: NewF,
166-
) -> OmnectDeviceServiceClientBuilder<NewF, NewFut>
142+
pub fn with_certificate_setup<F, Fut>(mut self, setup_fn: F) -> Self
167143
where
168-
NewF: FnOnce(CreateCertPayload) -> NewFut,
169-
NewFut: std::future::Future<Output = Result<()>>,
144+
F: FnOnce(CreateCertPayload) -> Fut + 'static,
145+
Fut: std::future::Future<Output = Result<()>> + 'static,
170146
{
171-
OmnectDeviceServiceClientBuilder {
172-
publish_endpoint: self.publish_endpoint,
173-
certificate_setup: Some(setup_fn),
174-
_phantom: std::marker::PhantomData,
175-
}
147+
self.certificate_setup = Some(Box::new(move |payload| Box::pin(setup_fn(payload))));
148+
self
176149
}
177150

178151
pub async fn build(self) -> Result<OmnectDeviceServiceClient> {

0 commit comments

Comments
 (0)