Skip to content

Commit 139e00a

Browse files
committed
Replace pdal::PipelineExecutor with a new pdal::python::PipelineExecutor
1 parent cf00bce commit 139e00a

File tree

5 files changed

+135
-83
lines changed

5 files changed

+135
-83
lines changed

pdal/PyPipeline.cpp

Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,82 @@
3939
#include <dlfcn.h>
4040
#endif
4141

42-
#include <Python.h>
43-
44-
#include <pdal/Stage.hpp>
45-
#include <pdal/pdal_features.hpp>
46-
4742
namespace pdal
4843
{
4944
namespace python
5045
{
5146

47+
PipelineExecutor::PipelineExecutor(
48+
std::string const& json, std::vector<std::shared_ptr<Array>> arrays, int level)
49+
{
50+
if (level < 0 || level > 8)
51+
throw pdal_error("log level must be between 0 and 8!");
52+
53+
LogPtr log(Log::makeLog("pypipeline", &m_logStream));
54+
log->setLevel(static_cast<pdal::LogLevel>(level));
55+
m_manager.setLog(log);
56+
57+
std::stringstream strm;
58+
strm << json;
59+
m_manager.readPipeline(strm);
60+
61+
addArrayReaders(arrays);
62+
}
63+
64+
65+
point_count_t PipelineExecutor::execute()
66+
{
67+
point_count_t count = m_manager.execute();
68+
m_executed = true;
69+
return count;
70+
}
71+
72+
73+
const PointViewSet& PipelineExecutor::views() const
74+
{
75+
if (!m_executed)
76+
throw pdal_error("Pipeline has not been executed!");
77+
78+
return m_manager.views();
79+
}
80+
81+
82+
std::string PipelineExecutor::getPipeline() const
83+
{
84+
if (!m_executed)
85+
throw pdal_error("Pipeline has not been executed!");
86+
87+
std::stringstream strm;
88+
pdal::PipelineWriter::writePipeline(m_manager.getStage(), strm);
89+
return strm.str();
90+
}
91+
92+
93+
std::string PipelineExecutor::getMetadata() const
94+
{
95+
if (!m_executed)
96+
throw pdal_error("Pipeline has not been executed!");
97+
98+
std::stringstream strm;
99+
MetadataNode root = m_manager.getMetadata().clone("metadata");
100+
pdal::Utils::toJSON(root, strm);
101+
return strm.str();
102+
}
103+
104+
105+
std::string PipelineExecutor::getSchema() const
106+
{
107+
if (!m_executed)
108+
throw pdal_error("Pipeline has not been executed!");
109+
110+
std::stringstream strm;
111+
MetadataNode root = pointTable().layout()->toMetadata().clone("schema");
112+
pdal::Utils::toJSON(root, strm);
113+
return strm.str();
114+
}
115+
52116

53-
void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Array>> arrays)
117+
void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> arrays)
54118
{
55119
// Make the symbols in pdal_base global so that they're accessible
56120
// to PDAL plugins. Python dlopen's this extension with RTLD_LOCAL,
@@ -65,8 +129,7 @@ void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Arr
65129
if (arrays.empty())
66130
return;
67131

68-
PipelineManager& manager = executor->getManager();
69-
std::vector<Stage *> roots = manager.roots();
132+
std::vector<Stage *> roots = m_manager.roots();
70133
if (roots.size() != 1)
71134
throw pdal_error("Filter pipeline must contain a single root stage.");
72135

@@ -81,7 +144,7 @@ void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Arr
81144
MemoryViewReader::Order::ColumnMajor);
82145
options.add("shape", MemoryViewReader::Shape(array->shape()));
83146

84-
Stage& s = manager.makeReader("", "readers.memoryview", options);
147+
Stage& s = m_manager.makeReader("", "readers.memoryview", options);
85148
MemoryViewReader& r = dynamic_cast<MemoryViewReader &>(s);
86149
for (auto f : array->fields())
87150
r.pushField(f);
@@ -101,7 +164,7 @@ void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Arr
101164
roots[0]->setInput(r);
102165
}
103166

104-
manager.validateStageOptions();
167+
m_manager.validateStageOptions();
105168
}
106169

107170

pdal/PyPipeline.hpp

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,45 @@
3434

3535
#pragma once
3636

37-
#include <pdal/PipelineExecutor.hpp>
38-
37+
#include <pdal/PipelineManager.hpp>
3938
#include <numpy/arrayobject.h>
4039

4140
namespace pdal
4241
{
4342
namespace python
4443
{
4544

46-
class Array;
47-
4845
PyObject* buildNumpyDescriptor(PointLayoutPtr layout);
49-
void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Array>> arrays);
5046
PyArrayObject* viewToNumpyArray(PointViewPtr view);
5147
PyArrayObject* meshToNumpyArray(const TriangularMesh* mesh);
5248

49+
class Array;
50+
51+
class PDAL_DLL PipelineExecutor {
52+
public:
53+
PipelineExecutor(std::string const& json, std::vector<std::shared_ptr<Array>> arrays, int level);
54+
virtual ~PipelineExecutor() = default;
55+
56+
point_count_t execute();
57+
58+
bool executed() const { return m_executed; }
59+
const PointViewSet& views() const;
60+
std::string getPipeline() const;
61+
std::string getMetadata() const;
62+
std::string getSchema() const;
63+
std::string getLog() const { return m_logStream.str(); }
64+
65+
protected:
66+
virtual ConstPointTableRef pointTable() const { return m_manager.pointTable(); }
67+
68+
pdal::PipelineManager m_manager;
69+
bool m_executed = false;
70+
71+
private:
72+
void addArrayReaders(std::vector<std::shared_ptr<Array>> arrays);
73+
74+
std::stringstream m_logStream;
75+
};
76+
5377
} // namespace python
5478
} // namespace pdal

pdal/StreamableExecutor.cpp

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,17 @@ char *PythonPointTable::getPoint(PointId idx)
180180

181181
// StreamableExecutor
182182

183-
StreamableExecutor::StreamableExecutor(std::string const& json, point_count_t chunkSize, int prefetch) :
184-
PipelineExecutor(json), m_table(chunkSize, prefetch)
185-
{}
183+
StreamableExecutor::StreamableExecutor(std::string const& json,
184+
std::vector<std::shared_ptr<Array>> arrays,
185+
int level,
186+
point_count_t chunkSize,
187+
int prefetch)
188+
: PipelineExecutor(json, arrays, level)
189+
, m_table(chunkSize, prefetch)
190+
{
191+
if (!m_manager.pipelineStreamable())
192+
throw pdal_error("Pipeline is not streamable");
193+
}
186194

187195
StreamableExecutor::~StreamableExecutor()
188196
{
@@ -227,16 +235,5 @@ void StreamableExecutor::done()
227235
m_executed = true;
228236
}
229237

230-
std::string StreamableExecutor::getSchema() const
231-
{
232-
if (!m_executed)
233-
throw pdal_error("Pipeline has not been executed!");
234-
235-
std::stringstream strm;
236-
MetadataNode root = m_table.layout()->toMetadata().clone("schema");
237-
pdal::Utils::toJSON(root, strm);
238-
return strm.str();
239-
}
240-
241238
} // namespace python
242239
} // namespace pdal

pdal/StreamableExecutor.hpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,10 @@
3434

3535
#pragma once
3636

37-
#include <mutex>
3837
#include <condition_variable>
39-
#include <queue>
40-
#include <memory>
4138
#include <thread>
42-
#include <sstream>
4339

44-
#include <numpy/arrayobject.h>
45-
46-
#include <pdal/PipelineExecutor.hpp>
47-
#include <pdal/PipelineManager.hpp>
48-
#include <pdal/PointTable.hpp>
40+
#include "PyPipeline.hpp"
4941

5042
namespace pdal
5143
{
@@ -85,7 +77,11 @@ class PythonPointTable : public StreamPointTable
8577
class StreamableExecutor : public PipelineExecutor
8678
{
8779
public:
88-
StreamableExecutor(std::string const& json, point_count_t chunkSize, int prefetch);
80+
StreamableExecutor(std::string const& json,
81+
std::vector<std::shared_ptr<Array>> arrays,
82+
int level,
83+
point_count_t chunkSize,
84+
int prefetch);
8985
~StreamableExecutor();
9086

9187
std::string getSchema() const;
@@ -94,6 +90,7 @@ class StreamableExecutor : public PipelineExecutor
9490

9591
private:
9692
void done();
93+
ConstPointTableRef pointTable() const { return m_table; }
9794

9895
PythonPointTable m_table;
9996
std::unique_ptr<std::thread> m_thread;

pdal/libpdalpython.pyx

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -86,29 +86,24 @@ cdef extern from "pdal/PointView.hpp" namespace "pdal":
8686
ctypedef cpp_set[PointViewPtr] PointViewSet
8787

8888

89-
cdef extern from "pdal/PipelineManager.hpp" namespace "pdal":
90-
cdef cppclass PipelineManager:
91-
const PointViewSet& views() const
92-
bool pipelineStreamable() const
93-
89+
cdef extern from "PyPipeline.hpp" namespace "pdal::python":
90+
np.PyArrayObject* viewToNumpyArray(PointViewPtr) except +
91+
np.PyArrayObject* meshToNumpyArray(const TriangularMesh*) except +
9492

95-
cdef extern from "pdal/PipelineExecutor.hpp" namespace "pdal":
9693
cdef cppclass PipelineExecutor:
97-
PipelineExecutor(string) except +
98-
const PipelineManager& getManagerConst() except +
99-
bool executed() except +
100-
void read() except +
94+
PipelineExecutor(string, vector[shared_ptr[Array]], int) except +
10195
int execute() except +
96+
bool executed() except +
10297
string getPipeline() except +
10398
string getMetadata() except +
10499
string getSchema() except +
105100
string getLog() except +
106-
void setLogLevel(int) except +
101+
const PointViewSet& views() except +
107102

108103

109104
cdef extern from "StreamableExecutor.hpp" namespace "pdal::python":
110105
cdef cppclass StreamableExecutor(PipelineExecutor):
111-
StreamableExecutor(string, int, int) except +
106+
StreamableExecutor(string, vector[shared_ptr[Array]], int, int, int) except +
112107
np.PyArrayObject* executeNext() except +
113108
void stop() except +
114109

@@ -118,12 +113,6 @@ cdef extern from "PyArray.hpp" namespace "pdal::python":
118113
Array(np.PyArrayObject*) except +
119114

120115

121-
cdef extern from "PyPipeline.hpp" namespace "pdal::python":
122-
void addArrayReaders(PipelineExecutor*, vector[shared_ptr[Array]]) except +
123-
np.PyArrayObject* viewToNumpyArray(PointViewPtr) except +
124-
np.PyArrayObject* meshToNumpyArray(const TriangularMesh*) except +
125-
126-
127116
@cython.internal
128117
cdef class PipelineResultsMixin:
129118
@property
@@ -179,21 +168,17 @@ cdef class Pipeline(PipelineResultsMixin):
179168
@property
180169
def arrays(self):
181170
cdef PipelineExecutor* executor = self._get_executor()
182-
if not executor.executed():
183-
raise RuntimeError("Pipeline has not been executed!")
184171
output = []
185-
for view in executor.getManagerConst().views():
172+
for view in executor.views():
186173
output.append(<object>viewToNumpyArray(view))
187174
Py_DECREF(output[-1])
188175
return output
189176

190177
@property
191178
def meshes(self):
192179
cdef PipelineExecutor* executor = self._get_executor()
193-
if not executor.executed():
194-
raise RuntimeError("Pipeline has not been executed!")
195180
output = []
196-
for view in executor.getManagerConst().views():
181+
for view in executor.views():
197182
output.append(<object>meshToNumpyArray(deref(view).mesh()))
198183
Py_DECREF(output[-1])
199184
return output
@@ -204,15 +189,10 @@ cdef class Pipeline(PipelineResultsMixin):
204189
return self._get_executor().execute()
205190

206191
def iterator(self, int chunk_size=10000, int prefetch = 0):
207-
cdef StreamableExecutor* executor = new StreamableExecutor(
208-
self._get_json(), chunk_size, prefetch
209-
)
210-
self._configure_executor(executor)
211-
if not executor.getManagerConst().pipelineStreamable():
212-
raise RuntimeError("Pipeline is not streamable")
213-
214192
it = PipelineIterator()
215-
it.set_executor(executor)
193+
it.set_executor(new StreamableExecutor(
194+
self._get_json(), self._inputs, self._loglevel, chunk_size, prefetch
195+
))
216196
return it
217197

218198
#========= non-public properties & methods ===========================================
@@ -232,16 +212,11 @@ cdef class Pipeline(PipelineResultsMixin):
232212

233213
cdef PipelineExecutor* _get_executor(self) except NULL:
234214
if not self._executor:
235-
executor = new PipelineExecutor(self._get_json())
236-
self._configure_executor(executor)
237-
self._executor.reset(executor)
215+
self._executor.reset(new PipelineExecutor(
216+
self._get_json(), self._inputs, self._loglevel)
217+
)
238218
return self._executor.get()
239219

240-
cdef _configure_executor(self, PipelineExecutor* executor):
241-
executor.setLogLevel(self._loglevel)
242-
executor.read()
243-
addArrayReaders(executor, self._inputs)
244-
245220

246221
@cython.internal
247222
cdef class PipelineIterator(PipelineResultsMixin):
@@ -250,10 +225,6 @@ cdef class PipelineIterator(PipelineResultsMixin):
250225
cdef set_executor(self, StreamableExecutor* executor):
251226
self._executor.reset(executor)
252227

253-
@property
254-
def schema(self):
255-
return json.loads(self._executor.get().getSchema())
256-
257228
def __iter__(self):
258229
return self
259230

0 commit comments

Comments
 (0)