@@ -1467,7 +1467,8 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression:
1467
1467
1468
1468
1469
1469
class TableScan (ABC ):
1470
- table : Table
1470
+ table_metadata : TableMetadata
1471
+ io : FileIO
1471
1472
row_filter : BooleanExpression
1472
1473
selected_fields : Tuple [str , ...]
1473
1474
case_sensitive : bool
@@ -1477,15 +1478,17 @@ class TableScan(ABC):
1477
1478
1478
1479
def __init__ (
1479
1480
self ,
1480
- table : Table ,
1481
+ table_metadata : TableMetadata ,
1482
+ io : FileIO ,
1481
1483
row_filter : Union [str , BooleanExpression ] = ALWAYS_TRUE ,
1482
1484
selected_fields : Tuple [str , ...] = ("*" ,),
1483
1485
case_sensitive : bool = True ,
1484
1486
snapshot_id : Optional [int ] = None ,
1485
1487
options : Properties = EMPTY_DICT ,
1486
1488
limit : Optional [int ] = None ,
1487
1489
):
1488
- self .table = table
1490
+ self .table_metadata = table_metadata
1491
+ self .io = io
1489
1492
self .row_filter = _parse_row_filter (row_filter )
1490
1493
self .selected_fields = selected_fields
1491
1494
self .case_sensitive = case_sensitive
@@ -1495,16 +1498,16 @@ def __init__(
1495
1498
1496
1499
def snapshot (self ) -> Optional [Snapshot ]:
1497
1500
if self .snapshot_id :
1498
- return self .table .snapshot_by_id (self .snapshot_id )
1499
- return self .table .current_snapshot ()
1501
+ return self .table_metadata .snapshot_by_id (self .snapshot_id )
1502
+ return self .table_metadata .current_snapshot ()
1500
1503
1501
1504
def projection (self ) -> Schema :
1502
- current_schema = self .table .schema ()
1505
+ current_schema = self .table_metadata .schema ()
1503
1506
if self .snapshot_id is not None :
1504
- snapshot = self .table .snapshot_by_id (self .snapshot_id )
1507
+ snapshot = self .table_metadata .snapshot_by_id (self .snapshot_id )
1505
1508
if snapshot is not None :
1506
1509
if snapshot .schema_id is not None :
1507
- snapshot_schema = self .table .schemas ().get (snapshot .schema_id )
1510
+ snapshot_schema = self .table_metadata .schemas ().get (snapshot .schema_id )
1508
1511
if snapshot_schema is not None :
1509
1512
current_schema = snapshot_schema
1510
1513
else :
@@ -1625,17 +1628,6 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
1625
1628
1626
1629
1627
1630
class DataScan (TableScan ):
1628
- def __init__ (
1629
- self ,
1630
- table : Table ,
1631
- row_filter : Union [str , BooleanExpression ] = ALWAYS_TRUE ,
1632
- selected_fields : Tuple [str , ...] = ("*" ,),
1633
- case_sensitive : bool = True ,
1634
- snapshot_id : Optional [int ] = None ,
1635
- options : Properties = EMPTY_DICT ,
1636
- limit : Optional [int ] = None ,
1637
- ):
1638
- super ().__init__ (table , row_filter , selected_fields , case_sensitive , snapshot_id , options , limit )
1639
1631
1640
1632
def _build_partition_projection (self , spec_id : int ) -> BooleanExpression :
1641
1633
project = inclusive_projection (self .table .schema (), self .table .specs ()[spec_id ])
@@ -2912,7 +2904,9 @@ def _commit(self) -> UpdatesAndRequirements:
2912
2904
)
2913
2905
2914
2906
2915
- class DeleteFiles (_MergingSnapshotProducer ):
2907
+ class MetadataDeleteFiles (_MergingSnapshotProducer ):
2908
+ """Will delete manifest entries from the current snapshot based on the predicate"""
2909
+
2916
2910
_predicate : BooleanExpression
2917
2911
2918
2912
def __init__ (
@@ -2954,7 +2948,7 @@ def delete(self, predicate: BooleanExpression) -> None:
2954
2948
self ._predicate = Or (self ._predicate , predicate )
2955
2949
2956
2950
@cached_property
2957
- def _compute_deletes (self ) -> Tuple [List [ManifestFile ], List [ManifestEntry ]]:
2951
+ def _compute_deletes (self ) -> Tuple [List [ManifestFile ], List [ManifestEntry ], bool ]:
2958
2952
schema = self ._transaction .table_metadata .schema ()
2959
2953
2960
2954
def _copy_with_new_status (entry : ManifestEntry , status : ManifestEntryStatus ) -> ManifestEntry :
@@ -2972,6 +2966,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
2972
2966
2973
2967
existing_manifests = []
2974
2968
total_deleted_entries = []
2969
+ partial_rewrites_needed = False
2975
2970
if snapshot := self ._transaction .table_metadata .current_snapshot ():
2976
2971
for num , manifest_file in enumerate (snapshot .manifests (io = self ._io )):
2977
2972
if not manifest_evaluators [manifest_file .partition_spec_id ](manifest_file ):
@@ -2987,7 +2982,8 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
2987
2982
elif inclusive_metrics_evaluator (entry .data_file ) == ROWS_CANNOT_MATCH :
2988
2983
existing_entries .append (_copy_with_new_status (entry , ManifestEntryStatus .EXISTING ))
2989
2984
else :
2990
- raise ValueError ("Deletes do not support rewrites of data files" )
2985
+ # Based on the metadata alone, it cannot be determined whether the file can be deleted
2986
+ partial_rewrites_needed = True
2991
2987
2992
2988
if len (deleted_entries ) > 0 :
2993
2989
total_deleted_entries += deleted_entries
@@ -3006,17 +3002,22 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
3006
3002
) as writer :
3007
3003
for existing_entry in existing_entries :
3008
3004
writer .add_entry (existing_entry )
3005
+ existing_manifests .append (writer .to_manifest_file ())
3009
3006
else :
3010
3007
existing_manifests .append (manifest_file )
3011
3008
3012
- return existing_manifests , total_deleted_entries
3009
+ return existing_manifests , total_deleted_entries , partial_rewrites_needed
3013
3010
3014
3011
def _existing_manifests (self ) -> List [ManifestFile ]:
3015
3012
return self ._compute_deletes [0 ]
3016
3013
3017
3014
def _deleted_entries (self ) -> List [ManifestEntry ]:
3018
3015
return self ._compute_deletes [1 ]
3019
3016
3017
+ def rewrites_needed (self ) -> bool :
3018
+ """Indicates if data files need to be rewritten"""
3019
+ return self ._compute_deletes [2 ]
3020
+
3020
3021
3021
3022
class FastAppendFiles (_MergingSnapshotProducer ):
3022
3023
def _existing_manifests (self ) -> List [ManifestFile ]:
@@ -3115,8 +3116,8 @@ def overwrite(self) -> OverwriteFiles:
3115
3116
snapshot_properties = self ._snapshot_properties ,
3116
3117
)
3117
3118
3118
- def delete (self ) -> DeleteFiles :
3119
- return DeleteFiles (
3119
+ def delete (self ) -> MetadataDeleteFiles :
3120
+ return MetadataDeleteFiles (
3120
3121
operation = Operation .DELETE ,
3121
3122
transaction = self ._transaction ,
3122
3123
io = self ._io ,
0 commit comments