Skip to content

Commit d0ae849

Browse files
authored
chore: add python deprecation warnings (#3869)
# Description This PR extends our deprecation warnings for the `files` and `files_by_partitions` methods. i.e., a warning is emitted when used, and the function is decorated with `@deprecated`. We also tighten the checks in `to_pyarrow_dataset` to explicitly raise on tables with column mapping or deletion vectors enabled. Beyond that, there is some code beautification - mostly line breaks in pyo3 signatures. Signed-off-by: Robert Pack <[email protected]>
1 parent 3583056 commit d0ae849

File tree

3 files changed

+154
-43
lines changed

3 files changed

+154
-43
lines changed

python/deltalake/table.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,11 @@ def files(
331331
("z", "not in", ["a","b"])
332332
```
333333
"""
334+
warnings.warn(
335+
"Method `files` is deprecated, Use DeltaTable.file_uris(predicate) instead.",
336+
category=DeprecationWarning,
337+
stacklevel=2,
338+
)
334339
return self._table.files(self._stringify_partition_values(partition_filters))
335340

336341
def file_uris(
@@ -467,13 +472,17 @@ def schema(self) -> DeltaSchema:
467472
"""
468473
return self._table.schema
469474

475+
@deprecated(
476+
version="1.2.1",
477+
reason="Not compatible with modern Delta features (e.g. shallow clones). Use `file_uris` instead.",
478+
)
470479
def files_by_partitions(self, partition_filters: PartitionFilterType) -> list[str]:
471480
"""
472481
Get the files for each partition
473482
474483
"""
475484
warnings.warn(
476-
"files_by_partitions is deprecated, please use DeltaTable.files() instead.",
485+
"Method `files_by_partitions` is deprecated, please use DeltaTable.file_uris() instead.",
477486
category=DeprecationWarning,
478487
stacklevel=2,
479488
)
@@ -882,6 +891,24 @@ def to_pyarrow_dataset(
882891
"but these are not yet supported by the deltalake reader."
883892
)
884893

894+
if (
895+
table_protocol.reader_features
896+
and "columnMapping" in table_protocol.reader_features
897+
):
898+
raise DeltaProtocolError(
899+
"The table requires reader feature 'columnMapping' "
900+
"but this is not supported using pyarrow Datasets."
901+
)
902+
903+
if (
904+
table_protocol.reader_features
905+
and "deletionVectors" in table_protocol.reader_features
906+
):
907+
raise DeltaProtocolError(
908+
"The table requires reader feature 'deletionVectors' "
909+
"but this is not supported using pyarrow Datasets."
910+
)
911+
885912
import pyarrow
886913
import pyarrow.fs as pa_fs
887914

python/src/lib.rs

Lines changed: 122 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,13 @@ struct RawDeltaTable {
114114
_config: FsConfig,
115115
}
116116

117-
#[pyclass(frozen)]
117+
#[pyclass(frozen, get_all)]
118118
struct RawDeltaTableMetaData {
119-
#[pyo3(get)]
120119
id: String,
121-
#[pyo3(get)]
122120
name: Option<String>,
123-
#[pyo3(get)]
124121
description: Option<String>,
125-
#[pyo3(get)]
126122
partition_columns: Vec<String>,
127-
#[pyo3(get)]
128123
created_time: Option<i64>,
129-
#[pyo3(get)]
130124
configuration: HashMap<String, String>,
131125
}
132126

@@ -481,8 +475,15 @@ impl RawDeltaTable {
481475

482476
/// Run the Vacuum command on the Delta Table: list and delete files no longer referenced
483477
/// by the Delta table and are older than the retention threshold.
484-
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true,
485-
commit_properties=None, post_commithook_properties=None, full = false, keep_versions = None))]
478+
#[pyo3(signature = (
479+
dry_run,
480+
retention_hours = None,
481+
enforce_retention_duration = true,
482+
commit_properties=None,
483+
post_commithook_properties=None,
484+
full = false,
485+
keep_versions = None,
486+
))]
486487
#[allow(clippy::too_many_arguments)]
487488
pub fn vacuum(
488489
&self,
@@ -539,7 +540,14 @@ impl RawDeltaTable {
539540
}
540541

541542
/// Run the UPDATE command on the Delta Table
542-
#[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, commit_properties = None, post_commithook_properties=None))]
543+
#[pyo3(signature = (
544+
updates,
545+
predicate=None,
546+
writer_properties=None,
547+
safe_cast = false,
548+
commit_properties = None,
549+
post_commithook_properties=None,
550+
))]
543551
#[allow(clippy::too_many_arguments)]
544552
pub fn update(
545553
&self,
@@ -659,7 +667,8 @@ impl RawDeltaTable {
659667
min_commit_interval = None,
660668
writer_properties=None,
661669
commit_properties=None,
662-
post_commithook_properties=None))]
670+
post_commithook_properties=None,
671+
))]
663672
pub fn z_order_optimize(
664673
&self,
665674
py: Python,
@@ -750,7 +759,9 @@ impl RawDeltaTable {
750759
Ok(())
751760
}
752761

753-
#[pyo3(signature = (feature, allow_protocol_versions_increase, commit_properties=None, post_commithook_properties=None))]
762+
#[pyo3(signature = (
763+
feature, allow_protocol_versions_increase, commit_properties=None, post_commithook_properties=None
764+
))]
754765
pub fn add_feature(
755766
&self,
756767
py: Python,
@@ -847,7 +858,15 @@ impl RawDeltaTable {
847858
Ok(())
848859
}
849860

850-
#[pyo3(signature = (starting_version = None, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, predicate = None, allow_out_of_range = false))]
861+
#[pyo3(signature = (
862+
starting_version = None,
863+
ending_version = None,
864+
starting_timestamp = None,
865+
ending_timestamp = None,
866+
columns = None,
867+
predicate = None,
868+
allow_out_of_range = false,
869+
))]
851870
#[allow(clippy::too_many_arguments)]
852871
pub fn load_cdf(
853872
&self,
@@ -988,7 +1007,9 @@ impl RawDeltaTable {
9881007
}
9891008

9901009
// Run the restore command on the Delta Table: restore table to a given version or datetime
991-
#[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None))]
1010+
#[pyo3(signature = (
1011+
target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None
1012+
))]
9921013
pub fn restore(
9931014
&self,
9941015
target: Option<&Bound<'_, PyAny>>,
@@ -1028,7 +1049,8 @@ impl RawDeltaTable {
10281049
Ok(serde_json::to_string(&metrics).unwrap())
10291050
}
10301051

1031-
/// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
1052+
/// Run the History command on the Delta Table: Returns provenance information,
1053+
/// including the operation, user, and so on, for each write to a table.
10321054
#[pyo3(signature = (limit=None))]
10331055
pub fn history(&self, limit: Option<usize>) -> PyResult<Vec<String>> {
10341056
#[allow(clippy::await_holding_lock)]
@@ -1220,7 +1242,15 @@ impl RawDeltaTable {
12201242
}
12211243

12221244
#[allow(clippy::too_many_arguments)]
1223-
#[pyo3(signature = (add_actions, mode, partition_by, schema, partitions_filters=None, commit_properties=None, post_commithook_properties=None))]
1245+
#[pyo3(signature = (
1246+
add_actions,
1247+
mode,
1248+
partition_by,
1249+
schema,
1250+
partitions_filters=None,
1251+
commit_properties=None,
1252+
post_commithook_properties=None,
1253+
))]
12241254
fn create_write_transaction(
12251255
&self,
12261256
py: Python,
@@ -1467,7 +1497,9 @@ impl RawDeltaTable {
14671497
}
14681498

14691499
/// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
1470-
#[pyo3(signature = (predicate = None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
1500+
#[pyo3(signature = (
1501+
predicate = None, writer_properties=None, commit_properties=None, post_commithook_properties=None
1502+
))]
14711503
pub fn delete(
14721504
&self,
14731505
py: Python,
@@ -1665,7 +1697,21 @@ impl RawDeltaTable {
16651697
}
16661698

16671699
#[allow(clippy::too_many_arguments)]
1668-
#[pyo3(signature = (data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
1700+
#[pyo3(signature = (
1701+
data,
1702+
batch_schema,
1703+
mode,
1704+
schema_mode=None,
1705+
partition_by=None,
1706+
predicate=None,
1707+
target_file_size=None,
1708+
name=None,
1709+
description=None,
1710+
configuration=None,
1711+
writer_properties=None,
1712+
commit_properties=None,
1713+
post_commithook_properties=None
1714+
))]
16691715
fn write(
16701716
&self,
16711717
py: Python,
@@ -2173,14 +2219,6 @@ fn shutdown_tracing() -> PyResult<()> {
21732219
Ok(())
21742220
}
21752221

2176-
fn current_timestamp() -> i64 {
2177-
let start = SystemTime::now();
2178-
let since_the_epoch = start
2179-
.duration_since(UNIX_EPOCH)
2180-
.expect("Time went backwards");
2181-
since_the_epoch.as_millis().try_into().unwrap()
2182-
}
2183-
21842222
#[derive(FromPyObject)]
21852223
pub struct PyAddAction {
21862224
path: String,
@@ -2244,14 +2282,11 @@ pub struct PyPostCommitHookProperties {
22442282
}
22452283

22462284
#[derive(Clone)]
2247-
#[pyclass(name = "Transaction", module = "deltalake._internal")]
2285+
#[pyclass(name = "Transaction", module = "deltalake._internal", get_all)]
22482286
pub struct PyTransaction {
2249-
#[pyo3(get)]
2250-
pub app_id: String,
2251-
#[pyo3(get)]
2252-
pub version: i64,
2253-
#[pyo3(get)]
2254-
pub last_updated: Option<i64>,
2287+
app_id: String,
2288+
version: i64,
2289+
last_updated: Option<i64>,
22552290
}
22562291

22572292
#[pymethods]
@@ -2306,7 +2341,23 @@ pub struct PyCommitProperties {
23062341

23072342
#[pyfunction]
23082343
#[allow(clippy::too_many_arguments)]
2309-
#[pyo3(signature = (table_uri, data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
2344+
#[pyo3(signature = (
2345+
table_uri,
2346+
data,
2347+
batch_schema,
2348+
mode,
2349+
schema_mode=None,
2350+
partition_by=None,
2351+
predicate=None,
2352+
target_file_size=None,
2353+
name=None,
2354+
description=None,
2355+
configuration=None,
2356+
storage_options=None,
2357+
writer_properties=None,
2358+
commit_properties=None,
2359+
post_commithook_properties=None,
2360+
))]
23102361
fn write_to_deltalake(
23112362
py: Python,
23122363
table_uri: String,
@@ -2365,7 +2416,19 @@ fn write_to_deltalake(
23652416

23662417
#[pyfunction]
23672418
#[allow(clippy::too_many_arguments)]
2368-
#[pyo3(signature = (table_uri, schema, partition_by, mode, raise_if_key_not_exists, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))]
2419+
#[pyo3(signature = (
2420+
table_uri,
2421+
schema,
2422+
partition_by,
2423+
mode,
2424+
raise_if_key_not_exists,
2425+
name=None,
2426+
description=None,
2427+
configuration=None,
2428+
storage_options=None,
2429+
commit_properties=None,
2430+
post_commithook_properties=None,
2431+
))]
23692432
fn create_deltalake(
23702433
py: Python,
23712434
table_uri: String,
@@ -2432,7 +2495,19 @@ fn create_deltalake(
24322495

24332496
#[pyfunction]
24342497
#[allow(clippy::too_many_arguments)]
2435-
#[pyo3(signature = (table_uri, schema, add_actions, mode, partition_by, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))]
2498+
#[pyo3(signature = (
2499+
table_uri,
2500+
schema,
2501+
add_actions,
2502+
mode,
2503+
partition_by,
2504+
name=None,
2505+
description=None,
2506+
configuration=None,
2507+
storage_options=None,
2508+
commit_properties=None,
2509+
post_commithook_properties=None
2510+
))]
24362511
fn create_table_with_add_actions(
24372512
py: Python,
24382513
table_uri: String,
@@ -2498,7 +2573,17 @@ fn create_table_with_add_actions(
24982573

24992574
#[pyfunction]
25002575
#[allow(clippy::too_many_arguments)]
2501-
#[pyo3(signature = (uri, partition_schema=None, partition_strategy=None, name=None, description=None, configuration=None, storage_options=None, commit_properties=None, post_commithook_properties=None))]
2576+
#[pyo3(signature = (
2577+
uri,
2578+
partition_schema=None,
2579+
partition_strategy=None,
2580+
name=None,
2581+
description=None,
2582+
configuration=None,
2583+
storage_options=None,
2584+
commit_properties=None,
2585+
post_commithook_properties=None,
2586+
))]
25022587
fn convert_to_deltalake(
25032588
py: Python,
25042589
uri: String,

python/src/reader.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
use arrow_schema::{ArrowError, SchemaRef};
22
use deltalake::arrow::array::RecordBatchReader;
33
use deltalake::arrow::record_batch::RecordBatch;
4-
use deltalake::datafusion::execution::RecordBatchStream;
4+
use deltalake::datafusion::execution::SendableRecordBatchStream;
55
use futures::StreamExt;
6-
use std::pin::Pin;
76

87
use crate::utils::rt;
98

109
/// A lazy adapter to convert an async RecordBatchStream into a sync RecordBatchReader
1110
struct StreamToReaderAdapter {
1211
schema: SchemaRef,
13-
stream: Pin<Box<dyn RecordBatchStream + Send>>,
12+
stream: SendableRecordBatchStream,
1413
}
1514

1615
impl Iterator for StreamToReaderAdapter {
@@ -28,9 +27,9 @@ impl RecordBatchReader for StreamToReaderAdapter {
2827
}
2928
}
3029

31-
/// Converts a RecordBatchStream into a lazy RecordBatchReader
30+
/// Converts a [`SendableRecordBatchStream`] into a lazy RecordBatchReader
3231
pub(crate) fn convert_stream_to_reader(
33-
stream: Pin<Box<dyn RecordBatchStream + Send>>,
32+
stream: SendableRecordBatchStream,
3433
) -> Box<dyn RecordBatchReader + Send> {
3534
Box::new(StreamToReaderAdapter {
3635
schema: stream.schema(),

0 commit comments

Comments
 (0)