
Commit 5e871fb: Last few bits

1 parent 7dae071

4 files changed (+60, -27 lines)

pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 4 deletions

@@ -1772,12 +1772,9 @@ def data_file_statistics_from_parquet_metadata(
     )
 
 
-def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterable["WriteTask"]) -> Iterator[DataFile]:
+def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     from pyiceberg.table import PropertyUtil, TableProperties
 
-    schema = table_metadata.schema()
-    arrow_file_schema = schema.as_arrow()
-
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
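The signature change tightens tasks from Iterable["WriteTask"] to Iterator[WriteTask] and drops an eager schema conversion the body no longer used. A minimal sketch of the contract this encodes, with toy stand-ins for WriteTask and DataFile (not the real classes):

    from typing import Iterator

    class WriteTask: ...  # toy stand-in
    class DataFile: ...   # toy stand-in

    def write_file(tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
        # An Iterator is consumed exactly once, so tasks can be streamed
        # through the writer without being materialized up front.
        for _task in tasks:
            yield DataFile()

    # Generators already satisfy Iterator, so callers can stream tasks in:
    produced = list(write_file(WriteTask() for _ in range(2)))
    assert len(produced) == 2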

pyiceberg/manifest.py

Lines changed: 1 addition & 1 deletion

@@ -338,7 +338,7 @@ class DataFile(Record):
     split_offsets: Optional[List[int]]
     equality_ids: Optional[List[int]]
     sort_order_id: Optional[int]
-    spec_id: Optional[int]
+    spec_id: int
 
     def __setattr__(self, name: str, value: Any) -> None:
         """Assign a key/value to a DataFile."""

pyiceberg/table/__init__.py

Lines changed: 9 additions & 9 deletions

@@ -465,10 +465,10 @@ def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str
             warnings.warn("PyIceberg only supports copy on write")
 
         with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
-            delete_snapshot.delete_by_predicate(delete_filter)  # type: ignore
+            delete_snapshot.delete_by_predicate(delete_filter)
 
         # Check if there are any files that require an actual rewrite of a data file
-        if delete_snapshot.rewrites_needed is True:  # type: ignore
+        if delete_snapshot.rewrites_needed is True:
             # When we want to filter out certain rows, we want to invert the expression
             # delete id = 22 means that we want to look for that value, and then remove
             # if from the Parquet file
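The comment in this hunk describes the copy-on-write rewrite: rows matching the delete filter are removed by keeping their complement. A minimal sketch with pyiceberg's expression API (the rewrite plumbing itself is elided):

    from pyiceberg.expressions import EqualTo, Not

    delete_filter = EqualTo("id", 22)

    # To delete matching rows under copy on write, invert the predicate and
    # rewrite the affected files with only the rows that survive:
    keep_filter = Not(delete_filter)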
@@ -2767,7 +2767,7 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
     yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
 
 
-class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
+class _MergingSnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     commit_uuid: uuid.UUID
     _operation: Operation
     _snapshot_id: int
@@ -2798,11 +2798,11 @@ def __init__(
         self.snapshot_properties = snapshot_properties
         self._manifest_counter = itertools.count(0)
 
-    def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
+    def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer[U]:
         self._added_data_files.append(data_file)
         return self
 
-    def delete_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
+    def delete_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer[U]:
         self._deleted_data_files.add(data_file)
         return self
 
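The Generic[U] parameterization is the standard self-type trick for fluent, subclass-aware APIs: the base class is generic over its concrete subclass, so (assuming UpdateTableMetadata's context manager returns U) "with ... as delete_snapshot:" binds the concrete DeleteFiles type, which is why the two "type: ignore" comments in delete() above could be dropped. A self-contained sketch of the pattern:

    from typing import Generic, TypeVar, cast

    U = TypeVar("U")

    class Update(Generic[U]):
        # Assumption: mirrors UpdateTableMetadata, whose context manager
        # hands back the concrete subclass rather than the base type.
        def __enter__(self) -> U:
            return cast(U, self)

        def __exit__(self, *exc: object) -> None:
            pass

    class DeleteFiles(Update["DeleteFiles"]):
        rewrites_needed: bool = False

        def delete_by_predicate(self, expr: object) -> None:
            self.rewrites_needed = True

    # The checker now sees snap as DeleteFiles, so its attributes resolve
    # without any type: ignore:
    with DeleteFiles() as snap:
        snap.delete_by_predicate("id = 22")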

@@ -2893,7 +2893,7 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
         for data_file in self._deleted_data_files:
             ssc.remove_file(
                 data_file=data_file,
-                partition_spec=specs.get(data_file.spec_id),
+                partition_spec=specs[data_file.spec_id],
                 schema=self._transaction.table_metadata.schema(),
             )
 
@@ -2953,7 +2953,7 @@ def _commit(self) -> UpdatesAndRequirements:
         )
 
 
-class DeleteFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
     """Will delete manifest entries from the current snapshot based on the predicate.
 
     This will produce a DELETE snapshot:
@@ -3102,7 +3102,7 @@ def files_affected(self) -> bool:
         return len(self._deleted_entries()) > 0
 
 
-class FastAppendFiles(_MergingSnapshotProducer):
+class FastAppendFiles(_MergingSnapshotProducer["FastAppendFiles"]):
     def _existing_manifests(self) -> List[ManifestFile]:
         """To determine if there are any existing manifest files.
 
@@ -3131,7 +3131,7 @@ def _deleted_entries(self) -> List[ManifestEntry]:
         return []
 
 
-class OverwriteFiles(_MergingSnapshotProducer):
+class OverwriteFiles(_MergingSnapshotProducer["OverwriteFiles"]):
     """Overwrites data from the table. This will produce an OVERWRITE snapshot.
 
     Data and delete files were added and removed in a logical overwrite operation.
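Each subclass binds itself as the type parameter, so the fluent append_data_file/delete_data_file methods and the context-manager entry all come back with the right concrete type. A hypothetical usage sketch (tbl, data_file, and the update_snapshot()/fast_append() factory methods are assumed from the surrounding codebase, not shown in this diff):

    # Hypothetical usage; tbl and data_file come from elsewhere.
    with tbl.update_snapshot().fast_append() as append_files:
        # append_files is typed as FastAppendFiles; the fluent return type
        # _MergingSnapshotProducer[FastAppendFiles] keeps the chain typed.
        append_files.append_data_file(data_file)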

tests/integration/test_writes/test_writes.py

Lines changed: 49 additions & 13 deletions

@@ -185,10 +185,11 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
     ).collect()
 
     operations = [row.operation for row in rows]
-    assert operations == ['append', 'append', 'overwrite']
+    assert operations == ['append', 'append', 'delete', 'overwrite']
 
     summaries = [row.summary for row in rows]
 
+    # Append
     assert summaries[0] == {
         'added-data-files': '1',
         'added-files-size': '5459',
@@ -201,6 +202,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
         'total-records': '3',
     }
 
+    # Append
     assert summaries[1] == {
         'added-data-files': '1',
         'added-files-size': '5459',
@@ -213,13 +215,24 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
         'total-records': '6',
     }
 
+    # Delete
     assert summaries[2] == {
-        'added-data-files': '1',
-        'added-files-size': '5459',
-        'added-records': '3',
         'deleted-data-files': '2',
         'deleted-records': '6',
         'removed-files-size': '10918',
+        'total-data-files': '0',
+        'total-delete-files': '0',
+        'total-equality-deletes': '0',
+        'total-files-size': '0',
+        'total-position-deletes': '0',
+        'total-records': '0',
+    }
+
+    # Overwrite
+    assert summaries[3] == {
+        'added-data-files': '1',
+        'added-files-size': '5459',
+        'added-records': '3',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
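The extra 'delete' entry reflects that a copy-on-write overwrite now commits two snapshots: a DELETE that removes the old files, then an OVERWRITE that adds the new ones. A sketch of the kind of query these assertions inspect (the test's actual SQL is in the elided lines above):

    rows = spark.sql(
        f"SELECT operation, summary FROM {identifier}.snapshots ORDER BY committed_at"
    ).collect()

    # append, append, then the overwrite split into delete + overwrite:
    assert [row.operation for row in rows] == ['append', 'append', 'delete', 'overwrite']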
@@ -247,9 +260,9 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w
     """
     ).collect()
 
-    assert [row.added_data_files_count for row in rows] == [1, 1, 0, 1, 1]
-    assert [row.existing_data_files_count for row in rows] == [0, 0, 0, 0, 0]
-    assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0]
+    assert [row.added_data_files_count for row in rows] == [1, 0, 1, 0, 1, 1]
+    assert [row.existing_data_files_count for row in rows] == [0, 0, 0, 0, 0, 0]
+    assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 1, 0, 0]
 
 
 @pytest.mark.integration
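The lists grow from five manifests to six because each copy-on-write overwrite now writes a delete manifest (one deleted file, nothing added) followed by an append manifest. A hedged sketch of the metadata-table query behind these assertions (assuming the test reads Iceberg's all_manifests table via Spark; its actual SQL is elided above):

    rows = spark.sql(f"SELECT * FROM {identifier}.all_manifests").collect()

    # Delete manifests add nothing and remove one file; append manifests
    # add one file and remove nothing:
    assert [row.added_data_files_count for row in rows] == [1, 0, 1, 0, 1, 1]
    assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 1, 0, 0]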
@@ -476,7 +489,7 @@ def test_summaries_with_only_nulls(
     ).collect()
 
     operations = [row.operation for row in rows]
-    assert operations == ['append', 'append', 'overwrite']
+    assert operations == ['append', 'append', 'delete', 'overwrite']
 
     summaries = [row.summary for row in rows]
 
@@ -502,14 +515,23 @@ def test_summaries_with_only_nulls(
     }
 
     assert summaries[2] == {
+        'deleted-data-files': '1',
+        'deleted-records': '2',
         'removed-files-size': '4239',
+        'total-data-files': '0',
+        'total-delete-files': '0',
         'total-equality-deletes': '0',
+        'total-files-size': '0',
         'total-position-deletes': '0',
-        'deleted-data-files': '1',
+        'total-records': '0',
+    }
+
+    assert summaries[3] == {
+        'total-data-files': '0',
         'total-delete-files': '0',
+        'total-equality-deletes': '0',
         'total-files-size': '0',
-        'deleted-records': '2',
-        'total-data-files': '0',
+        'total-position-deletes': '0',
         'total-records': '0',
     }
 
@@ -731,13 +753,14 @@ def test_inspect_snapshots(
     assert isinstance(snapshot_id.as_py(), int)
 
     assert df['parent_id'][0].as_py() is None
-    assert df['parent_id'][1:] == df['snapshot_id'][:2]
+    assert df['parent_id'][1:].to_pylist() == df['snapshot_id'][:-1].to_pylist()
 
-    assert [operation.as_py() for operation in df['operation']] == ['append', 'overwrite', 'append']
+    assert [operation.as_py() for operation in df['operation']] == ['append', 'delete', 'overwrite', 'append']
 
     for manifest_list in df['manifest_list']:
         assert manifest_list.as_py().startswith("s3://")
 
+    # Append
     assert df['summary'][0].as_py() == [
         ('added-files-size', '5459'),
         ('added-data-files', '1'),
@@ -750,6 +773,19 @@ def test_inspect_snapshots(
         ('total-equality-deletes', '0'),
     ]
 
+    # Delete
+    assert df['summary'][1].as_py() == [
+        ('removed-files-size', '5459'),
+        ('deleted-data-files', '1'),
+        ('deleted-records', '3'),
+        ('total-data-files', '0'),
+        ('total-delete-files', '0'),
+        ('total-records', '0'),
+        ('total-files-size', '0'),
+        ('total-position-deletes', '0'),
+        ('total-equality-deletes', '0'),
+    ]
+
     lhs = spark.table(f"{identifier}.snapshots").toPandas()
     rhs = df.to_pandas()
     for column in df.column_names:
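The fixed lineage assertion now checks every snapshot against its predecessor: with four snapshots the old [:2] slice compared mismatched lengths, so it becomes [:-1], and .to_pylist() compares plain Python values. A sketch using pyiceberg's metadata-inspection API, which returns a pyarrow Table:

    df = tbl.inspect.snapshots()  # pyarrow.Table of the table's snapshots

    # Shifting by one aligns each snapshot's parent_id with the previous
    # snapshot_id; to_pylist() yields plain lists that compare with ==.
    assert df['parent_id'][1:].to_pylist() == df['snapshot_id'][:-1].to_pylist()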
