Skip to content

Commit b2c0c7e

Browse files
authored
Merge pull request #94 from TileDB-Inc/threadstream
Pipeline iterator - parallelized version
2 parents ccb64ec + ce36eef commit b2c0c7e

File tree

11 files changed

+752
-111
lines changed

11 files changed

+752
-111
lines changed

README.rst

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,31 @@ PDAL and Python:
187187
).pipeline(clamped)
188188
print(pipeline.execute()) # 387 points
189189
190+
Executing Streamable Pipelines
191+
................................................................................
192+
Streamable pipelines (pipelines that consist exclusively of streamable PDAL
193+
stages) can be executed in streaming mode via ``Pipeline.iterator()``. This
194+
returns an iterator object that yields Numpy arrays of up to ``chunk_size`` size
195+
(default=10000) at a time.
196+
197+
.. code-block:: python
198+
199+
import pdal
200+
pipeline = pdal.Reader("test/data/autzen-utm.las") | pdal.Filter.range(limits="Intensity[80:120)")
201+
for array in pipeline.iterator(chunk_size=500):
202+
print(len(array))
203+
# or to concatenate all arrays into one
204+
# full_array = np.concatenate(list(pipeline))
205+
206+
``Pipeline.iterator()`` also takes an optional ``prefetch`` parameter (default=0)
207+
to allow prefetching up to this number of arrays in parallel and buffering
208+
them until they are yielded to the caller.
209+
210+
If you just want to execute a streamable pipeline in streaming mode and don't
211+
need to access the data points (typically when the pipeline has Writer stage(s)),
212+
you can use the ``Pipeline.execute_streaming(chunk_size)`` method instead. This
213+
is functionally equivalent to ``sum(map(len, pipeline.iterator(chunk_size)))``
214+
but more efficient as it avoids allocating and filling any arrays in memory.
190215

191216
Accessing Mesh Data
192217
................................................................................
@@ -236,17 +261,14 @@ USE-CASE : Take a LiDAR map, create a mesh from the ground points, split into ti
236261
.. code-block:: python
237262
238263
import pdal
239-
import json
240264
import psycopg2
241265
import io
242266
243-
pipe = [
244-
'.../python/test/data/1.2-with-color.las',
245-
{"type": "filters.splitter", "length": 1000},
246-
{"type": "filters.delaunay"}
247-
]
248-
249-
pl = pdal.Pipeline(json.dumps(pipe))
267+
pl = (
268+
pdal.Reader(".../python/test/data/1.2-with-color.las")
269+
| pdal.Filter.splitter(length=1000)
270+
| pdal.Filter.delaunay()
271+
)
250272
pl.execute()
251273
252274
conn = psycopg2.connect(%CONNECTION_STRING%)

pdal/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
set(EXTENSION_SRC PyArray.cpp PyPipeline.cpp)
1+
set(EXTENSION_SRC PyArray.cpp PyPipeline.cpp StreamableExecutor.cpp)
22

33
set(extension "libpdalpython")
44
add_cython_target(${extension} "libpdalpython.pyx" CXX PY3)

pdal/PyPipeline.cpp

Lines changed: 119 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,100 @@
3939
#include <dlfcn.h>
4040
#endif
4141

42-
#include <Python.h>
43-
44-
#include <pdal/Stage.hpp>
45-
#include <pdal/pdal_features.hpp>
46-
4742
namespace pdal
4843
{
4944
namespace python
5045
{
5146

5247

53-
void readPipeline(PipelineExecutor* executor, std::string json)
48+
void CountPointTable::reset()
49+
{
50+
for (PointId idx = 0; idx < numPoints(); idx++)
51+
if (!skip(idx))
52+
m_count++;
53+
FixedPointTable::reset();
54+
}
55+
56+
57+
PipelineExecutor::PipelineExecutor(
58+
std::string const& json, std::vector<std::shared_ptr<Array>> arrays, int level)
59+
{
60+
if (level < 0 || level > 8)
61+
throw pdal_error("log level must be between 0 and 8!");
62+
63+
LogPtr log(Log::makeLog("pypipeline", &m_logStream));
64+
log->setLevel(static_cast<pdal::LogLevel>(level));
65+
m_manager.setLog(log);
66+
67+
std::stringstream strm;
68+
strm << json;
69+
m_manager.readPipeline(strm);
70+
71+
addArrayReaders(arrays);
72+
}
73+
74+
75+
point_count_t PipelineExecutor::execute()
76+
{
77+
point_count_t count = m_manager.execute();
78+
m_executed = true;
79+
return count;
80+
}
81+
82+
83+
point_count_t PipelineExecutor::executeStream(point_count_t streamLimit)
84+
{
85+
CountPointTable table(streamLimit);
86+
m_manager.executeStream(table);
87+
m_executed = true;
88+
return table.count();
89+
}
90+
91+
const PointViewSet& PipelineExecutor::views() const
92+
{
93+
if (!m_executed)
94+
throw pdal_error("Pipeline has not been executed!");
95+
96+
return m_manager.views();
97+
}
98+
99+
100+
std::string PipelineExecutor::getPipeline() const
54101
{
55-
std::stringstream strm(json);
56-
executor->getManager().readPipeline(strm);
102+
if (!m_executed)
103+
throw pdal_error("Pipeline has not been executed!");
104+
105+
std::stringstream strm;
106+
pdal::PipelineWriter::writePipeline(m_manager.getStage(), strm);
107+
return strm.str();
57108
}
58109

59110

60-
void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Array>> arrays)
111+
std::string PipelineExecutor::getMetadata() const
112+
{
113+
if (!m_executed)
114+
throw pdal_error("Pipeline has not been executed!");
115+
116+
std::stringstream strm;
117+
MetadataNode root = m_manager.getMetadata().clone("metadata");
118+
pdal::Utils::toJSON(root, strm);
119+
return strm.str();
120+
}
121+
122+
123+
std::string PipelineExecutor::getSchema() const
124+
{
125+
if (!m_executed)
126+
throw pdal_error("Pipeline has not been executed!");
127+
128+
std::stringstream strm;
129+
MetadataNode root = pointTable().layout()->toMetadata().clone("schema");
130+
pdal::Utils::toJSON(root, strm);
131+
return strm.str();
132+
}
133+
134+
135+
void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> arrays)
61136
{
62137
// Make the symbols in pdal_base global so that they're accessible
63138
// to PDAL plugins. Python dlopen's this extension with RTLD_LOCAL,
@@ -72,8 +147,7 @@ void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Arr
72147
if (arrays.empty())
73148
return;
74149

75-
PipelineManager& manager = executor->getManager();
76-
std::vector<Stage *> roots = manager.roots();
150+
std::vector<Stage *> roots = m_manager.roots();
77151
if (roots.size() != 1)
78152
throw pdal_error("Filter pipeline must contain a single root stage.");
79153

@@ -88,7 +162,7 @@ void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Arr
88162
MemoryViewReader::Order::ColumnMajor);
89163
options.add("shape", MemoryViewReader::Shape(array->shape()));
90164

91-
Stage& s = manager.makeReader("", "readers.memoryview", options);
165+
Stage& s = m_manager.makeReader("", "readers.memoryview", options);
92166
MemoryViewReader& r = dynamic_cast<MemoryViewReader &>(s);
93167
for (auto f : array->fields())
94168
r.pushField(f);
@@ -108,11 +182,11 @@ void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Arr
108182
roots[0]->setInput(r);
109183
}
110184

111-
manager.validateStageOptions();
185+
m_manager.validateStageOptions();
112186
}
113187

114188

115-
inline PyObject* buildNumpyDescription(PointViewPtr view)
189+
PyObject* buildNumpyDescriptor(PointLayoutPtr layout)
116190
{
117191
// Build up a numpy dtype dictionary
118192
//
@@ -123,31 +197,42 @@ inline PyObject* buildNumpyDescription(PointViewPtr view)
123197
// 'Classification', 'ScanAngleRank', 'UserData',
124198
// 'PointSourceId', 'GpsTime', 'Red', 'Green', 'Blue']}
125199
//
126-
Dimension::IdList dims = view->dims();
200+
201+
// Ensure that the dimensions are sorted by offset
202+
// Is there a better way? Can they be sorted by offset already?
203+
auto sortByOffset = [layout](Dimension::Id id1, Dimension::Id id2) -> bool
204+
{
205+
return layout->dimOffset(id1) < layout->dimOffset(id2);
206+
};
207+
auto dims = layout->dims();
208+
std::sort(dims.begin(), dims.end(), sortByOffset);
209+
127210
PyObject* names = PyList_New(dims.size());
128211
PyObject* formats = PyList_New(dims.size());
129212
for (size_t i = 0; i < dims.size(); ++i)
130213
{
131214
Dimension::Id id = dims[i];
132-
std::string name = view->dimName(id);
133-
npy_intp stride = view->dimSize(id);
134-
135-
std::string kind;
136-
Dimension::BaseType b = Dimension::base(view->dimType(id));
137-
if (b == Dimension::BaseType::Unsigned)
138-
kind = "u";
139-
else if (b == Dimension::BaseType::Signed)
140-
kind = "i";
141-
else if (b == Dimension::BaseType::Floating)
142-
kind = "f";
143-
else
144-
throw pdal_error("Unable to map kind '" + kind +
145-
"' to PDAL dimension type");
146-
147-
std::stringstream oss;
148-
oss << kind << stride;
215+
auto name = layout->dimName(id);
149216
PyList_SetItem(names, i, PyUnicode_FromString(name.c_str()));
150-
PyList_SetItem(formats, i, PyUnicode_FromString(oss.str().c_str()));
217+
218+
std::stringstream format;
219+
switch (Dimension::base(layout->dimType(id)))
220+
{
221+
case Dimension::BaseType::Unsigned:
222+
format << 'u';
223+
break;
224+
case Dimension::BaseType::Signed:
225+
format << 'i';
226+
break;
227+
case Dimension::BaseType::Floating:
228+
format << 'f';
229+
break;
230+
default:
231+
throw pdal_error("Unable to map dimension '" + name + "' to Numpy");
232+
}
233+
format << layout->dimSize(id);
234+
PyList_SetItem(formats, i, PyUnicode_FromString(format.str().c_str()));
235+
151236
}
152237
PyObject* dtype_dict = PyDict_New();
153238
PyDict_SetItemString(dtype_dict, "names", names);
@@ -161,7 +246,7 @@ PyArrayObject* viewToNumpyArray(PointViewPtr view)
161246
if (_import_array() < 0)
162247
throw pdal_error("Could not import numpy.core.multiarray.");
163248

164-
PyObject* dtype_dict = buildNumpyDescription(view);
249+
PyObject* dtype_dict = buildNumpyDescriptor(view->layout());
165250
PyArray_Descr *dtype = nullptr;
166251
if (PyArray_DescrConverter(dtype_dict, &dtype) == NPY_FAIL)
167252
throw pdal_error("Unable to build numpy dtype");

pdal/PyPipeline.hpp

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,58 @@
3434

3535
#pragma once
3636

37-
#include <pdal/PipelineExecutor.hpp>
38-
37+
#include <pdal/PipelineManager.hpp>
3938
#include <numpy/arrayobject.h>
4039

4140
namespace pdal
4241
{
4342
namespace python
4443
{
4544

46-
class Array;
47-
48-
void readPipeline(PipelineExecutor* executor, std::string json);
49-
void addArrayReaders(PipelineExecutor* executor, std::vector<std::shared_ptr<Array>> arrays);
45+
PyObject* buildNumpyDescriptor(PointLayoutPtr layout);
5046
PyArrayObject* viewToNumpyArray(PointViewPtr view);
5147
PyArrayObject* meshToNumpyArray(const TriangularMesh* mesh);
5248

49+
class Array;
50+
51+
class PDAL_DLL PipelineExecutor {
52+
public:
53+
PipelineExecutor(std::string const& json, std::vector<std::shared_ptr<Array>> arrays, int level);
54+
virtual ~PipelineExecutor() = default;
55+
56+
point_count_t execute();
57+
point_count_t executeStream(point_count_t streamLimit);
58+
59+
const PointViewSet& views() const;
60+
std::string getPipeline() const;
61+
std::string getMetadata() const;
62+
std::string getSchema() const;
63+
std::string getLog() const { return m_logStream.str(); }
64+
65+
protected:
66+
virtual ConstPointTableRef pointTable() const { return m_manager.pointTable(); }
67+
68+
pdal::PipelineManager m_manager;
69+
bool m_executed = false;
70+
71+
private:
72+
void addArrayReaders(std::vector<std::shared_ptr<Array>> arrays);
73+
74+
std::stringstream m_logStream;
75+
};
76+
77+
class CountPointTable : public FixedPointTable
78+
{
79+
public:
80+
CountPointTable(point_count_t capacity) : FixedPointTable(capacity), m_count(0) {}
81+
point_count_t count() const { return m_count; }
82+
83+
protected:
84+
virtual void reset();
85+
86+
private:
87+
point_count_t m_count;
88+
};
89+
5390
} // namespace python
5491
} // namespace pdal

0 commit comments

Comments
 (0)