Skip to content

Commit 6f017ca

Browse files
abellgithubgsakkis
authored andcommitted
Interface using numpy array as point table and streaming.
- Needs to handle skips - Need to check on error handling with premature termination, etc.
1 parent ccb64ec commit 6f017ca

File tree

7 files changed

+517
-2
lines changed

7 files changed

+517
-2
lines changed

pdal/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
set(EXTENSION_SRC PyArray.cpp PyPipeline.cpp)
1+
set(EXTENSION_SRC PyArray.cpp PyPipeline.cpp StreamableExecutor.cpp)
22

33
set(extension "libpdalpython")
44
add_cython_target(${extension} "libpdalpython.pyx" CXX PY3)

pdal/StreamableExecutor.cpp

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/******************************************************************************
2+
* Copyright (c) 2016, Howard Butler ([email protected])
3+
*
4+
* All rights reserved.
5+
*
6+
* Redistribution and use in source and binary forms, with or without
7+
* modification, are permitted provided that the following
8+
* conditions are met:
9+
*
10+
* * Redistributions of source code must retain the above copyright
11+
* notice, this list of conditions and the following disclaimer.
12+
* * Redistributions in binary form must reproduce the above copyright
13+
* notice, this list of conditions and the following disclaimer in
14+
* the documentation and/or other materials provided
15+
* with the distribution.
16+
* * Neither the name of Hobu, Inc. or Flaxen Geo Consulting nor the
17+
* names of its contributors may be used to endorse or promote
18+
* products derived from this software without specific prior
19+
* written permission.
20+
*
21+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24+
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25+
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26+
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27+
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
28+
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29+
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31+
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
32+
* OF SUCH DAMAGE.
33+
****************************************************************************/
34+
35+
#include "StreamableExecutor.hpp"
36+
37+
#include <Python.h>
38+
#include <numpy/arrayobject.h>
39+
40+
#include <pdal/Stage.hpp>
41+
#include <pdal/pdal_features.hpp>
42+
43+
namespace pdal
44+
{
45+
namespace python
46+
{
47+
48+
// PythonPointTable
49+
50+
PythonPointTable::PythonPointTable(point_count_t limit, int prefetch) :
51+
StreamPointTable(m_layout, limit), m_limit(limit), m_prefetch(prefetch),
52+
m_curArray(nullptr), m_dtype(nullptr)
53+
{}
54+
55+
PythonPointTable::~PythonPointTable()
56+
{
57+
py_destroy();
58+
}
59+
60+
void PythonPointTable::finalize()
61+
{
62+
BasePointTable::finalize();
63+
py_createDescriptor();
64+
m_curArray = py_createArray();
65+
}
66+
67+
void PythonPointTable::py_destroy()
68+
{
69+
auto gil = PyGILState_Ensure();
70+
71+
Py_XDECREF(m_dtype);
72+
Py_XDECREF(m_curArray);
73+
74+
PyGILState_Release(gil);
75+
}
76+
77+
PyArrayObject *PythonPointTable::py_createArray() const
78+
{
79+
auto gil = PyGILState_Ensure();
80+
81+
npy_intp size = (npy_intp)m_limit;
82+
Py_INCREF(m_dtype);
83+
PyArrayObject *arr = (PyArrayObject *)PyArray_NewFromDescr(&PyArray_Type, m_dtype,
84+
1, &size, 0, nullptr, NPY_ARRAY_CARRAY, nullptr);
85+
86+
PyGILState_Release(gil);
87+
return arr;
88+
}
89+
90+
void PythonPointTable::py_createDescriptor()
91+
{
92+
auto gil = PyGILState_Ensure();
93+
94+
if (_import_array() < 0)
95+
std::cerr << "Could not import array!\n";
96+
PyObject *dtype_dict = py_buildNumpyDescriptor();
97+
if (PyArray_DescrConverter(dtype_dict, &m_dtype) == NPY_FAIL)
98+
m_dtype = nullptr;
99+
Py_XDECREF(dtype_dict);
100+
101+
PyGILState_Release(gil);
102+
}
103+
104+
void PythonPointTable::py_resizeArray(int np)
105+
{
106+
npy_intp sizes[1];
107+
sizes[0] = np;
108+
PyArray_Dims dims{ sizes, 1 };
109+
110+
auto gil = PyGILState_Ensure();
111+
PyArray_Resize(m_curArray, &dims, true, NPY_CORDER);
112+
PyGILState_Release(gil);
113+
}
114+
115+
PyObject *PythonPointTable::py_buildNumpyDescriptor() const
116+
{
117+
// Build up a numpy dtype dictionary
118+
//
119+
// {'formats': ['f8', 'f8', 'f8', 'u2', 'u1', 'u1', 'u1', 'u1', 'u1',
120+
// 'f4', 'u1', 'u2', 'f8', 'u2', 'u2', 'u2'],
121+
// 'names': ['X', 'Y', 'Z', 'Intensity', 'ReturnNumber',
122+
// 'NumberOfReturns', 'ScanDirectionFlag', 'EdgeOfFlightLine',
123+
// 'Classification', 'ScanAngleRank', 'UserData',
124+
// 'PointSourceId', 'GpsTime', 'Red', 'Green', 'Blue']}
125+
//
126+
127+
DimTypeList dims = layout()->dimTypes();
128+
PyObject* names = PyList_New(dims.size());
129+
PyObject* formats = PyList_New(dims.size());
130+
for (size_t i = 0; i < dims.size(); ++i)
131+
{
132+
DimType& dt = dims[i];
133+
std::string name = m_layout.dimName(dt.m_id);
134+
npy_intp stride = Dimension::size(dt.m_type);
135+
136+
std::string kind;
137+
Dimension::BaseType b = Dimension::base(dt.m_type);
138+
if (b == Dimension::BaseType::Unsigned)
139+
kind = "u";
140+
else if (b == Dimension::BaseType::Signed)
141+
kind = "i";
142+
else if (b == Dimension::BaseType::Floating)
143+
kind = "f";
144+
else
145+
throw pdal_error("Unable to map kind '" + kind +
146+
"' to PDAL dimension type");
147+
148+
std::string type = kind + std::to_string(stride);
149+
PyList_SetItem(names, i, PyUnicode_FromString(name.c_str()));
150+
PyList_SetItem(formats, i, PyUnicode_FromString(type.c_str()));
151+
}
152+
153+
PyObject* dict = PyDict_New();
154+
PyDict_SetItemString(dict, "names", names);
155+
PyDict_SetItemString(dict, "formats", formats);
156+
return dict;
157+
}
158+
159+
void PythonPointTable::reset()
160+
{
161+
point_count_t np = numPoints();
162+
163+
// If this is the last chunk, the size might be less than what's expected, so
164+
// resize the array to match its true size.
165+
// ABELL - This isn't quite right. We want to know if there are any skips and deal with those
166+
// but I'm leaving that for the moment.
167+
if (np && np != m_limit)
168+
py_resizeArray(np);
169+
170+
// This will keep putting arrays on the list until done, whether or not the consumer
171+
// can handle them that fast. We can modify as appropriate to block if desired.
172+
std::unique_lock<std::mutex> l(m_mutex);
173+
{
174+
// It's possible that this is called with 0 points processed, in which case
175+
// we don't push the current array.
176+
if (np)
177+
{
178+
m_arrays.push(m_curArray);
179+
m_curArray = nullptr;
180+
}
181+
182+
bool done = np < m_limit;
183+
184+
// If we just pushed the last chunk, push a nullptr so that a reader knows.
185+
if (done)
186+
m_arrays.push(nullptr);
187+
m_producedCv.notify_one();
188+
189+
if (done)
190+
return;
191+
192+
while (m_arrays.size() > m_prefetch)
193+
m_consumedCv.wait(l);
194+
}
195+
196+
// Make a new array for data.
197+
m_curArray = py_createArray();
198+
}
199+
200+
PyArrayObject *PythonPointTable::fetchArray()
201+
{
202+
PyArrayObject *arr = nullptr;
203+
204+
// Lock scope.
205+
Py_BEGIN_ALLOW_THREADS
206+
{
207+
std::unique_lock<std::mutex> l(m_mutex);
208+
while (m_arrays.empty())
209+
m_producedCv.wait(l);
210+
211+
// Grab the array from the front of the list and notify that we did so.
212+
arr = m_arrays.front();
213+
m_arrays.pop();
214+
}
215+
Py_END_ALLOW_THREADS
216+
// Notify that we consumed an array.
217+
m_consumedCv.notify_one();
218+
return arr;
219+
}
220+
221+
char *PythonPointTable::getPoint(PointId idx)
222+
{
223+
return (char *)PyArray_GETPTR1(m_curArray, (npy_intp)idx);
224+
}
225+
226+
227+
// StreamableExecutor
228+
229+
StreamableExecutor::StreamableExecutor(const char *json, int chunkSize, int prefetch) :
230+
m_json(json), m_table(chunkSize, prefetch), m_manager(chunkSize),
231+
m_log(Log::makeLog("pdal_python", &m_logStream))
232+
{
233+
m_manager.setLog(m_log);
234+
}
235+
236+
StreamableExecutor::~StreamableExecutor()
237+
{
238+
//ABELL - Hmmm.
239+
if (m_thread)
240+
m_thread->join();
241+
}
242+
243+
PyArrayObject *StreamableExecutor::executeNext()
244+
{
245+
if (!m_thread)
246+
{
247+
m_thread.reset(new std::thread([this]()
248+
{
249+
m_manager.executeStream(m_table);
250+
}));
251+
}
252+
253+
// Blocks until something is ready.
254+
PyArrayObject *arr = m_table.fetchArray();
255+
if (arr == nullptr)
256+
{
257+
Py_BEGIN_ALLOW_THREADS
258+
m_thread->join();
259+
Py_END_ALLOW_THREADS
260+
m_thread.reset();
261+
}
262+
return arr;
263+
}
264+
265+
std::string StreamableExecutor::getMetadata() const
266+
{
267+
return pdal::Utils::toJSON(m_manager.getMetadata().clone("metadata"));
268+
}
269+
270+
std::string StreamableExecutor::getPipeline() const
271+
{
272+
std::stringstream strm;
273+
pdal::PipelineWriter::writePipeline(m_manager.getStage(), strm);
274+
return strm.str();
275+
}
276+
277+
int StreamableExecutor::getLogLevel() const
278+
{
279+
return static_cast<int>(m_log->getLevel());
280+
}
281+
282+
std::string StreamableExecutor::getLog() const
283+
{
284+
return m_logStream.str();
285+
}
286+
287+
// Returns the active log level.
288+
int StreamableExecutor::setLogLevel(int level)
289+
{
290+
if (level < 0 || level > 8)
291+
return getLogLevel();
292+
293+
m_log->setLevel(static_cast<pdal::LogLevel>(level));
294+
return level;
295+
}
296+
297+
bool StreamableExecutor::validate()
298+
{
299+
std::stringstream strm;
300+
strm << m_json;
301+
m_manager.readPipeline(strm);
302+
m_manager.prepare();
303+
304+
return true;
305+
}
306+
307+
} // namespace python
308+
} // namespace pdal

0 commit comments

Comments
 (0)