Skip to content

Commit e92fbf6

Browse files
committed
Rollback DistributedExt de-implementation from SessionConfig, SessionState and SessionContext
1 parent 48cb8af commit e92fbf6

File tree

1 file changed

+149
-16
lines changed

1 file changed

+149
-16
lines changed

src/distributed_ext.rs

Lines changed: 149 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc
77
use crate::{ChannelResolver, DistributedConfig, TaskEstimator};
88
use datafusion::common::DataFusionError;
99
use datafusion::config::ConfigExtension;
10-
use datafusion::execution::SessionStateBuilder;
10+
use datafusion::execution::{SessionState, SessionStateBuilder};
11+
use datafusion::prelude::{SessionConfig, SessionContext};
1112
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
1213
use delegate::delegate;
1314
use http::HeaderMap;
@@ -359,53 +360,48 @@ pub trait DistributedExt: Sized {
359360
) -> Result<(), DataFusionError>;
360361
}
361362

362-
impl DistributedExt for SessionStateBuilder {
363+
impl DistributedExt for SessionConfig {
363364
fn set_distributed_option_extension<T: ConfigExtension + Default>(
364365
&mut self,
365366
t: T,
366367
) -> Result<(), DataFusionError> {
367-
set_distributed_option_extension(self.config().get_or_insert_default(), t)
368+
set_distributed_option_extension(self, t)
368369
}
369370

370371
fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
371372
&mut self,
372373
headers: &HeaderMap,
373374
) -> Result<(), DataFusionError> {
374-
set_distributed_option_extension_from_headers::<T>(
375-
self.config().get_or_insert_default(),
376-
headers,
377-
)
375+
set_distributed_option_extension_from_headers::<T>(self, headers)
378376
}
379377

380378
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
381-
set_distributed_user_codec(self.config().get_or_insert_default(), codec)
379+
set_distributed_user_codec(self, codec)
382380
}
383381

384382
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>) {
385-
set_distributed_user_codec_arc(self.config().get_or_insert_default(), codec)
383+
set_distributed_user_codec_arc(self, codec)
386384
}
387385

388386
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
389387
&mut self,
390388
resolver: T,
391389
) {
392-
let cfg = self.config().get_or_insert_default();
393-
set_distributed_channel_resolver(cfg, resolver);
390+
set_distributed_channel_resolver(self, resolver);
394391
}
395392

396393
fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
397394
&mut self,
398395
estimator: T,
399396
) {
400-
set_distributed_task_estimator(self.config().get_or_insert_default(), estimator)
397+
set_distributed_task_estimator(self, estimator)
401398
}
402399

403400
fn set_distributed_files_per_task(
404401
&mut self,
405402
files_per_task: usize,
406403
) -> Result<(), DataFusionError> {
407-
let cfg = self.config().get_or_insert_default();
408-
let d_cfg = DistributedConfig::from_config_options_mut(cfg.options_mut())?;
404+
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
409405
d_cfg.files_per_task = files_per_task;
410406
Ok(())
411407
}
@@ -414,8 +410,7 @@ impl DistributedExt for SessionStateBuilder {
414410
&mut self,
415411
factor: f64,
416412
) -> Result<(), DataFusionError> {
417-
let cfg = self.config().get_or_insert_default();
418-
let d_cfg = DistributedConfig::from_config_options_mut(cfg.options_mut())?;
413+
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
419414
d_cfg.cardinality_task_count_factor = factor;
420415
Ok(())
421416
}
@@ -456,3 +451,141 @@ impl DistributedExt for SessionStateBuilder {
456451
}
457452
}
458453
}
454+
455+
impl DistributedExt for SessionStateBuilder {
456+
delegate! {
457+
to self.config().get_or_insert_default() {
458+
fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) -> Result<(), DataFusionError>;
459+
#[call(set_distributed_option_extension)]
460+
#[expr($?;Ok(self))]
461+
fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Result<Self, DataFusionError>;
462+
463+
fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
464+
#[call(set_distributed_option_extension_from_headers)]
465+
#[expr($?;Ok(self))]
466+
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
467+
468+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
469+
#[call(set_distributed_user_codec)]
470+
#[expr($;self)]
471+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
472+
473+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
474+
#[call(set_distributed_user_codec_arc)]
475+
#[expr($;self)]
476+
fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
477+
478+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
479+
#[call(set_distributed_channel_resolver)]
480+
#[expr($;self)]
481+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
482+
483+
fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
484+
#[call(set_distributed_task_estimator)]
485+
#[expr($;self)]
486+
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
487+
488+
fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
489+
#[call(set_distributed_files_per_task)]
490+
#[expr($?;Ok(self))]
491+
fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
492+
493+
fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
494+
#[call(set_distributed_cardinality_effect_task_scale_factor)]
495+
#[expr($?;Ok(self))]
496+
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
497+
}
498+
}
499+
}
500+
501+
impl DistributedExt for SessionState {
502+
delegate! {
503+
to self.config_mut() {
504+
fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) -> Result<(), DataFusionError>;
505+
#[call(set_distributed_option_extension)]
506+
#[expr($?;Ok(self))]
507+
fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Result<Self, DataFusionError>;
508+
509+
fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
510+
#[call(set_distributed_option_extension_from_headers)]
511+
#[expr($?;Ok(self))]
512+
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
513+
514+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
515+
#[call(set_distributed_user_codec)]
516+
#[expr($;self)]
517+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
518+
519+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
520+
#[call(set_distributed_user_codec_arc)]
521+
#[expr($;self)]
522+
fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
523+
524+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
525+
#[call(set_distributed_channel_resolver)]
526+
#[expr($;self)]
527+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
528+
529+
fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
530+
#[call(set_distributed_task_estimator)]
531+
#[expr($;self)]
532+
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
533+
534+
fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
535+
#[call(set_distributed_files_per_task)]
536+
#[expr($?;Ok(self))]
537+
fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
538+
539+
fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
540+
#[call(set_distributed_cardinality_effect_task_scale_factor)]
541+
#[expr($?;Ok(self))]
542+
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
543+
}
544+
}
545+
}
546+
547+
impl DistributedExt for SessionContext {
548+
delegate! {
549+
to self.state_ref().write().config_mut() {
550+
fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) -> Result<(), DataFusionError>;
551+
#[call(set_distributed_option_extension)]
552+
#[expr($?;Ok(self))]
553+
fn with_distributed_option_extension<T: ConfigExtension + Default>(self, t: T) -> Result<Self, DataFusionError>;
554+
555+
fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
556+
#[call(set_distributed_option_extension_from_headers)]
557+
#[expr($?;Ok(self))]
558+
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
559+
560+
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
561+
#[call(set_distributed_user_codec)]
562+
#[expr($;self)]
563+
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
564+
565+
fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
566+
#[call(set_distributed_user_codec_arc)]
567+
#[expr($;self)]
568+
fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
569+
570+
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
571+
#[call(set_distributed_channel_resolver)]
572+
#[expr($;self)]
573+
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
574+
575+
fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
576+
#[call(set_distributed_task_estimator)]
577+
#[expr($;self)]
578+
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(self, estimator: T) -> Self;
579+
580+
fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
581+
#[call(set_distributed_files_per_task)]
582+
#[expr($?;Ok(self))]
583+
fn with_distributed_files_per_task(self, files_per_task: usize) -> Result<Self, DataFusionError>;
584+
585+
fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
586+
#[call(set_distributed_cardinality_effect_task_scale_factor)]
587+
#[expr($?;Ok(self))]
588+
fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result<Self, DataFusionError>;
589+
}
590+
}
591+
}

0 commit comments

Comments
 (0)