Skip to content

Commit 08da7b9

Browse files
committed
Catch any exception thrown by the StreamableExecutor thread and rethrow it from executeNext
1 parent 0264cb0 commit 08da7b9

File tree

3 files changed

+12
-6
lines changed

3 files changed

+12
-6
lines changed

pdal/StreamableExecutor.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,15 @@ StreamableExecutor::StreamableExecutor(std::string const& json,
188188
int prefetch)
189189
: PipelineExecutor(json, arrays, level)
190190
, m_table(chunkSize, prefetch)
191+
, m_exc(nullptr)
191192
{
192-
if (!m_manager.pipelineStreamable())
193-
throw pdal_error("Pipeline is not streamable");
194-
195193
m_thread.reset(new std::thread([this]()
196194
{
197-
m_manager.executeStream(m_table);
195+
try {
196+
m_manager.executeStream(m_table);
197+
} catch (...) {
198+
m_exc = std::current_exception();
199+
}
198200
m_table.done();
199201
}));
200202
}
@@ -220,6 +222,8 @@ PyArrayObject *StreamableExecutor::executeNext()
220222
arr = m_table.fetchArray();
221223
if (arr == nullptr)
222224
m_executed = true;
225+
if (m_exc)
226+
std::rethrow_exception(m_exc);
223227
}
224228
return arr;
225229
}

pdal/StreamableExecutor.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class StreamableExecutor : public PipelineExecutor
9191

9292
PythonPointTable m_table;
9393
std::unique_ptr<std::thread> m_thread;
94+
std::exception_ptr m_exc;
9495
};
9596

9697
} // namespace python

test/test_pipeline.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,9 +436,10 @@ class TestPipelineIterator:
436436
@pytest.mark.parametrize("filename", ["sort.json", "sort.py"])
437437
def test_non_streamable(self, filename):
438438
r = get_pipeline(filename)
439+
assert not r.streamable
439440
with pytest.raises(RuntimeError) as info:
440-
r.iterator()
441-
assert "Pipeline is not streamable" in str(info.value)
441+
next(r.iterator(chunk_size=100))
442+
assert "Attempting to use stream mode" in str(info.value)
442443

443444
@pytest.mark.parametrize("filename", ["range.json", "range.py"])
444445
def test_array(self, filename):

0 commit comments

Comments
 (0)