Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 41 additions & 62 deletions datafusion-examples/examples/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,78 +108,57 @@ use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, Poisson};

/// This example demonstrates the table sample support.

#[derive(Debug, Clone)]
struct TableSamplePlanNode {
inner_plan: LogicalPlan,

lower_bound: f64,
upper_bound: f64,
with_replacement: bool,
seed: u64,
/// This example demonstrates the table sample support.

/// Hashable and comparible f64 for sampling bounds
#[derive(Debug, Clone, Copy, PartialOrd)]
struct Bound(f64);
impl PartialEq for Bound {
fn eq(&self, other: &Self) -> bool {
(self.0 - other.0).abs() < f64::EPSILON
}
}

impl Hash for TableSamplePlanNode {
impl Eq for Bound {}

impl Hash for Bound {
fn hash<H: Hasher>(&self, state: &mut H) {
self.inner_plan.hash(state);
self.lower_bound.to_bits().hash(state);
self.upper_bound.to_bits().hash(state);
self.with_replacement.hash(state);
self.seed.hash(state);
// Hash the bits of the f64
self.0.to_bits().hash(state);
}
}

impl PartialEq for TableSamplePlanNode {
fn eq(&self, other: &Self) -> bool {
self.inner_plan == other.inner_plan
&& (self.lower_bound - other.lower_bound).abs() < f64::EPSILON
&& (self.upper_bound - other.upper_bound).abs() < f64::EPSILON
&& self.with_replacement == other.with_replacement
&& self.seed == other.seed
impl From<f64> for Bound {
fn from(value: f64) -> Self {
Self(value)
}
}
impl From<Bound> for f64 {
fn from(value: Bound) -> Self {
value.0
}
}

impl Eq for TableSamplePlanNode {}

impl PartialOrd for TableSamplePlanNode {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.inner_plan
.partial_cmp(&other.inner_plan)
.and_then(|ord| {
if ord != Ordering::Equal {
Some(ord)
} else {
self.lower_bound
.partial_cmp(&other.lower_bound)
.and_then(|ord| {
if ord != Ordering::Equal {
Some(ord)
} else {
self.upper_bound.partial_cmp(&other.upper_bound).and_then(
|ord| {
if ord != Ordering::Equal {
Some(ord)
} else {
self.with_replacement
.partial_cmp(&other.with_replacement)
.and_then(|ord| {
if ord != Ordering::Equal {
Some(ord)
} else {
self.seed.partial_cmp(&other.seed)
}
})
}
},
)
}
})
}
})
impl AsRef<f64> for Bound {
fn as_ref(&self) -> &f64 {
&self.0
}
}


#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd)]
struct TableSamplePlanNode {
inner_plan: LogicalPlan,

lower_bound: Bound,
upper_bound: Bound,
with_replacement: bool,
seed: u64,
}


impl UserDefinedLogicalNodeCore for TableSamplePlanNode {
fn name(&self) -> &str {
"TableSample"
Expand Down Expand Up @@ -234,8 +213,8 @@ impl TableSampleExtensionPlanner {
Ok(Arc::new(SampleExec {
input: physical_input.clone(),
lower_bound: 0.0,
upper_bound: specific_node.upper_bound,
with_replacement: specific_node.with_replacement,
upper_bound: specific_node.upper_bound.into(),
with_replacement: specific_node.with_replacement.into(),
seed: specific_node.seed,
metrics: Default::default(),
cache: SampleExec::compute_properties(&physical_input),
Expand Down Expand Up @@ -663,8 +642,8 @@ impl<'a, S: ContextProvider> TableSamplePlanner<'a, S> {
) -> Result<LogicalPlan> {
let node = TableSamplePlanNode {
inner_plan: input,
lower_bound: 0.0,
upper_bound: fraction,
lower_bound: Bound::from(0.0),
upper_bound: Bound::from(fraction),
with_replacement: with_replacement.unwrap_or(false),
seed: seed.unwrap_or_else(rand::random),
};
Expand Down