Skip to content

Commit 0130a72

Browse files
committed
feat: ensure proper resource management in DataFrame streaming
1 parent 31e8ed1 commit 0130a72

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

python/datafusion/dataframe.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1130,10 +1130,13 @@ def __iter__(self) -> Iterator[pa.RecordBatch]:
11301130
partitioned streaming APIs so ``collect`` is never invoked and batch
11311131
order across partitions is preserved.
11321132
"""
1133+
from contextlib import closing
1134+
11331135
import pyarrow as pa
11341136

11351137
reader = pa.RecordBatchReader._import_from_c_capsule(self.__arrow_c_stream__())
1136-
yield from reader
1138+
with closing(reader):
1139+
yield from reader
11371140

11381141
def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
11391142
"""Apply a function to the current DataFrame which returns another DataFrame.
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import pyarrow as pa
19+
20+
21+
def test_iter_releases_reader(monkeypatch, ctx):
    """``DataFrame.__iter__`` must close the underlying RecordBatchReader
    even when the caller abandons iteration before exhausting it.

    A dummy reader records whether ``close()`` was invoked; the real
    ``pa.RecordBatchReader`` import hook is monkeypatched so the DataFrame's
    generator receives the dummy instead.
    """
    batches = [
        pa.RecordBatch.from_pydict({"a": [1]}),
        pa.RecordBatch.from_pydict({"a": [2]}),
    ]

    class DummyReader:
        """Minimal stand-in for a RecordBatchReader that records close()."""

        def __init__(self, batches):
            self._iter = iter(batches)
            self.closed = False

        def __iter__(self):
            return self

        def __next__(self):
            return next(self._iter)

        def close(self):
            self.closed = True

    dummy_reader = DummyReader(batches)

    class FakeRecordBatchReader:
        """Replacement whose capsule-import hook returns our dummy reader."""

        @staticmethod
        def _import_from_c_capsule(*_args, **_kwargs):
            return dummy_reader

    monkeypatch.setattr(pa, "RecordBatchReader", FakeRecordBatchReader)

    df = ctx.from_pydict({"a": [1, 2]})

    # Abandon iteration after one batch, then close the generator
    # explicitly.  Relying on a bare ``break`` and asserting immediately
    # would depend on CPython's reference-counting GC finalizing the
    # generator right away (GeneratorExit -> reader cleanup), which is
    # implementation-specific and flaky on interpreters such as PyPy.
    it = iter(df)
    next(it)
    it.close()

    assert dummy_reader.closed

0 commit comments

Comments
 (0)