Skip to content

Commit fb3401d

Browse files
drcaramelsyrupduke8253
authored andcommitted
Add shutdown flag to proxy session
Add a Session::is_process_shutting_down API for custom logic to be actioned in filters when shutdown has started.
1 parent 75a8775 commit fb3401d

File tree

2 files changed

+42
-10
lines changed

2 files changed

+42
-10
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
93b912130db516aecdfb9b87b48fa7797f9d1a49
1+
03479bcb7e6312754af45a43f6cdd173538a3cc2

pingora-proxy/src/lib.rs

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ use once_cell::sync::Lazy;
4545
use pingora_http::{RequestHeader, ResponseHeader};
4646
use std::fmt::Debug;
4747
use std::str;
48-
use std::sync::Arc;
48+
use std::sync::{
49+
atomic::{AtomicBool, Ordering},
50+
Arc,
51+
};
4952
use std::time::Duration;
5053
use tokio::sync::{mpsc, Notify};
5154
use tokio::time;
@@ -111,6 +114,7 @@ where
111114
inner: SV, // TODO: name it better than inner
112115
client_upstream: Connector<C>,
113116
shutdown: Notify,
117+
shutdown_flag: Arc<AtomicBool>,
114118
pub server_options: Option<HttpServerOptions>,
115119
pub h2_options: Option<H2Options>,
116120
pub downstream_modules: HttpModules,
@@ -124,6 +128,7 @@ impl<SV> HttpProxy<SV, ()> {
124128
inner,
125129
client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
126130
shutdown: Notify::new(),
131+
shutdown_flag: Arc::new(AtomicBool::new(false)),
127132
server_options: None,
128133
h2_options: None,
129134
downstream_modules: HttpModules::new(),
@@ -154,6 +159,7 @@ where
154159
inner,
155160
client_upstream,
156161
shutdown: Notify::new(),
162+
shutdown_flag: Arc::new(AtomicBool::new(false)),
157163
server_options: None,
158164
downstream_modules: HttpModules::new(),
159165
max_retries: conf.max_retries,
@@ -401,12 +407,15 @@ pub struct Session {
401407
/// Upstream response body bytes received (payload only). Set by proxy layer.
402408
/// TODO: move this into an upstream session digest for future fields.
403409
upstream_body_bytes_received: usize,
410+
/// Flag that is set when the shutdown process has begun.
411+
shutdown_flag: Arc<AtomicBool>,
404412
}
405413

406414
impl Session {
407415
fn new(
408416
downstream_session: impl Into<Box<HttpSession>>,
409417
downstream_modules: &HttpModules,
418+
shutdown_flag: Arc<AtomicBool>,
410419
) -> Self {
411420
Session {
412421
downstream_session: downstream_session.into(),
@@ -419,22 +428,33 @@ impl Session {
419428
subrequest_spawner: None, // optionally set later on
420429
downstream_modules_ctx: downstream_modules.build_ctx(),
421430
upstream_body_bytes_received: 0,
431+
shutdown_flag,
422432
}
423433
}
424434

425435
/// Create a new [Session] from the given [Stream]
426436
///
427-
/// This function is mostly used for testing and mocking.
437+
/// This function is mostly used for testing and mocking, given the downstream modules and
438+
/// shutdown flags will never be set.
428439
pub fn new_h1(stream: Stream) -> Self {
429440
let modules = HttpModules::new();
430-
Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
441+
Self::new(
442+
Box::new(HttpSession::new_http1(stream)),
443+
&modules,
444+
Arc::new(AtomicBool::new(false)),
445+
)
431446
}
432447

433448
/// Create a new [Session] from the given [Stream] with modules
434449
///
435-
/// This function is mostly used for testing and mocking.
450+
/// This function is mostly used for testing and mocking, given the shutdown flag will never be
451+
/// set.
436452
pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
437-
Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
453+
Self::new(
454+
Box::new(HttpSession::new_http1(stream)),
455+
downstream_modules,
456+
Arc::new(AtomicBool::new(false)),
457+
)
438458
}
439459

440460
pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
@@ -569,6 +589,11 @@ impl Session {
569589
self.upstream_body_bytes_received = n;
570590
}
571591

592+
/// Is the proxy process in the process of shutting down (e.g. due to graceful upgrade)?
593+
pub fn is_process_shutting_down(&self) -> bool {
594+
self.shutdown_flag.load(Ordering::Acquire)
595+
}
596+
572597
pub fn downstream_custom_message(
573598
&mut self,
574599
) -> Result<
@@ -910,7 +935,11 @@ where
910935
debug!("starting subrequest");
911936

912937
let mut session = match self.handle_new_request(session).await {
913-
Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
938+
Some(downstream_session) => Session::new(
939+
downstream_session,
940+
&self.downstream_modules,
941+
self.shutdown_flag.clone(),
942+
),
914943
None => return, // bad request
915944
};
916945

@@ -978,7 +1007,11 @@ where
9781007

9791008
// TODO: keepalive pool, use stack
9801009
let mut session = match self.handle_new_request(session).await {
981-
Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
1010+
Some(downstream_session) => Session::new(
1011+
downstream_session,
1012+
&self.downstream_modules,
1013+
self.shutdown_flag.clone(),
1014+
),
9821015
None => return None, // bad request
9831016
};
9841017

@@ -992,10 +1025,9 @@ where
9921025
}
9931026

9941027
async fn http_cleanup(&self) {
1028+
self.shutdown_flag.store(true, Ordering::Release);
9951029
// Notify all keepalived requests blocking on read_request() to abort
9961030
self.shutdown.notify_waiters();
997-
998-
// TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
9991031
}
10001032

10011033
fn server_options(&self) -> Option<&HttpServerOptions> {

0 commit comments

Comments
 (0)