Skip to content

Commit 71b9770

Browse files
authored
Change tpch validation to use exec_sql_on_tables (#66)
* Change tpch validation to use `exec_sql_on_tables` Fixes #65 `exec_sql_on_tables` is a util function added by this PR that uses DataFution without Ray to execute queries. This ensures the validation is using the same Rust version as the DataFusion-Ray avoiding validation failure caused by inconsistency between different versions of DataFusion and DataFusion-Python. With this change, all TPCH validations are passing regardless of versions. Also, expose the `schema` of `RayDataFrame` to facilitate debugging. Thank @robtandy for the idea; see #65 (comment) * Use only `register_listing_table` to register tables Also added a unit test to show that is working for both file and directory.
1 parent 116734d commit 71b9770

File tree

8 files changed

+183
-38
lines changed

8 files changed

+183
-38
lines changed

Cargo.lock

Lines changed: 57 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ tonic-build = { version = "0.8", default-features = false, features = [
8686
] }
8787
url = "2"
8888

89+
[dev-dependencies]
90+
tempfile = "3.17"
91+
8992
[lib]
9093
name = "datafusion_ray"
9194
crate-type = ["cdylib", "rlib"]

datafusion_ray/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@
2020
except ImportError:
2121
import importlib_metadata
2222

23-
from .core import RayContext, prettify, runtime_env, RayStagePool
23+
from .core import RayContext, exec_sql_on_tables, prettify, runtime_env, RayStagePool
2424

2525
__version__ = importlib_metadata.version(__name__)

datafusion_ray/core.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from datafusion_ray._datafusion_ray_internal import (
3232
RayContext as RayContextInternal,
3333
RayDataFrame as RayDataFrameInternal,
34+
exec_sql_on_tables,
3435
prettify,
3536
)
3637

@@ -465,6 +466,9 @@ def stages(self):
465466

466467
return self._stages
467468

469+
def schema(self):
470+
return self.df.schema()
471+
468472
def execution_plan(self):
469473
return self.df.execution_plan()
470474

@@ -479,7 +483,7 @@ def collect(self) -> list[pa.RecordBatch]:
479483
t1 = time.time()
480484
self.stages()
481485
t2 = time.time()
482-
log.debug(f"creating stages took {t2 -t1}s")
486+
log.debug(f"creating stages took {t2 - t1}s")
483487

484488
last_stage_id = max([stage.stage_id for stage in self._stages])
485489
log.debug(f"last stage is {last_stage_id}")
@@ -553,7 +557,9 @@ def __init__(
553557
s = time.time()
554558
call_sync(wait_for([start_ref], "RayContextSupervisor start"))
555559
e = time.time()
556-
log.info(f"RayContext::__init__ waiting for supervisor to be ready took {e-s}s")
560+
log.info(
561+
f"RayContext::__init__ waiting for supervisor to be ready took {e - s}s"
562+
)
557563

558564
def register_parquet(self, name: str, path: str):
559565
self.ctx.register_parquet(name, path)

src/dataframe.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,10 @@ impl RayDataFrame {
266266
Ok(PyLogicalPlan::new(self.df.logical_plan().clone()))
267267
}
268268

269+
fn schema(&self, py: Python) -> PyResult<PyObject> {
270+
self.df.schema().as_arrow().to_pyarrow(py)
271+
}
272+
269273
fn optimized_logical_plan(&self) -> PyResult<PyLogicalPlan> {
270274
Ok(PyLogicalPlan::new(self.df.clone().into_optimized_plan()?))
271275
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ fn _datafusion_ray_internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
4444
m.add_class::<dataframe::PyDataFrameStage>()?;
4545
m.add_class::<stage_service::StageService>()?;
4646
m.add_function(wrap_pyfunction!(util::prettify, m)?)?;
47+
m.add_function(wrap_pyfunction!(util::exec_sql_on_tables, m)?)?;
4748
Ok(())
4849
}
4950

0 commit comments

Comments
 (0)