Skip to content

Commit aa3f7b6

Browse files
committed
Adding support for streaming the input arrays of a pipeline
1 parent 2d035ab commit aa3f7b6

File tree

6 files changed

+226
-39
lines changed

6 files changed

+226
-39
lines changed

src/pdal/PyArray.cpp

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
#include "PyArray.hpp"
3636
#include <pdal/io/MemoryViewReader.hpp>
37-
#include <numpy/arrayobject.h>
3837

3938
namespace pdal
4039
{
@@ -95,7 +94,8 @@ std::string pyObjectToString(PyObject *pname)
9594
#define PyDataType_NAMES(descr) ((descr)->names)
9695
#endif
9796

98-
Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true)
97+
Array::Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler)
98+
: m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler))
9999
{
100100
Py_XINCREF(array);
101101

@@ -164,51 +164,85 @@ Array::~Array()
164164
Py_XDECREF(m_array);
165165
}
166166

167-
168-
ArrayIter& Array::iterator()
167+
std::shared_ptr<ArrayIter> Array::iterator()
169168
{
170-
ArrayIter *it = new ArrayIter(m_array);
171-
m_iterators.emplace_back((it));
172-
return *it;
169+
return std::make_shared<ArrayIter>(m_array, m_stream_handler);
173170
}
174171

172+
ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr<ArrayStreamHandler> stream_handler)
173+
: m_stream_handler(std::move(stream_handler))
174+
{
175+
resetIterator(np_array);
176+
}
175177

176-
ArrayIter::ArrayIter(PyArrayObject* np_array)
178+
void ArrayIter::resetIterator(std::optional<PyArrayObject*> np_array = {})
177179
{
178-
m_iter = NpyIter_New(np_array,
179-
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
180-
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
181-
if (!m_iter)
182-
throw pdal_error("Unable to create numpy iterator.");
180+
std::optional<int> stream_chunk_size = std::nullopt;
181+
if (m_stream_handler) {
182+
stream_chunk_size = (*m_stream_handler)();
183+
if (*stream_chunk_size == 0) {
184+
m_done = true;
185+
return;
186+
}
187+
}
188+
189+
if (np_array) {
190+
// Init iterator
191+
m_iter = NpyIter_New(np_array.value(),
192+
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
193+
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
194+
if (!m_iter)
195+
throw pdal_error("Unable to create numpy iterator.");
196+
} else {
197+
// Otherwise, reset the iterator to the initial state
198+
if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) {
199+
NpyIter_Deallocate(m_iter);
200+
throw pdal_error("Unable to reset numpy iterator.");
201+
}
202+
}
183203

184204
char *itererr;
185205
m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
186206
if (!m_iterNext)
187207
{
188208
NpyIter_Deallocate(m_iter);
189-
throw pdal_error(std::string("Unable to create numpy iterator: ") +
190-
itererr);
209+
throw pdal_error(std::string("Unable to create numpy iterator: ") + itererr);
191210
}
192211
m_data = NpyIter_GetDataPtrArray(m_iter);
193-
m_stride = NpyIter_GetInnerStrideArray(m_iter);
194-
m_size = NpyIter_GetInnerLoopSizePtr(m_iter);
212+
m_stride = *NpyIter_GetInnerStrideArray(m_iter);
213+
m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
214+
if (stream_chunk_size) {
215+
if (0 <= *stream_chunk_size && *stream_chunk_size <= m_size) {
216+
m_size = *stream_chunk_size;
217+
} else {
218+
throw pdal_error(std::string("Stream chunk size not in the range of array length: ") +
219+
std::to_string(*stream_chunk_size));
220+
}
221+
}
195222
m_done = false;
196223
}
197224

198225
ArrayIter::~ArrayIter()
199226
{
200-
NpyIter_Deallocate(m_iter);
227+
if (m_iter != nullptr) {
228+
NpyIter_Deallocate(m_iter);
229+
}
201230
}
202231

203232
ArrayIter& ArrayIter::operator++()
204233
{
205234
if (m_done)
206235
return *this;
207236

208-
if (--(*m_size))
209-
*m_data += *m_stride;
210-
else if (!m_iterNext(m_iter))
211-
m_done = true;
237+
if (--m_size) {
238+
*m_data += m_stride;
239+
} else if (!m_iterNext(m_iter)) {
240+
if (m_stream_handler) {
241+
resetIterator();
242+
} else {
243+
m_done = true;
244+
}
245+
}
212246
return *this;
213247
}
214248

src/pdal/PyArray.hpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
#include <vector>
5151
#include <memory>
52+
#include <optional>
5253

5354
namespace pdal
5455
{
@@ -58,14 +59,15 @@ namespace python
5859

5960
class ArrayIter;
6061

62+
using ArrayStreamHandler = std::function<int64_t()>;
6163

6264
class PDAL_DLL Array
6365
{
6466
public:
6567
using Shape = std::array<size_t, 3>;
6668
using Fields = std::vector<MemoryViewReader::Field>;
6769

68-
Array(PyArrayObject* array);
70+
Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler = {});
6971
~Array();
7072

7173
Array(Array&& a) = default;
@@ -77,14 +79,14 @@ class PDAL_DLL Array
7779
bool rowMajor() const { return m_rowMajor; };
7880
Shape shape() const { return m_shape; }
7981
const Fields& fields() const { return m_fields; };
80-
ArrayIter& iterator();
82+
std::shared_ptr<ArrayIter> iterator();
8183

8284
private:
8385
PyArrayObject* m_array;
8486
Fields m_fields;
8587
bool m_rowMajor;
8688
Shape m_shape {};
87-
std::vector<std::unique_ptr<ArrayIter>> m_iterators;
89+
std::shared_ptr<ArrayStreamHandler> m_stream_handler;
8890
};
8991

9092

@@ -94,20 +96,23 @@ class PDAL_DLL ArrayIter
9496
ArrayIter(const ArrayIter&) = delete;
9597
ArrayIter() = delete;
9698

97-
ArrayIter(PyArrayObject*);
99+
ArrayIter(PyArrayObject*, std::shared_ptr<ArrayStreamHandler>);
98100
~ArrayIter();
99101

100102
ArrayIter& operator++();
101103
operator bool () const { return !m_done; }
102104
char* operator*() const { return *m_data; }
103105

104106
private:
105-
NpyIter *m_iter;
107+
NpyIter *m_iter = nullptr;
106108
NpyIter_IterNextFunc *m_iterNext;
107109
char **m_data;
108-
npy_intp *m_size;
109-
npy_intp *m_stride;
110+
npy_intp m_size;
111+
npy_intp m_stride;
110112
bool m_done;
113+
114+
std::shared_ptr<ArrayStreamHandler> m_stream_handler;
115+
void resetIterator(std::optional<PyArrayObject*> np_array);
111116
};
112117

113118
} // namespace python

src/pdal/PyPipeline.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,14 +245,20 @@ void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> array
245245
for (auto f : array->fields())
246246
r.pushField(f);
247247

248-
ArrayIter& iter = array->iterator();
249-
auto incrementer = [&iter](PointId id) -> char *
248+
auto arrayIter = array->iterator();
249+
auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char *
250250
{
251-
if (! iter)
251+
ArrayIter& iter = *arrayIter;
252+
if (!firstPoint && iter) {
253+
++iter;
254+
} else {
255+
firstPoint = false;
256+
}
257+
258+
if (!iter)
252259
return nullptr;
253260

254261
char *c = *iter;
255-
++iter;
256262
return c;
257263
};
258264

src/pdal/libpdalpython.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
33
#include <pybind11/numpy.h>
4+
#include <pybind11/functional.h>
45
#include <pybind11/stl/filesystem.h>
56
#include <iostream>
67

@@ -189,11 +190,22 @@ namespace pdal {
189190
));
190191
}
191192

192-
void setInputs(std::vector<py::array> ndarrays) {
193+
void setInputs(const std::vector<py::object>& inputs) {
193194
_inputs.clear();
194-
for (const auto& ndarray: ndarrays) {
195-
PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr();
196-
_inputs.push_back(std::make_shared<pdal::python::Array>(ndarray_ptr));
195+
for (const auto& input_obj: inputs) {
196+
if (py::isinstance<py::array>(input_obj)) {
197+
// Backward compatibility for accepting list of numpy arrays
198+
auto ndarray = input_obj.cast<py::array>();
199+
_inputs.push_back(std::make_shared<pdal::python::Array>((PyArrayObject*)ndarray.ptr()));
200+
} else {
201+
// Now expected to be a list of pairs: (numpy array, <optional> stream handler)
202+
auto input = input_obj.cast<std::pair<py::array, pdal::python::ArrayStreamHandler>>();
203+
_inputs.push_back(std::make_shared<pdal::python::Array>(
204+
(PyArrayObject*)input.first.ptr(),
205+
input.second ?
206+
std::make_shared<pdal::python::ArrayStreamHandler>(input.second)
207+
: nullptr));
208+
}
197209
}
198210
delExecutor();
199211
}

src/pdal/pipeline.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import json
44
import logging
5-
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
5+
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable
66

77
import numpy as np
88
import pathlib
@@ -41,6 +41,7 @@ def __init__(
4141
loglevel: int = logging.ERROR,
4242
json: Optional[str] = None,
4343
dataframes: Sequence[DataFrame] = (),
44+
stream_handlers: Sequence[Callable[[], int]] = (),
4445
):
4546

4647
if json:
@@ -58,7 +59,14 @@ def __init__(
5859
stages = _parse_stages(spec) if isinstance(spec, str) else spec
5960
for stage in stages:
6061
self |= stage
61-
self.inputs = arrays
62+
63+
if stream_handlers:
64+
if len(stream_handlers) != len(arrays):
65+
raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes")
66+
self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
67+
else:
68+
self.inputs = [(a, None) for a in arrays]
69+
6270
self.loglevel = loglevel
6371

6472
def __getstate__(self):

0 commit comments

Comments
 (0)