
Commit 32328ac

Adding support for streaming through the input arrays of a pipeline (#182)

* Adding support for streaming the input arrays of a pipeline
* Code cleanup based on code review.
* Updated documentation to include mention of reading with a stream handler.

1 parent 2d035ab commit 32328ac

File tree

7 files changed: +356 −37 lines changed


README.rst

Lines changed: 98 additions & 0 deletions
@@ -208,6 +208,104 @@ PDAL and Python:
     with tiledb.open("clamped") as a:
         print(a.schema)

+Reading using Numpy Arrays as buffers (advanced)
+................................................................................
+
+It's also possible to treat the Numpy arrays passed to PDAL as buffers that are iteratively populated by
+custom Python functions during the execution of the pipeline.
+
+This may be useful in cases where you want the reading of the input data to be handled in a streamable fashion,
+for example:
+
+* When the total Numpy array data wouldn't fit into memory.
+* To initiate execution of a streamable PDAL pipeline while the input data is still being read.
+
+To enable this mode, include the Python populate function along with each corresponding Numpy array.
+
+.. code-block:: python
+
+    # Numpy array to be used as a buffer
+    in_buffer = np.zeros(max_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])
+
+    # The function that populates the buffer iteratively
+    def load_next_chunk() -> int:
+        """
+        Function called by PDAL before reading the data from the buffer.
+
+        IMPORTANT: it must return the total number of items to be read from the buffer.
+        The pipeline execution will keep calling this function in a loop until 0 is returned.
+        """
+        #
+        # Replace with your code that populates the buffer and returns the number of elements to read
+        #
+        chunk_size = next_chunk.size
+        in_buffer[:chunk_size]["X"] = next_chunk[:]["X"]
+        in_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"]
+        in_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"]
+
+        return chunk_size
+
+    # Configure the input array and handler during Pipeline initialization...
+    p = pdal.Pipeline(pipeline_json, arrays=[in_buffer], stream_handlers=[load_next_chunk])
+
+    # ...alternatively, use the setter on an existing Pipeline
+    # p.inputs = [(in_buffer, load_next_chunk)]
+
+The following snippet provides a simple example of how to use a Numpy array as a buffer to support writing
+through PDAL with total control over the maximum amount of memory to use.
+
+.. raw:: html
+
+   <details>
+   <summary>Example: Streaming the read and write of a very large LAZ file with low memory footprint</summary>
+
+.. code-block:: python
+
+    import numpy as np
+    import pdal
+
+    in_chunk_size = 10_000_000
+    in_pipeline = pdal.Reader.las(**{
+        "filename": "in_test.laz"
+    }).pipeline()
+
+    in_pipeline_it = in_pipeline.iterator(in_chunk_size).__iter__()
+
+    out_chunk_size = 50_000_000
+    out_file = "out_test.laz"
+    out_pipeline = pdal.Writer.las(
+        filename=out_file
+    ).pipeline()
+
+    out_buffer = np.zeros(in_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])
+
+    def load_next_chunk():
+        try:
+            next_chunk = next(in_pipeline_it)
+        except StopIteration:
+            # Stops the streaming
+            return 0
+
+        chunk_size = next_chunk.size
+        out_buffer[:chunk_size]["X"] = next_chunk[:]["X"]
+        out_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"]
+        out_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"]
+
+        print(f"Loaded next chunk -> {chunk_size}")
+
+        return chunk_size
+
+    out_pipeline.inputs = [(out_buffer, load_next_chunk)]
+
+    out_pipeline.loglevel = 20  # INFO
+    count = out_pipeline.execute_streaming(out_chunk_size)
+
+    print(f"\nWROTE - {count}")
+
+.. raw:: html
+
+   </details>
+
 Executing Streamable Pipelines
 ................................................................................
 Streamable pipelines (pipelines that consist exclusively of streamable PDAL
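
As a companion to the documentation above, here is a minimal helper (hypothetical, not part of this commit) that adapts any iterator of structured Numpy chunks into an (array, handler) pair following the documented contract:

    import numpy as np

    def make_stream_input(chunks, max_chunk_size, dtype):
        # Fixed-size buffer that PDAL reads from on each handler call
        buffer = np.zeros(max_chunk_size, dtype=dtype)
        it = iter(chunks)

        def load_next_chunk() -> int:
            try:
                chunk = next(it)
            except StopIteration:
                return 0  # 0 ends the streamed input
            buffer[:chunk.size] = chunk  # chunks must not exceed max_chunk_size
            return chunk.size

        return buffer, load_next_chunk

    # Usage on some pipeline p: p.inputs = [make_stream_input(chunks, 10_000_000, dtype)]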

src/pdal/PyArray.cpp

Lines changed: 62 additions & 20 deletions
@@ -34,7 +34,6 @@

 #include "PyArray.hpp"
 #include <pdal/io/MemoryViewReader.hpp>
-#include <numpy/arrayobject.h>

 namespace pdal
 {
@@ -95,7 +94,8 @@ std::string pyObjectToString(PyObject *pname)
 #define PyDataType_NAMES(descr) ((descr)->names)
 #endif

-Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true)
+Array::Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler)
+    : m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler))
 {
     Py_XINCREF(array);

@@ -164,51 +164,93 @@ Array::~Array()
     Py_XDECREF(m_array);
 }

-
-ArrayIter& Array::iterator()
+std::shared_ptr<ArrayIter> Array::iterator()
 {
-    ArrayIter *it = new ArrayIter(m_array);
-    m_iterators.emplace_back((it));
-    return *it;
+    return std::make_shared<ArrayIter>(m_array, m_stream_handler);
 }

-
-ArrayIter::ArrayIter(PyArrayObject* np_array)
+ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr<ArrayStreamHandler> stream_handler)
+    : m_stream_handler(std::move(stream_handler))
 {
+    // Create iterator
     m_iter = NpyIter_New(np_array,
-        NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
-        NPY_KEEPORDER, NPY_NO_CASTING, NULL);
+                         NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
+                         NPY_KEEPORDER, NPY_NO_CASTING, NULL);
     if (!m_iter)
         throw pdal_error("Unable to create numpy iterator.");

+    initIterator();
+}
+
+void ArrayIter::initIterator()
+{
+    // For a stream handler, first execute it to get the buffer populated and know the size of the data to iterate
+    int64_t stream_chunk_size = 0;
+    if (m_stream_handler) {
+        stream_chunk_size = (*m_stream_handler)();
+        if (!stream_chunk_size) {
+            m_done = true;
+            return;
+        }
+    }
+
+    // Initialize the iterator function
     char *itererr;
     m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
     if (!m_iterNext)
     {
         NpyIter_Deallocate(m_iter);
-        throw pdal_error(std::string("Unable to create numpy iterator: ") +
-            itererr);
+        m_iter = nullptr;
+        throw pdal_error(std::string("Unable to retrieve iteration function from numpy iterator: ") + itererr);
     }
     m_data = NpyIter_GetDataPtrArray(m_iter);
-    m_stride = NpyIter_GetInnerStrideArray(m_iter);
-    m_size = NpyIter_GetInnerLoopSizePtr(m_iter);
+    m_stride = *NpyIter_GetInnerStrideArray(m_iter);
+    m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
+    if (stream_chunk_size) {
+        // Ensure chunk size is valid and then limit iteration accordingly
+        if (0 < stream_chunk_size && stream_chunk_size <= m_size) {
+            m_size = stream_chunk_size;
+        } else {
+            throw pdal_error(std::string("Stream chunk size not in the range of array length: ") +
+                             std::to_string(stream_chunk_size));
+        }
+    }
     m_done = false;
 }

+void ArrayIter::resetIterator()
+{
+    // Reset the iterator to the initial state
+    if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) {
+        NpyIter_Deallocate(m_iter);
+        m_iter = nullptr;
+        throw pdal_error("Unable to reset numpy iterator.");
+    }
+
+    initIterator();
+}
+
 ArrayIter::~ArrayIter()
 {
-    NpyIter_Deallocate(m_iter);
+    if (m_iter != nullptr) {
+        NpyIter_Deallocate(m_iter);
+    }
 }

 ArrayIter& ArrayIter::operator++()
 {
     if (m_done)
         return *this;

-    if (--(*m_size))
-        *m_data += *m_stride;
-    else if (!m_iterNext(m_iter))
-        m_done = true;
+    if (--m_size) {
+        *m_data += m_stride;
+    } else if (!m_iterNext(m_iter)) {
+        if (m_stream_handler) {
+            resetIterator();
+        } else {
+            m_done = true;
+        }
+    }
     return *this;
 }
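
For readers who prefer not to trace the C++, a rough Python model of the control flow that ArrayIter now implements — hypothetical and illustrative only, not an API of this package:

    def iter_points(buffer, stream_handler=None):
        """Yield points from buffer; with a handler, loop chunk by chunk."""
        while True:
            if stream_handler is not None:
                size = stream_handler()  # populate the buffer, learn the chunk size
                if size == 0:
                    return  # handler signals the end of the stream
                if not 0 < size <= len(buffer):
                    raise ValueError("Stream chunk size not in the range of array length")
            else:
                size = len(buffer)
            for i in range(size):
                yield buffer[i]  # hand one point to the pipeline
            if stream_handler is None:
                return  # plain arrays are iterated exactly once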

src/pdal/PyArray.hpp

Lines changed: 12 additions & 7 deletions
@@ -58,14 +58,15 @@ namespace python

 class ArrayIter;

+using ArrayStreamHandler = std::function<int64_t()>;

 class PDAL_DLL Array
 {
 public:
     using Shape = std::array<size_t, 3>;
     using Fields = std::vector<MemoryViewReader::Field>;

-    Array(PyArrayObject* array);
+    Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler = {});
     ~Array();

     Array(Array&& a) = default;
@@ -77,14 +78,14 @@ class PDAL_DLL Array
     bool rowMajor() const { return m_rowMajor; };
     Shape shape() const { return m_shape; }
     const Fields& fields() const { return m_fields; };
-    ArrayIter& iterator();
+    std::shared_ptr<ArrayIter> iterator();

 private:
     PyArrayObject* m_array;
     Fields m_fields;
     bool m_rowMajor;
     Shape m_shape {};
-    std::vector<std::unique_ptr<ArrayIter>> m_iterators;
+    std::shared_ptr<ArrayStreamHandler> m_stream_handler;
 };


@@ -94,20 +95,24 @@ class PDAL_DLL ArrayIter
     ArrayIter(const ArrayIter&) = delete;
     ArrayIter() = delete;

-    ArrayIter(PyArrayObject*);
+    ArrayIter(PyArrayObject*, std::shared_ptr<ArrayStreamHandler>);
     ~ArrayIter();

     ArrayIter& operator++();
     operator bool () const { return !m_done; }
     char* operator*() const { return *m_data; }

 private:
-    NpyIter *m_iter;
+    NpyIter *m_iter = nullptr;
     NpyIter_IterNextFunc *m_iterNext;
     char **m_data;
-    npy_intp *m_size;
-    npy_intp *m_stride;
+    npy_intp m_size;
+    npy_intp m_stride;
     bool m_done;
+
+    std::shared_ptr<ArrayStreamHandler> m_stream_handler;
+    void initIterator();
+    void resetIterator();
 };

 } // namespace python
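
ArrayStreamHandler is std::function<int64_t()>, so on the Python side any zero-argument callable returning an int qualifies — a closure, a bound method, or a class with __call__. A sketch of the class-based form, with hypothetical names:

    class ChunkLoader:
        """Stateful handler; any zero-argument callable returning int works."""
        def __init__(self, source_iter, buffer):
            self.source_iter = source_iter
            self.buffer = buffer

        def __call__(self) -> int:
            try:
                chunk = next(self.source_iter)
            except StopIteration:
                return 0  # end of stream
            self.buffer[:chunk.size] = chunk
            return chunk.size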

src/pdal/PyPipeline.cpp

Lines changed: 10 additions & 4 deletions
@@ -245,14 +245,20 @@ void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> arrays
         for (auto f : array->fields())
             r.pushField(f);

-        ArrayIter& iter = array->iterator();
-        auto incrementer = [&iter](PointId id) -> char *
+        auto arrayIter = array->iterator();
+        auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char *
         {
-            if (! iter)
+            ArrayIter& iter = *arrayIter;
+            if (!firstPoint && iter) {
+                ++iter;
+            } else {
+                firstPoint = false;
+            }
+
+            if (!iter)
                 return nullptr;

             char *c = *iter;
-            ++iter;
             return c;
         };
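
The reworked incrementer advances the iterator before reading on every call except the first, so that an advance that triggers a stream-handler refill cannot invalidate a pointer already handed out. A loose Python rendering of that logic (the valid/advance/data method names are hypothetical; the real iterator is the C++ ArrayIter):

    def make_incrementer(array_iter):
        first_point = True

        def incrementer(point_id):
            nonlocal first_point
            if not first_point and array_iter.valid():
                array_iter.advance()  # may refill the buffer via the stream handler
            else:
                first_point = False  # the first call reads the initial position
            if not array_iter.valid():
                return None  # stream exhausted
            return array_iter.data()  # pointer to the current point
        return incrementer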

src/pdal/libpdalpython.cpp

Lines changed: 16 additions & 4 deletions
@@ -1,6 +1,7 @@
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
 #include <pybind11/numpy.h>
+#include <pybind11/functional.h>
 #include <pybind11/stl/filesystem.h>
 #include <iostream>

@@ -189,11 +190,22 @@ namespace pdal {
         ));
     }

-    void setInputs(std::vector<py::array> ndarrays) {
+    void setInputs(const std::vector<py::object>& inputs) {
         _inputs.clear();
-        for (const auto& ndarray: ndarrays) {
-            PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr();
-            _inputs.push_back(std::make_shared<pdal::python::Array>(ndarray_ptr));
+        for (const auto& input_obj: inputs) {
+            if (py::isinstance<py::array>(input_obj)) {
+                // Backward compatibility for accepting list of numpy arrays
+                auto ndarray = input_obj.cast<py::array>();
+                _inputs.push_back(std::make_shared<pdal::python::Array>((PyArrayObject*)ndarray.ptr()));
+            } else {
+                // Now expected to be a list of pairs: (numpy array, <optional> stream handler)
+                auto input = input_obj.cast<std::pair<py::array, pdal::python::ArrayStreamHandler>>();
+                _inputs.push_back(std::make_shared<pdal::python::Array>(
+                    (PyArrayObject*)input.first.ptr(),
+                    input.second ?
+                        std::make_shared<pdal::python::ArrayStreamHandler>(input.second)
+                        : nullptr));
+            }
         }
         delExecutor();
     }
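
From Python, setInputs (exposed through Pipeline.inputs) therefore accepts both forms; a quick illustration, assuming p is a Pipeline and arr and other_arr are structured Numpy arrays:

    # Legacy form: a list of plain Numpy arrays, each read in one pass
    p.inputs = [arr]

    # New form: (array, handler) pairs; the handler may be None for non-streamed input
    p.inputs = [(arr, load_next_chunk), (other_arr, None)]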

src/pdal/pipeline.py

Lines changed: 10 additions & 2 deletions
@@ -2,7 +2,7 @@

 import json
 import logging
-from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
+from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable

 import numpy as np
 import pathlib
@@ -41,6 +41,7 @@ def __init__(
         loglevel: int = logging.ERROR,
         json: Optional[str] = None,
         dataframes: Sequence[DataFrame] = (),
+        stream_handlers: Sequence[Callable[[], int]] = (),
     ):

         if json:
@@ -58,7 +59,14 @@ def __init__(
         stages = _parse_stages(spec) if isinstance(spec, str) else spec
         for stage in stages:
             self |= stage
-        self.inputs = arrays
+
+        if stream_handlers:
+            if len(stream_handlers) != len(arrays):
+                raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes")
+            self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
+        else:
+            self.inputs = [(a, None) for a in arrays]
+
         self.loglevel = loglevel

     def __getstate__(self):
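
A minimal constructor-level sketch matching this change (pipeline_json is a placeholder); note that a stream_handlers sequence whose length differs from arrays raises RuntimeError:

    import numpy as np
    import pdal

    buf = np.zeros(1_000, dtype=[("X", float), ("Y", float), ("Z", float)])

    def handler() -> int:
        # populate buf here; return the number of valid rows, or 0 to stop
        return 0

    p = pdal.Pipeline(pipeline_json, arrays=[buf], stream_handlers=[handler])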
