Skip to content

Commit 277135f

Browse files
committed
Simplify StreamableExecutor
1 parent 139e00a commit 277135f

File tree

4 files changed

+22
-46
lines changed

4 files changed

+22
-46
lines changed

pdal/PyPipeline.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ class PDAL_DLL PipelineExecutor {
5555

5656
point_count_t execute();
5757

58-
bool executed() const { return m_executed; }
5958
const PointViewSet& views() const;
6059
std::string getPipeline() const;
6160
std::string getMetadata() const;

pdal/StreamableExecutor.cpp

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -190,49 +190,37 @@ StreamableExecutor::StreamableExecutor(std::string const& json,
190190
{
191191
if (!m_manager.pipelineStreamable())
192192
throw pdal_error("Pipeline is not streamable");
193-
}
194-
195-
StreamableExecutor::~StreamableExecutor()
196-
{
197-
stop();
198-
}
199193

200-
PyArrayObject *StreamableExecutor::executeNext()
201-
{
202-
if (!m_thread)
194+
m_thread.reset(new std::thread([this]()
203195
{
204-
m_thread.reset(new std::thread([this]()
205-
{
206-
m_manager.executeStream(m_table);
207-
m_table.done();
208-
}));
209-
}
210-
211-
// Blocks until something is ready.
212-
PyArrayObject *arr = m_table.fetchArray();
213-
if (arr == nullptr)
214-
done();
215-
return arr;
196+
m_manager.executeStream(m_table);
197+
m_table.done();
198+
}));
216199
}
217200

218-
void StreamableExecutor::stop()
201+
StreamableExecutor::~StreamableExecutor()
219202
{
220-
if (m_thread)
203+
if (!m_executed)
221204
{
222205
m_table.disable();
223206
while (PyArrayObject* arr = m_table.fetchArray())
224207
Py_XDECREF(arr);
225-
done();
226208
}
227-
}
228-
229-
void StreamableExecutor::done()
230-
{
231209
Py_BEGIN_ALLOW_THREADS
232210
m_thread->join();
233211
Py_END_ALLOW_THREADS
234-
m_thread.reset();
235-
m_executed = true;
212+
}
213+
214+
PyArrayObject *StreamableExecutor::executeNext()
215+
{
216+
PyArrayObject* arr = nullptr;
217+
if (!m_executed)
218+
{
219+
arr = m_table.fetchArray();
220+
if (arr == nullptr)
221+
m_executed = true;
222+
}
223+
return arr;
236224
}
237225

238226
} // namespace python

pdal/StreamableExecutor.hpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,9 @@ class StreamableExecutor : public PipelineExecutor
8484
int prefetch);
8585
~StreamableExecutor();
8686

87-
std::string getSchema() const;
8887
PyArrayObject* executeNext();
89-
void stop();
9088

9189
private:
92-
void done();
9390
ConstPointTableRef pointTable() const { return m_table; }
9491

9592
PythonPointTable m_table;

pdal/libpdalpython.pyx

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ from types import SimpleNamespace
77
cimport cython
88
from cython.operator cimport dereference as deref
99
from cpython.ref cimport Py_DECREF
10-
from libcpp cimport bool
1110
from libcpp.memory cimport make_shared, shared_ptr, unique_ptr
1211
from libcpp.set cimport set as cpp_set
1312
from libcpp.string cimport string
@@ -93,7 +92,6 @@ cdef extern from "PyPipeline.hpp" namespace "pdal::python":
9392
cdef cppclass PipelineExecutor:
9493
PipelineExecutor(string, vector[shared_ptr[Array]], int) except +
9594
int execute() except +
96-
bool executed() except +
9795
string getPipeline() except +
9896
string getMetadata() except +
9997
string getSchema() except +
@@ -105,7 +103,6 @@ cdef extern from "StreamableExecutor.hpp" namespace "pdal::python":
105103
cdef cppclass StreamableExecutor(PipelineExecutor):
106104
StreamableExecutor(string, vector[shared_ptr[Array]], int, int, int) except +
107105
np.PyArrayObject* executeNext() except +
108-
void stop() except +
109106

110107

111108
cdef extern from "PyArray.hpp" namespace "pdal::python":
@@ -222,25 +219,20 @@ cdef class Pipeline(PipelineResultsMixin):
222219
cdef class PipelineIterator(PipelineResultsMixin):
223220
cdef unique_ptr[StreamableExecutor] _executor
224221

225-
cdef set_executor(self, StreamableExecutor* executor):
226-
self._executor.reset(executor)
227-
228222
def __iter__(self):
229223
return self
230224

231225
def __next__(self):
232-
cdef StreamableExecutor* executor = self._executor.get()
233-
if executor.executed():
234-
raise StopIteration
235-
236-
arr_ptr = executor.executeNext()
226+
arr_ptr = self._executor.get().executeNext()
237227
if arr_ptr is NULL:
238-
executor.stop()
239228
raise StopIteration
240229

241230
arr = <object> arr_ptr
242231
Py_DECREF(arr)
243232
return arr
244233

234+
cdef set_executor(self, StreamableExecutor* executor):
235+
self._executor.reset(executor)
236+
245237
cdef PipelineExecutor* _get_executor(self) except NULL:
246238
return self._executor.get()

0 commit comments

Comments
 (0)