Skip to content

Commit b8f7098

Browse files
committed
add customization point for ctx
1 parent 9e11862 commit b8f7098

File tree

3 files changed

+22
-4
lines changed

3 files changed

+22
-4
lines changed

src/bin/distributed-datafusion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async fn main() -> Result<()> {
3333

3434
match args.mode.as_str() {
3535
"proxy" => {
36-
let service = DDProxyService::new(new_friendly_name()?, args.port).await?;
36+
let service = DDProxyService::new(new_friendly_name()?, args.port, None).await?;
3737
service.serve().await?;
3838
}
3939
"worker" => {

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub use proto::generated::protobuf;
2424

2525
pub mod analyze;
2626
pub mod codec;
27+
pub mod ctx_customizer;
2728
pub mod explain;
2829
pub mod flight;
2930
pub mod friendly;

src/proxy_service.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use tokio::{
4040
use tonic::{async_trait, transport::Server, Request, Response, Status};
4141

4242
use crate::{
43+
ctx_customizer::CtxCustomizer,
4344
flight::{FlightSqlHandler, FlightSqlServ},
4445
logging::{debug, info, trace},
4546
planning::{add_ctx_extentions, get_ctx},
@@ -57,10 +58,16 @@ pub struct DDProxyHandler {
5758
pub host: Host,
5859

5960
pub planner: QueryPlanner,
61+
62+
pub ctx_customizer: Option<Arc<dyn CtxCustomizer + Send + Sync>>,
6063
}
6164

6265
impl DDProxyHandler {
63-
pub fn new(name: String, addr: String) -> Self {
66+
pub fn new(
67+
name: String,
68+
addr: String,
69+
ctx_customizer: Option<Arc<dyn CtxCustomizer + Send + Sync>>,
70+
) -> Self {
6471
// call this function to bootstrap the worker discovery mechanism
6572
get_worker_addresses().expect("Could not get worker addresses upon startup");
6673

@@ -71,6 +78,7 @@ impl DDProxyHandler {
7178
Self {
7279
host: host.clone(),
7380
planner: QueryPlanner::new(),
81+
ctx_customizer,
7482
}
7583
}
7684

@@ -118,6 +126,11 @@ impl DDProxyHandler {
118126
add_ctx_extentions(&mut ctx, &self.host, &query_id, stage_id, addrs, vec![])
119127
.map_err(|e| Status::internal(format!("Could not add context extensions {e:?}")))?;
120128

129+
if let Some(ref c) = self.ctx_customizer {
130+
c.customize(&mut ctx)
131+
.map_err(|e| Status::internal(format!("Could not customize context {e:?}")))?;
132+
}
133+
121134
// TODO: revisit this to allow for consuming a partitular partition
122135
trace!("calling execute plan");
123136
let partition = 0;
@@ -278,7 +291,11 @@ pub struct DDProxyService {
278291
}
279292

280293
impl DDProxyService {
281-
pub async fn new(name: String, port: usize) -> Result<Self> {
294+
pub async fn new(
295+
name: String,
296+
port: usize,
297+
ctx_customizer: Option<Arc<dyn CtxCustomizer + Send + Sync>>,
298+
) -> Result<Self> {
282299
debug!("Creating DDProxyService!");
283300

284301
let (all_done_tx, all_done_rx) = channel(1);
@@ -290,7 +307,7 @@ impl DDProxyService {
290307

291308
info!("DDProxyService bound to {addr}");
292309

293-
let handler = Arc::new(DDProxyHandler::new(name, addr.clone()));
310+
let handler = Arc::new(DDProxyHandler::new(name, addr.clone(), ctx_customizer));
294311

295312
Ok(Self {
296313
listener,

0 commit comments

Comments
 (0)