Skip to content

Commit 6c5271e

Browse files
committed
Inherit StreamableExecutor from PipelineExecutor
1 parent 6f017ca commit 6c5271e

File tree

8 files changed

+137
-163
lines changed

8 files changed

+137
-163
lines changed

pdal/PyPipeline.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,6 @@ namespace python
5050
{
5151

5252

53-
void readPipeline(PipelineExecutor* executor, std::string json)
54-
{
55-
std::stringstream strm(json);
56-
executor->getManager().readPipeline(strm);
57-
}
58-
59-
6053
void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Array>> arrays)
6154
{
6255
// Make the symbols in pdal_base global so that they're accessible

pdal/PyPipeline.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ namespace python
4545

4646
class Array;
4747

48-
void readPipeline(PipelineExecutor* executor, std::string json);
4948
void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Array>> arrays);
5049
PyArrayObject* viewToNumpyArray(PointViewPtr view);
5150
PyArrayObject* meshToNumpyArray(const TriangularMesh* mesh);

pdal/StreamableExecutor.cpp

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,9 @@ char *PythonPointTable::getPoint(PointId idx)
226226

227227
// StreamableExecutor
228228

229-
StreamableExecutor::StreamableExecutor(const char *json, int chunkSize, int prefetch) :
230-
m_json(json), m_table(chunkSize, prefetch), m_manager(chunkSize),
231-
m_log(Log::makeLog("pdal_python", &m_logStream))
232-
{
233-
m_manager.setLog(m_log);
234-
}
229+
StreamableExecutor::StreamableExecutor(std::string const& json, point_count_t chunkSize, int prefetch) :
230+
PipelineExecutor(json), m_table(chunkSize, prefetch)
231+
{}
235232

236233
StreamableExecutor::~StreamableExecutor()
237234
{
@@ -258,51 +255,10 @@ PyArrayObject *StreamableExecutor::executeNext()
258255
m_thread->join();
259256
Py_END_ALLOW_THREADS
260257
m_thread.reset();
258+
m_executed = true;
261259
}
262260
return arr;
263261
}
264262

265-
std::string StreamableExecutor::getMetadata() const
266-
{
267-
return pdal::Utils::toJSON(m_manager.getMetadata().clone("metadata"));
268-
}
269-
270-
std::string StreamableExecutor::getPipeline() const
271-
{
272-
std::stringstream strm;
273-
pdal::PipelineWriter::writePipeline(m_manager.getStage(), strm);
274-
return strm.str();
275-
}
276-
277-
int StreamableExecutor::getLogLevel() const
278-
{
279-
return static_cast<int>(m_log->getLevel());
280-
}
281-
282-
std::string StreamableExecutor::getLog() const
283-
{
284-
return m_logStream.str();
285-
}
286-
287-
// Returns the active log level.
288-
int StreamableExecutor::setLogLevel(int level)
289-
{
290-
if (level < 0 || level > 8)
291-
return getLogLevel();
292-
293-
m_log->setLevel(static_cast<pdal::LogLevel>(level));
294-
return level;
295-
}
296-
297-
bool StreamableExecutor::validate()
298-
{
299-
std::stringstream strm;
300-
strm << m_json;
301-
m_manager.readPipeline(strm);
302-
m_manager.prepare();
303-
304-
return true;
305-
}
306-
307263
} // namespace python
308264
} // namespace pdal

pdal/StreamableExecutor.hpp

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
#include <numpy/arrayobject.h>
4545

46+
#include <pdal/PipelineExecutor.hpp>
4647
#include <pdal/PipelineManager.hpp>
4748
#include <pdal/PointTable.hpp>
4849

@@ -83,28 +84,17 @@ class PythonPointTable : public StreamPointTable
8384
std::queue<PyArrayObject *> m_arrays;
8485
};
8586

86-
class StreamableExecutor
87+
class StreamableExecutor : public PipelineExecutor
8788
{
8889
public:
89-
StreamableExecutor(const char *json, int chunkSize, int prefetch);
90+
StreamableExecutor(std::string const& json, point_count_t chunkSize, int prefetch);
9091
~StreamableExecutor();
9192

92-
PyArrayObject *executeNext();
93-
std::string getMetadata() const;
94-
std::string getPipeline() const;
95-
std::string getSchema() const;
96-
std::string getLog() const;
97-
int getLogLevel() const;
98-
int setLogLevel(int level);
99-
bool validate();
93+
PyArrayObject* executeNext();
10094

10195
private:
102-
std::string m_json;
10396
int m_prefetch;
10497
PythonPointTable m_table;
105-
PipelineManager m_manager;
106-
std::stringstream m_logStream;
107-
LogPtr m_log;
10898
std::unique_ptr<std::thread> m_thread;
10999
};
110100

pdal/__init__.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,8 @@
11
__version__ = "2.4.2"
2-
__all__ = [
3-
"Pipeline",
4-
"PipelineIterator",
5-
"Stage",
6-
"Reader",
7-
"Filter",
8-
"Writer",
9-
"dimensions",
10-
"info",
11-
]
2+
__all__ = ["Pipeline", "Stage", "Reader", "Filter", "Writer", "dimensions", "info"]
123

134
from . import libpdalpython
145
from .drivers import inject_pdal_drivers
15-
from .libpdalpython import PipelineIterator
166
from .pipeline import Filter, Pipeline, Reader, Stage, Writer
177

188
inject_pdal_drivers()

pdal/libpdalpython.pyx

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ cdef extern from "pdal/PipelineExecutor.hpp" namespace "pdal":
9494
PipelineExecutor(string) except +
9595
const PipelineManager& getManagerConst() except +
9696
bool executed() except +
97+
void read() except +
9798
int execute() except +
9899
string getPipeline() except +
99100
string getMetadata() except +
@@ -102,63 +103,29 @@ cdef extern from "pdal/PipelineExecutor.hpp" namespace "pdal":
102103
void setLogLevel(int) except +
103104

104105

106+
cdef extern from "StreamableExecutor.hpp" namespace "pdal::python":
107+
cdef cppclass StreamableExecutor(PipelineExecutor):
108+
StreamableExecutor(string, int, int) except +
109+
np.PyArrayObject* executeNext() except +
110+
111+
105112
cdef extern from "PyArray.hpp" namespace "pdal::python":
106113
cdef cppclass Array:
107114
Array(np.PyArrayObject*) except +
108115

109116

110117
cdef extern from "PyPipeline.hpp" namespace "pdal::python":
111-
void readPipeline(PipelineExecutor*, string) except +
112118
void addArrayReaders(PipelineExecutor*, vector[shared_ptr[Array]]) except +
113119
np.PyArrayObject* viewToNumpyArray(PointViewPtr) except +
114120
np.PyArrayObject* meshToNumpyArray(const TriangularMesh*) except +
115121

116-
cdef extern from "StreamableExecutor.hpp" namespace "pdal::python":
117-
cdef cppclass StreamableExecutor:
118-
StreamableExecutor(const char *, int, int) except +
119-
np.PyArrayObject *executeNext() except +
120-
bool validate() except +
121-
string getMetadata() except +
122-
string getPipeline() except +
123-
string getSchema() except +
124-
string getLog() except +
125-
int getLogLevel() except +
126-
int setLogLevel(int level) except +
127-
128-
129-
cdef class PipelineIterator:
130-
cdef int _chunk_size
131-
cdef StreamableExecutor *_executor;
132-
133-
def __cinit__(self, unicode json, list arrays=None, int chunk_size=10000, int prefetch=0):
134-
self._chunk_size
135-
self._executor = new StreamableExecutor(json.encode("UTF-8"), chunk_size, prefetch)
136-
# self._add_arrays(arrays)
137-
138-
property chunk_size:
139-
def __get__(self):
140-
return self._chunk_size
141-
142-
def validate(self):
143-
return self._executor.validate()
144-
145-
def __iter__(self):
146-
while True:
147-
arrPtr = self._executor.executeNext()
148-
if arrPtr is NULL:
149-
break
150-
arr = <object>arrPtr
151-
Py_DECREF(arr)
152-
yield arr
153-
154122

155123
cdef class Pipeline:
156-
cdef unique_ptr[PipelineExecutor] _executor
124+
cdef unique_ptr[StreamableExecutor] _executor
157125
cdef vector[shared_ptr[Array]] _inputs
158126
cdef int _loglevel
159-
160-
def execute(self):
161-
return self._get_executor().execute()
127+
cdef int _chunk_size
128+
cdef int _prefetch
162129

163130
#========= writeable properties to be set before execution ===========================
164131

@@ -182,6 +149,26 @@ cdef class Pipeline:
182149
self._loglevel = value
183150
self._del_executor()
184151

152+
@property
153+
def chunk_size(self):
154+
return self._chunk_size
155+
156+
@chunk_size.setter
157+
def chunk_size(self, value):
158+
assert value > 0
159+
self._chunk_size = value
160+
self._del_executor()
161+
162+
@property
163+
def prefetch(self):
164+
return self._prefetch
165+
166+
@prefetch.setter
167+
def prefetch(self, value):
168+
assert value >= 0
169+
self._prefetch = value
170+
self._del_executor()
171+
185172
#========= readable properties to be read after execution ============================
186173

187174
@property
@@ -222,6 +209,21 @@ cdef class Pipeline:
222209
Py_DECREF(output[-1])
223210
return output
224211

212+
#========= execution methods =========================================================
213+
214+
def execute(self):
215+
return self._get_executor().execute()
216+
217+
def __iter__(self):
218+
cdef StreamableExecutor* executor = self._get_executor()
219+
while True:
220+
arr_ptr = executor.executeNext()
221+
if arr_ptr is NULL:
222+
break
223+
arr = <object>arr_ptr
224+
Py_DECREF(arr)
225+
yield arr
226+
225227
#========= non-public properties & methods ===========================================
226228

227229
def _get_json(self):
@@ -237,12 +239,12 @@ cdef class Pipeline:
237239
def _del_executor(self):
238240
self._executor.reset()
239241

240-
cdef PipelineExecutor* _get_executor(self) except NULL:
242+
cdef StreamableExecutor* _get_executor(self) except NULL:
241243
if not self._executor:
242-
json = self._get_json()
243-
executor = new PipelineExecutor(json)
244+
executor = new StreamableExecutor(self._get_json(),
245+
self._chunk_size, self._prefetch)
244246
executor.setLogLevel(self._loglevel)
245-
readPipeline(executor, json)
247+
executor.read()
246248
addArrayReaders(executor, self._inputs)
247249
self._executor.reset(executor)
248250
return self._executor.get()

pdal/pipeline.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ def __init__(
2828
spec: Union[None, str, Sequence[Stage]] = None,
2929
arrays: Sequence[np.ndarray] = (),
3030
loglevel: int = logging.ERROR,
31+
chunk_size: int = 10000,
32+
prefetch: int = 0,
3133
):
3234
super().__init__()
3335
self._stages: List[Stage] = []
@@ -37,6 +39,8 @@ def __init__(
3739
self |= stage
3840
self.inputs = arrays
3941
self.loglevel = loglevel
42+
self.chunk_size = chunk_size
43+
self.prefetch = prefetch
4044

4145
@property
4246
def stages(self) -> List[Stage]:

0 commit comments

Comments
 (0)