Skip to content

Commit b379d7b

Browse files
committed
Adding support for streaming the input arrays of a pipeline
1 parent fdabd27 commit b379d7b

File tree

6 files changed

+226
-39
lines changed

6 files changed

+226
-39
lines changed

src/pdal/PyArray.cpp

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
#include "PyArray.hpp"
3636
#include <pdal/io/MemoryViewReader.hpp>
37-
#include <numpy/arrayobject.h>
3837

3938
namespace pdal
4039
{
@@ -94,7 +93,8 @@ std::string toString(PyObject *pname)
9493
#define PyDataType_FIELDS(descr) ((descr)->fields)
9594
#endif
9695

97-
Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true)
96+
Array::Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler)
97+
: m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler))
9898
{
9999
Py_XINCREF(array);
100100

@@ -163,51 +163,85 @@ Array::~Array()
163163
Py_XDECREF(m_array);
164164
}
165165

166-
167-
ArrayIter& Array::iterator()
166+
std::shared_ptr<ArrayIter> Array::iterator()
168167
{
169-
ArrayIter *it = new ArrayIter(m_array);
170-
m_iterators.emplace_back((it));
171-
return *it;
168+
return std::make_shared<ArrayIter>(m_array, m_stream_handler);
172169
}
173170

171+
ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr<ArrayStreamHandler> stream_handler)
172+
: m_stream_handler(std::move(stream_handler))
173+
{
174+
resetIterator(np_array);
175+
}
174176

175-
ArrayIter::ArrayIter(PyArrayObject* np_array)
177+
void ArrayIter::resetIterator(std::optional<PyArrayObject*> np_array = {})
176178
{
177-
m_iter = NpyIter_New(np_array,
178-
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
179-
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
180-
if (!m_iter)
181-
throw pdal_error("Unable to create numpy iterator.");
179+
std::optional<int> stream_chunk_size = std::nullopt;
180+
if (m_stream_handler) {
181+
stream_chunk_size = (*m_stream_handler)();
182+
if (*stream_chunk_size == 0) {
183+
m_done = true;
184+
return;
185+
}
186+
}
187+
188+
if (np_array) {
189+
// Init iterator
190+
m_iter = NpyIter_New(np_array.value(),
191+
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
192+
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
193+
if (!m_iter)
194+
throw pdal_error("Unable to create numpy iterator.");
195+
} else {
196+
// Otherwise, reset the iterator to the initial state
197+
if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) {
198+
NpyIter_Deallocate(m_iter);
199+
throw pdal_error("Unable to reset numpy iterator.");
200+
}
201+
}
182202

183203
char *itererr;
184204
m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
185205
if (!m_iterNext)
186206
{
187207
NpyIter_Deallocate(m_iter);
188-
throw pdal_error(std::string("Unable to create numpy iterator: ") +
189-
itererr);
208+
throw pdal_error(std::string("Unable to create numpy iterator: ") + itererr);
190209
}
191210
m_data = NpyIter_GetDataPtrArray(m_iter);
192-
m_stride = NpyIter_GetInnerStrideArray(m_iter);
193-
m_size = NpyIter_GetInnerLoopSizePtr(m_iter);
211+
m_stride = *NpyIter_GetInnerStrideArray(m_iter);
212+
m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
213+
if (stream_chunk_size) {
214+
if (0 <= *stream_chunk_size && *stream_chunk_size <= m_size) {
215+
m_size = *stream_chunk_size;
216+
} else {
217+
throw pdal_error(std::string("Stream chunk size not in the range of array length: ") +
218+
std::to_string(*stream_chunk_size));
219+
}
220+
}
194221
m_done = false;
195222
}
196223

197224
ArrayIter::~ArrayIter()
198225
{
199-
NpyIter_Deallocate(m_iter);
226+
if (m_iter != nullptr) {
227+
NpyIter_Deallocate(m_iter);
228+
}
200229
}
201230

202231
ArrayIter& ArrayIter::operator++()
203232
{
204233
if (m_done)
205234
return *this;
206235

207-
if (--(*m_size))
208-
*m_data += *m_stride;
209-
else if (!m_iterNext(m_iter))
210-
m_done = true;
236+
if (--m_size) {
237+
*m_data += m_stride;
238+
} else if (!m_iterNext(m_iter)) {
239+
if (m_stream_handler) {
240+
resetIterator();
241+
} else {
242+
m_done = true;
243+
}
244+
}
211245
return *this;
212246
}
213247

src/pdal/PyArray.hpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
#include <vector>
5151
#include <memory>
52+
#include <optional>
5253

5354
namespace pdal
5455
{
@@ -57,14 +58,15 @@ namespace python
5758

5859
class ArrayIter;
5960

61+
using ArrayStreamHandler = std::function<int64_t()>;
6062

6163
class PDAL_DLL Array
6264
{
6365
public:
6466
using Shape = std::array<size_t, 3>;
6567
using Fields = std::vector<MemoryViewReader::Field>;
6668

67-
Array(PyArrayObject* array);
69+
Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler = {});
6870
~Array();
6971

7072
Array(Array&& a) = default;
@@ -76,14 +78,14 @@ class PDAL_DLL Array
7678
bool rowMajor() const { return m_rowMajor; };
7779
Shape shape() const { return m_shape; }
7880
const Fields& fields() const { return m_fields; };
79-
ArrayIter& iterator();
81+
std::shared_ptr<ArrayIter> iterator();
8082

8183
private:
8284
PyArrayObject* m_array;
8385
Fields m_fields;
8486
bool m_rowMajor;
8587
Shape m_shape {};
86-
std::vector<std::unique_ptr<ArrayIter>> m_iterators;
88+
std::shared_ptr<ArrayStreamHandler> m_stream_handler;
8789
};
8890

8991

@@ -93,20 +95,23 @@ class ArrayIter
9395
ArrayIter(const ArrayIter&) = delete;
9496
ArrayIter() = delete;
9597

96-
ArrayIter(PyArrayObject*);
98+
ArrayIter(PyArrayObject*, std::shared_ptr<ArrayStreamHandler>);
9799
~ArrayIter();
98100

99101
ArrayIter& operator++();
100102
operator bool () const { return !m_done; }
101103
char* operator*() const { return *m_data; }
102104

103105
private:
104-
NpyIter *m_iter;
106+
NpyIter *m_iter = nullptr;
105107
NpyIter_IterNextFunc *m_iterNext;
106108
char **m_data;
107-
npy_intp *m_size;
108-
npy_intp *m_stride;
109+
npy_intp m_size;
110+
npy_intp m_stride;
109111
bool m_done;
112+
113+
std::shared_ptr<ArrayStreamHandler> m_stream_handler;
114+
void resetIterator(std::optional<PyArrayObject*> np_array);
110115
};
111116

112117
} // namespace python

src/pdal/PyPipeline.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,20 @@ void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> array
235235
for (auto f : array->fields())
236236
r.pushField(f);
237237

238-
ArrayIter& iter = array->iterator();
239-
auto incrementer = [&iter](PointId id) -> char *
238+
auto arrayIter = array->iterator();
239+
auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char *
240240
{
241-
if (! iter)
241+
ArrayIter& iter = *arrayIter;
242+
if (!firstPoint && iter) {
243+
++iter;
244+
} else {
245+
firstPoint = false;
246+
}
247+
248+
if (!iter)
242249
return nullptr;
243250

244251
char *c = *iter;
245-
++iter;
246252
return c;
247253
};
248254

src/pdal/libpdalpython.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <pybind11/pybind11.h>
22
#include <pybind11/stl.h>
33
#include <pybind11/numpy.h>
4+
#include <pybind11/functional.h>
45
#include <pybind11/stl/filesystem.h>
56
#include <iostream>
67

@@ -190,11 +191,22 @@ namespace pdal {
190191
));
191192
}
192193

193-
void setInputs(std::vector<py::array> ndarrays) {
194+
void setInputs(const std::vector<py::object>& inputs) {
194195
_inputs.clear();
195-
for (const auto& ndarray: ndarrays) {
196-
PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr();
197-
_inputs.push_back(std::make_shared<pdal::python::Array>(ndarray_ptr));
196+
for (const auto& input_obj: inputs) {
197+
if (py::isinstance<py::array>(input_obj)) {
198+
// Backward compatibility for accepting list of numpy arrays
199+
auto ndarray = input_obj.cast<py::array>();
200+
_inputs.push_back(std::make_shared<pdal::python::Array>((PyArrayObject*)ndarray.ptr()));
201+
} else {
202+
// Now expected to be a list of pairs: (numpy array, <optional> stream handler)
203+
auto input = input_obj.cast<std::pair<py::array, pdal::python::ArrayStreamHandler>>();
204+
_inputs.push_back(std::make_shared<pdal::python::Array>(
205+
(PyArrayObject*)input.first.ptr(),
206+
input.second ?
207+
std::make_shared<pdal::python::ArrayStreamHandler>(input.second)
208+
: nullptr));
209+
}
198210
}
199211
delExecutor();
200212
}

src/pdal/pipeline.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import json
44
import logging
5-
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
5+
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable
66

77
import numpy as np
88
import pathlib
@@ -41,6 +41,7 @@ def __init__(
4141
loglevel: int = logging.ERROR,
4242
json: Optional[str] = None,
4343
dataframes: Sequence[DataFrame] = (),
44+
stream_handlers: Sequence[Callable[[], int]] = (),
4445
):
4546

4647
if json:
@@ -58,7 +59,14 @@ def __init__(
5859
stages = _parse_stages(spec) if isinstance(spec, str) else spec
5960
for stage in stages:
6061
self |= stage
61-
self.inputs = arrays
62+
63+
if stream_handlers:
64+
if len(stream_handlers) != len(arrays):
65+
raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes")
66+
self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
67+
else:
68+
self.inputs = [(a, None) for a in arrays]
69+
6270
self.loglevel = loglevel
6371

6472
def __getstate__(self):

0 commit comments

Comments
 (0)