
Commit 159805d

Handle Empty RecordBatch within _task_to_record_batches (apache#1026)
1 parent 3a06237 commit 159805d

3 files changed: 101 additions, 2 deletions


pyiceberg/io/pyarrow.py

Lines changed: 5 additions & 2 deletions
@@ -1235,9 +1235,11 @@ def _task_to_record_batches(
             columns=[col.name for col in file_project_schema.columns],
         )

-        current_index = 0
+        next_index = 0
         batches = fragment_scanner.to_batches()
         for batch in batches:
+            next_index = next_index + len(batch)
+            current_index = next_index - len(batch)
             if positional_deletes:
                 # Create the mask of indices that we're interested in
                 indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
@@ -1249,11 +1251,12 @@ def _task_to_record_batches(
                     # https://github.com/apache/arrow/issues/39220
                     arrow_table = pa.Table.from_batches([batch])
                     arrow_table = arrow_table.filter(pyarrow_filter)
+                    if len(arrow_table) == 0:
+                        continue
                     batch = arrow_table.to_batches()[0]
             yield _to_requested_schema(
                 projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
             )
-            current_index += len(batch)


 def _task_to_table(
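For context, here is a minimal standalone PyArrow sketch (not pyiceberg code; the table and filter are invented for illustration) of the failure this change guards against: when the row filter matches nothing in a batch, the filtered table has zero rows and to_batches() yields no batches, so taking the first batch fails. The diff therefore skips emptied batches with continue, and computes current_index/next_index at the top of the loop so the positional-delete offsets for later batches stay correct even when a batch is skipped.

import pyarrow as pa
import pyarrow.compute as pc

# Invented data: a single batch whose rows are all removed by the filter.
table = pa.Table.from_batches([pa.RecordBatch.from_pydict({"number_partitioned": [1, 1], "number": [2, 3]})])
filtered = table.filter(pc.field("number_partitioned") == 10)

assert len(filtered) == 0
batches = filtered.to_batches()
# Before this commit the first batch was taken unconditionally (batches[0]),
# which raises IndexError when the list is empty; the fix skips such batches instead.
first = batches[0] if batches else None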

tests/integration/test_deletes.py

Lines changed: 68 additions & 0 deletions
@@ -224,6 +224,74 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio
     assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}


+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
+def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_partitioned_table_positional_deletes_empty_batch"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read',
+                'write.parquet.row-group-limit'=1
+            )
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([10, 10, 10]),
+            pa.array([1, 2, 3]),
+        ],
+        schema=pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())]),
+    )
+
+    tbl.append(arrow_table)
+
+    assert len(tbl.scan().to_arrow()) == 3
+
+    run_spark_commands(
+        spark,
+        [
+            # Generate a positional delete
+            f"""
+            DELETE FROM {identifier} WHERE number = 1
+        """,
+        ],
+    )
+    # Assert that there is just a single Parquet file, that has one merge on read file
+    tbl = tbl.refresh()
+
+    files = list(tbl.scan().plan_files())
+    assert len(files) == 1
+    assert len(files[0].delete_files) == 1
+
+    assert len(tbl.scan().to_arrow()) == 2
+
+    assert len(tbl.scan(row_filter="number_partitioned == 10").to_arrow()) == 2
+
+    assert len(tbl.scan(row_filter="number_partitioned == 1").to_arrow()) == 0
+
+    reader = tbl.scan(row_filter="number_partitioned == 1").to_arrow_batch_reader()
+    assert isinstance(reader, pa.RecordBatchReader)
+    assert len(reader.read_all()) == 0
+
+
 @pytest.mark.integration
 @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
 def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None:
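The test above sets 'write.parquet.row-group-limit'=1 so each row lands in its own row group and therefore arrives as its own RecordBatch on read, which lets a filter (or a positional delete plus a filter) empty a whole batch. A rough, hypothetical sketch of the loop shape the fix produces, simplified from the pyarrow.py diff above, with invented names and without the positional-delete handling, showing that skipping an emptied batch leaves the running offsets intact:

from typing import Iterator, List

import pyarrow as pa
import pyarrow.compute as pc

def filtered_batches(batches: List[pa.RecordBatch], row_filter: pc.Expression) -> Iterator[pa.RecordBatch]:
    # Simplified sketch of the loop in _task_to_record_batches: the global row
    # range of each batch is fixed at the top of the loop, so a `continue` for
    # an emptied batch cannot shift the offsets used for later batches.
    next_index = 0
    for batch in batches:
        next_index = next_index + len(batch)
        current_index = next_index - len(batch)  # this batch covers file rows [current_index, next_index);
        # in pyiceberg, positional deletes for that range would be applied here.
        table = pa.Table.from_batches([batch]).filter(row_filter)
        if len(table) == 0:
            continue  # the filter removed every row; skip instead of indexing an empty batch list
        yield table.to_batches()[0]

# Example: one row per batch (mirroring 'write.parquet.row-group-limit'=1);
# the first batch is emptied by the filter without breaking the stream.
batches = [pa.RecordBatch.from_pydict({"number_partitioned": [10], "number": [n]}) for n in (1, 2, 3)]
out = list(filtered_batches(batches, pc.field("number") > 1))
assert sum(len(b) for b in out) == 2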

tests/integration/test_reads.py

Lines changed: 28 additions & 0 deletions
@@ -791,3 +791,31 @@ def test_empty_scan_ordered_str(catalog: Catalog) -> None:
     table_empty_scan_ordered_str = catalog.load_table("default.test_empty_scan_ordered_str")
     arrow_table = table_empty_scan_ordered_str.scan(EqualTo("id", "b")).to_arrow()
     assert len(arrow_table) == 0
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_table_scan_empty_table(catalog: Catalog) -> None:
+    identifier = "default.test_table_scan_empty_table"
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([]),
+        ],
+        schema=pa.schema([pa.field("colA", pa.string())]),
+    )
+
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = catalog.create_table(
+        identifier,
+        schema=arrow_table.schema,
+    )
+
+    tbl.append(arrow_table)
+
+    result_table = tbl.scan().to_arrow()
+
+    assert len(result_table) == 0

0 commit comments