Skip to content

Commit 27103a9

Browse files
committed
Handle premature loop termination
1 parent 2c1ff3c commit 27103a9

File tree

4 files changed

+54
-13
lines changed

4 files changed

+54
-13
lines changed

pdal/StreamableExecutor.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ void PythonPointTable::reset()
138138
}
139139
}
140140

141+
void PythonPointTable::disable()
142+
{
143+
// TODO: uncomment the next line when/if StreamPointTable.m_capacity
144+
// changes from private to protected
145+
// m_capacity = 0;
146+
}
147+
141148
void PythonPointTable::done()
142149
{
143150
m_arrays.push(nullptr);
@@ -198,14 +205,28 @@ PyArrayObject *StreamableExecutor::executeNext()
198205
// Blocks until something is ready.
199206
PyArrayObject *arr = m_table.fetchArray();
200207
if (arr == nullptr)
208+
done();
209+
return arr;
210+
}
211+
212+
void StreamableExecutor::stop()
213+
{
214+
if (m_thread)
201215
{
202-
Py_BEGIN_ALLOW_THREADS
203-
m_thread->join();
204-
Py_END_ALLOW_THREADS
205-
m_thread.reset();
206-
m_executed = true;
216+
m_table.disable();
217+
while (PyArrayObject* arr = m_table.fetchArray())
218+
Py_XDECREF(arr);
219+
done();
207220
}
208-
return arr;
221+
}
222+
223+
void StreamableExecutor::done()
224+
{
225+
Py_BEGIN_ALLOW_THREADS
226+
m_thread->join();
227+
Py_END_ALLOW_THREADS
228+
m_thread.reset();
229+
m_executed = true;
209230
}
210231

211232
} // namespace python

pdal/StreamableExecutor.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class PythonPointTable : public StreamPointTable
5959
~PythonPointTable();
6060

6161
virtual void finalize();
62+
void disable();
6263
void done();
6364
PyArrayObject *fetchArray();
6465

@@ -88,8 +89,11 @@ class StreamableExecutor : public PipelineExecutor
8889
~StreamableExecutor();
8990

9091
PyArrayObject* executeNext();
92+
void stop();
9193

9294
private:
95+
void done();
96+
9397
PythonPointTable m_table;
9498
std::unique_ptr<std::thread> m_thread;
9599
};

pdal/libpdalpython.pyx

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ cdef extern from "StreamableExecutor.hpp" namespace "pdal::python":
107107
cdef cppclass StreamableExecutor(PipelineExecutor):
108108
StreamableExecutor(string, int, int) except +
109109
np.PyArrayObject* executeNext() except +
110+
void stop() except +
110111

111112

112113
cdef extern from "PyArray.hpp" namespace "pdal::python":
@@ -216,13 +217,16 @@ cdef class Pipeline:
216217

217218
def __iter__(self):
218219
cdef StreamableExecutor* executor = self._get_executor()
219-
while True:
220-
arr_ptr = executor.executeNext()
221-
if arr_ptr is NULL:
222-
break
223-
arr = <object>arr_ptr
224-
Py_DECREF(arr)
225-
yield arr
220+
try:
221+
while True:
222+
arr_ptr = executor.executeNext()
223+
if arr_ptr is NULL:
224+
break
225+
arr = <object>arr_ptr
226+
Py_DECREF(arr)
227+
yield arr
228+
finally:
229+
executor.stop()
226230

227231
#========= non-public properties & methods ===========================================
228232

test/test_pipeline.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,3 +484,15 @@ def test_merged_arrays(self):
484484
chunk_size=chunk_size))
485485
np.testing.assert_array_equal(np.concatenate(streaming_arrays),
486486
non_streaming_array)
487+
488+
def test_premature_exit(self):
489+
"""Can we stop iterating before all arrays are fetched"""
490+
r = get_pipeline("range.json")
491+
r.execute()
492+
assert len(r.arrays) == 1
493+
array = r.arrays[0]
494+
495+
ri = get_pipeline("range.json", chunk_size=100)
496+
for array2 in ri:
497+
np.testing.assert_array_equal(array2, array[:len(array2)])
498+
break

0 commit comments

Comments
 (0)