Skip to content

Commit b3f4090

Browse files
Fix: Sync Files on Sigterm (#969)
Co-authored-by: Nikhil Sinha <[email protected]>
1 parent d2df652 commit b3f4090

File tree

9 files changed

+151
-86
lines changed

9 files changed

+151
-86
lines changed

Cargo.lock

Lines changed: 26 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

helm/templates/ingestor-statefulset.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,4 @@ spec:
103103
requests:
104104
storage: {{ .Values.parseable.persistence.ingestor.size | quote }}
105105
{{- end }}
106-
{{- end }}
106+
{{- end }}

server/src/handlers/http/health_check.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,10 @@ use lazy_static::lazy_static;
2727
use std::sync::Arc;
2828
use tokio::signal::unix::{signal, SignalKind};
2929
use tokio::sync::{oneshot, Mutex};
30-
use tokio::time::{sleep, Duration};
3130

3231
// Create a global variable to store signal status
3332
lazy_static! {
34-
static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
33+
pub static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
3534
}
3635

3736
pub async fn liveness() -> HttpResponse {
@@ -66,23 +65,13 @@ pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()
6665
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
6766
*shutdown_flag = true;
6867

69-
// Trigger graceful shutdown
70-
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
71-
let _ = shutdown_sender.send(());
72-
}
73-
74-
// Delay to allow readiness probe to return SERVICE_UNAVAILABLE
75-
let _ = sleep(Duration::from_secs(20)).await;
76-
7768
// Sync to local
7869
crate::event::STREAM_WRITERS.unset_all();
7970

80-
// Sync to S3
81-
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
82-
log::warn!("Failed to sync local data with object store. {:?}", e);
71+
// Trigger graceful shutdown
72+
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
73+
let _ = shutdown_sender.send(());
8374
}
84-
85-
log::info!("Local and S3 Sync done, handler SIGTERM completed.");
8675
}
8776
None => {
8877
log::info!("Signal handler received None, indicating an error or end of stream");

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ impl ParseableServer for IngestServer {
112112
let shutdown_signal = server_shutdown_signal.clone();
113113

114114
// Spawn the signal handler task
115-
tokio::spawn(async move {
115+
let signal_task = tokio::spawn(async move {
116116
health_check::handle_signals(shutdown_signal).await;
117-
println!("Received shutdown signal, notifying server to shut down...");
117+
log::info!("Received shutdown signal, notifying server to shut down...");
118118
});
119119

120120
// Create the HTTP server
@@ -134,17 +134,40 @@ impl ParseableServer for IngestServer {
134134
// Graceful shutdown handling
135135
let srv_handle = srv.handle();
136136

137-
tokio::spawn(async move {
137+
let sync_task = tokio::spawn(async move {
138138
// Wait for the shutdown signal
139-
shutdown_rx.await.ok();
139+
let _ = shutdown_rx.await;
140+
141+
// Perform S3 sync and wait for completion
142+
log::info!("Starting data sync to S3...");
143+
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
144+
log::warn!("Failed to sync local data with object store. {:?}", e);
145+
} else {
146+
log::info!("Successfully synced all data to S3.");
147+
}
140148

141149
// Initiate graceful shutdown
142150
log::info!("Graceful shutdown of HTTP server triggered");
143151
srv_handle.stop(true).await;
144152
});
145153

146-
// Await the server to run and handle shutdown
147-
srv.await?;
154+
// Await the HTTP server to run
155+
let server_result = srv.await;
156+
157+
// Await the signal handler to ensure proper cleanup
158+
if let Err(e) = signal_task.await {
159+
log::error!("Error in signal handler: {:?}", e);
160+
}
161+
162+
// Wait for the sync task to complete before exiting
163+
if let Err(e) = sync_task.await {
164+
log::error!("Error in sync task: {:?}", e);
165+
} else {
166+
log::info!("Sync task completed successfully.");
167+
}
168+
169+
// Return the result of the server
170+
server_result?;
148171

149172
Ok(())
150173
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ impl ParseableServer for QueryServer {
8989
let shutdown_signal = server_shutdown_signal.clone();
9090

9191
// Spawn the signal handler task
92-
tokio::spawn(async move {
92+
let signal_task = tokio::spawn(async move {
9393
health_check::handle_signals(shutdown_signal).await;
94+
log::info!("Received shutdown signal, notifying server to shut down...");
9495
});
9596

9697
// Create the HTTP server
@@ -110,17 +111,40 @@ impl ParseableServer for QueryServer {
110111
// Graceful shutdown handling
111112
let srv_handle = srv.handle();
112113

113-
tokio::spawn(async move {
114+
let sync_task = tokio::spawn(async move {
114115
// Wait for the shutdown signal
115-
shutdown_rx.await.ok();
116+
let _ = shutdown_rx.await;
117+
118+
// Perform S3 sync and wait for completion
119+
log::info!("Starting data sync to S3...");
120+
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
121+
log::warn!("Failed to sync local data with object store. {:?}", e);
122+
} else {
123+
log::info!("Successfully synced all data to S3.");
124+
}
116125

117126
// Initiate graceful shutdown
118127
log::info!("Graceful shutdown of HTTP server triggered");
119128
srv_handle.stop(true).await;
120129
});
121130

122-
// Await the server to run and handle shutdown
123-
srv.await?;
131+
// Await the HTTP server to run
132+
let server_result = srv.await;
133+
134+
// Await the signal handler to ensure proper cleanup
135+
if let Err(e) = signal_task.await {
136+
log::error!("Error in signal handler: {:?}", e);
137+
}
138+
139+
// Wait for the sync task to complete before exiting
140+
if let Err(e) = sync_task.await {
141+
log::error!("Error in sync task: {:?}", e);
142+
} else {
143+
log::info!("Sync task completed successfully.");
144+
}
145+
146+
// Return the result of the server
147+
server_result?;
124148

125149
Ok(())
126150
}

server/src/handlers/http/modal/server.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ use super::generate;
6565
use super::ssl_acceptor::get_ssl_acceptor;
6666
use super::OpenIdClient;
6767
use super::ParseableServer;
68-
6968
#[derive(Default)]
7069
pub struct Server;
7170

@@ -110,9 +109,9 @@ impl ParseableServer for Server {
110109
let shutdown_signal = server_shutdown_signal.clone();
111110

112111
// Spawn the signal handler task
113-
tokio::spawn(async move {
112+
let signal_task = tokio::spawn(async move {
114113
health_check::handle_signals(shutdown_signal).await;
115-
println!("Received shutdown signal, notifying server to shut down...");
114+
log::info!("Received shutdown signal, notifying server to shut down...");
116115
});
117116

118117
// Create the HTTP server
@@ -132,17 +131,40 @@ impl ParseableServer for Server {
132131
// Graceful shutdown handling
133132
let srv_handle = srv.handle();
134133

135-
tokio::spawn(async move {
134+
let sync_task = tokio::spawn(async move {
136135
// Wait for the shutdown signal
137-
shutdown_rx.await.ok();
136+
let _ = shutdown_rx.await;
137+
138+
// Perform S3 sync and wait for completion
139+
log::info!("Starting data sync to S3...");
140+
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
141+
log::warn!("Failed to sync local data with object store. {:?}", e);
142+
} else {
143+
log::info!("Successfully synced all data to S3.");
144+
}
138145

139146
// Initiate graceful shutdown
140147
log::info!("Graceful shutdown of HTTP server triggered");
141148
srv_handle.stop(true).await;
142149
});
143150

144-
// Await the server to run and handle shutdown
145-
srv.await?;
151+
// Await the HTTP server to run
152+
let server_result = srv.await;
153+
154+
// Await the signal handler to ensure proper cleanup
155+
if let Err(e) = signal_task.await {
156+
log::error!("Error in signal handler: {:?}", e);
157+
}
158+
159+
// Wait for the sync task to complete before exiting
160+
if let Err(e) = sync_task.await {
161+
log::error!("Error in sync task: {:?}", e);
162+
} else {
163+
log::info!("Sync task completed successfully.");
164+
}
165+
166+
// Return the result of the server
167+
server_result?;
146168

147169
Ok(())
148170
}

0 commit comments

Comments
 (0)