
Commit 84415dd

Henri Froese authored
Expose register_listing_table (apache#618)
* Improve documentation: it's currently hard to see how DataFusion can be configured in Python. Adding a small section on configuration, and linking to examples, should help with that.

* Fix typo

* Fix clippy warning

* Support integer table partition columns (tested in next commit)

* Expose `register_listing_table`: this lets users nicely use `object_store` with Python DataFusion for partitioned datasets, e.g. in S3. Closes apache#617

---------

Co-authored-by: Henri Froese <[email protected]>
1 parent 7204a35 commit 84415dd
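
For orientation (an editor's sketch, not part of the commit): the newly exposed `register_listing_table` pairs with `register_object_store` roughly as below, distilled from the test added in this commit. The table name and dataset path are illustrative; S3 and other stores from `datafusion.object_store` should follow the same pattern.

from datafusion import SessionContext
from datafusion.object_store import LocalFileSystem

ctx = SessionContext()

# Register an object store for the file:// scheme; S3 and other stores
# from datafusion.object_store can be registered the same way.
ctx.register_object_store("file://local", LocalFileSystem(), None)

# Register a hive-partitioned parquet directory tree, e.g.
#   /data/events/grp=a/date_id=20201005/part-0.parquet   (illustrative layout)
ctx.register_listing_table(
    "events",                # illustrative table name
    "file:///data/events/",  # illustrative path
    table_partition_cols=[("grp", "string"), ("date_id", "int")],
    file_extension=".parquet",
)

# Partition columns are queryable and usable for pruning in SQL.
batches = ctx.sql(
    "SELECT grp, COUNT(*) FROM events WHERE date_id = 20201005 GROUP BY grp"
).collect()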

File tree

5 files changed: +165 -7 lines changed

    datafusion/tests/test_sql.py
    docs/source/index.rst
    docs/source/user-guide/configuration.rst
    src/context.rs
    src/dataframe.rs

datafusion/tests/test_sql.py

Lines changed: 57 additions & 1 deletion

@@ -21,8 +21,9 @@
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pytest
+from datafusion.object_store import LocalFileSystem

-from datafusion import udf
+from datafusion import udf, col

 from . import generic as helpers

@@ -374,3 +375,58 @@ def test_simple_select(ctx, tmp_path, arr):
     result = batches[0].column(0)

     np.testing.assert_equal(result, arr)
+
+
+@pytest.mark.parametrize("file_sort_order", (None, [[col("int").sort(True, True)]]))
+@pytest.mark.parametrize("pass_schema", (True, False))
+def test_register_listing_table(ctx, tmp_path, pass_schema, file_sort_order):
+    dir_root = tmp_path / "dataset_parquet_partitioned"
+    dir_root.mkdir(exist_ok=False)
+    (dir_root / "grp=a/date_id=20201005").mkdir(exist_ok=False, parents=True)
+    (dir_root / "grp=a/date_id=20211005").mkdir(exist_ok=False, parents=True)
+    (dir_root / "grp=b/date_id=20201005").mkdir(exist_ok=False, parents=True)
+
+    table = pa.Table.from_arrays(
+        [
+            [1, 2, 3, 4, 5, 6, 7],
+            ["a", "b", "c", "d", "e", "f", "g"],
+            [1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7],
+        ],
+        names=["int", "str", "float"],
+    )
+    pa.parquet.write_table(
+        table.slice(0, 3), dir_root / "grp=a/date_id=20201005/file.parquet"
+    )
+    pa.parquet.write_table(
+        table.slice(3, 2), dir_root / "grp=a/date_id=20211005/file.parquet"
+    )
+    pa.parquet.write_table(
+        table.slice(5, 10), dir_root / "grp=b/date_id=20201005/file.parquet"
+    )
+
+    ctx.register_object_store("file://local", LocalFileSystem(), None)
+    ctx.register_listing_table(
+        "my_table",
+        f"file://{dir_root}/",
+        table_partition_cols=[("grp", "string"), ("date_id", "int")],
+        file_extension=".parquet",
+        schema=table.schema if pass_schema else None,
+        file_sort_order=file_sort_order,
+    )
+    assert ctx.tables() == {"my_table"}
+
+    result = ctx.sql(
+        "SELECT grp, COUNT(*) AS count FROM my_table GROUP BY grp"
+    ).collect()
+    result = pa.Table.from_batches(result)
+
+    rd = result.to_pydict()
+    assert dict(zip(rd["grp"], rd["count"])) == {"a": 5, "b": 2}
+
+    result = ctx.sql(
+        "SELECT grp, COUNT(*) AS count FROM my_table WHERE date_id=20201005 GROUP BY grp"
+    ).collect()
+    result = pa.Table.from_batches(result)
+
+    rd = result.to_pydict()
+    assert dict(zip(rd["grp"], rd["count"])) == {"a": 3, "b": 2}

docs/source/index.rst

Lines changed: 2 additions & 0 deletions

@@ -75,6 +75,7 @@ Example
    Github and Issue Tracker <https://github.com/apache/arrow-datafusion-python>
    Rust's API Docs <https://docs.rs/datafusion/latest/datafusion/>
    Code of conduct <https://github.com/apache/arrow-datafusion/blob/main/CODE_OF_CONDUCT.md>
+   Examples <https://github.com/apache/arrow-datafusion-python/tree/main/examples>

 .. _toc.guide:
 .. toctree::
@@ -84,6 +85,7 @@ Example

    user-guide/introduction
    user-guide/basics
+   user-guide/configuration
    user-guide/common-operations/index
    user-guide/io/index
    user-guide/sql

docs/source/user-guide/configuration.rst (new file)

Lines changed: 51 additions & 0 deletions

@@ -0,0 +1,51 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+Configuration
+=============
+
+Let's look at how we can configure DataFusion. When creating a :code:`SessionContext`, you can pass in
+a :code:`SessionConfig` and :code:`RuntimeConfig` object. These two cover a wide range of options.
+
+.. code-block:: python
+
+    from datafusion import RuntimeConfig, SessionConfig, SessionContext
+
+    # create a session context with default settings
+    ctx = SessionContext()
+    print(ctx)
+
+    # create a session context with explicit runtime and config settings
+    runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
+    config = (
+        SessionConfig()
+        .with_create_default_catalog_and_schema(True)
+        .with_default_catalog_and_schema("foo", "bar")
+        .with_target_partitions(8)
+        .with_information_schema(True)
+        .with_repartition_joins(False)
+        .with_repartition_aggregations(False)
+        .with_repartition_windows(False)
+        .with_parquet_pruning(False)
+        .set("datafusion.execution.parquet.pushdown_filters", "true")
+    )
+    ctx = SessionContext(config, runtime)
+    print(ctx)
+
+
+You can read more about available :code:`SessionConfig` options `here <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
+and about :code:`RuntimeConfig` options `here <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeConfig.html>`_.
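
A quick follow-up to the snippet above (an editor's sketch, not part of the commit): because the session sets `with_information_schema(True)`, the resulting context can report its effective settings straight from SQL.

from datafusion import SessionConfig, SessionContext

# SHOW ALL requires the information_schema to be enabled
config = SessionConfig().with_information_schema(True)
ctx = SessionContext(config)

batches = ctx.sql("SHOW ALL").collect()  # one row per configuration setting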

src/context.rs

Lines changed: 54 additions & 2 deletions

@@ -43,6 +43,10 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
 use datafusion::datasource::MemTable;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::{
@@ -283,7 +287,7 @@ impl PySessionContext {
         })
     }

-    /// Register a an object store with the given name
+    /// Register an object store with the given name
     pub fn register_object_store(
         &mut self,
         scheme: &str,
@@ -317,6 +321,53 @@ impl PySessionContext {
         Ok(())
     }

+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (name, path, table_partition_cols=vec![],
+                        file_extension=".parquet",
+                        schema=None,
+                        file_sort_order=None))]
+    pub fn register_listing_table(
+        &mut self,
+        name: &str,
+        path: &str,
+        table_partition_cols: Vec<(String, String)>,
+        file_extension: &str,
+        schema: Option<PyArrowType<Schema>>,
+        file_sort_order: Option<Vec<Vec<PyExpr>>>,
+        py: Python,
+    ) -> PyResult<()> {
+        let options = ListingOptions::new(Arc::new(ParquetFormat::new()))
+            .with_file_extension(file_extension)
+            .with_table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
+            .with_file_sort_order(
+                file_sort_order
+                    .unwrap_or_default()
+                    .into_iter()
+                    .map(|e| e.into_iter().map(|f| f.into()).collect())
+                    .collect(),
+            );
+        let table_path = ListingTableUrl::parse(path)?;
+        let resolved_schema: SchemaRef = match schema {
+            Some(s) => Arc::new(s.0),
+            None => {
+                let state = self.ctx.state();
+                let schema = options.infer_schema(&state, &table_path);
+                wait_for_future(py, schema).map_err(DataFusionError::from)?
+            }
+        };
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(options)
+            .with_schema(resolved_schema);
+        let table = ListingTable::try_new(config)?;
+        self.register_table(
+            name,
+            &PyTable {
+                table: Arc::new(table),
+            },
+        )?;
+        Ok(())
+    }
+
     /// Returns a PyDataFrame whose plan corresponds to the SQL statement.
     pub fn sql(&mut self, query: &str, py: Python) -> PyResult<PyDataFrame> {
         let result = self.ctx.sql(query);
@@ -913,8 +964,9 @@ pub fn convert_table_partition_cols(
         .into_iter()
         .map(|(name, ty)| match ty.as_str() {
             "string" => Ok((name, DataType::Utf8)),
+            "int" => Ok((name, DataType::Int32)),
             _ => Err(DataFusionError::Common(format!(
-                "Unsupported data type '{ty}' for partition column"
+                "Unsupported data type '{ty}' for partition column. Supported types are 'string' and 'int'"
             ))),
         })
         .collect::<Result<Vec<_>, _>>()
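
The `file_sort_order` argument mirrors the `ListingOptions::with_file_sort_order` call above: a list of orderings, each itself a list of sort expressions. A hedged sketch from the Python side (path and column name illustrative), relying on the defaults declared in the `#[pyo3(signature = ...)]` attribute:

from datafusion import SessionContext, col
from datafusion.object_store import LocalFileSystem

ctx = SessionContext()
ctx.register_object_store("file://local", LocalFileSystem(), None)

# Declare that the parquet files are already sorted by "int" ascending,
# nulls first, so DataFusion can avoid re-sorting on matching ORDER BY queries.
ctx.register_listing_table(
    "my_table",
    "file:///path/to/dataset/",  # illustrative path
    file_sort_order=[[col("int").sort(True, True)]],
)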

src/dataframe.rs

Lines changed: 1 addition & 4 deletions

@@ -424,10 +424,7 @@ impl PyDataFrame {
         let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;

         match stream {
-            Ok(batches) => Ok(batches
-                .into_iter()
-                .map(|batch_stream| PyRecordBatchStream::new(batch_stream))
-                .collect()),
+            Ok(batches) => Ok(batches.into_iter().map(PyRecordBatchStream::new).collect()),
             _ => Err(PyValueError::new_err(
                 "Unable to execute stream partitioned",
             )),
