Skip to content

Commit 033c61e

Browse files
committed
Add Pipeline.execute_streaming() method
1 parent 277135f commit 033c61e

File tree

5 files changed

+60
-2
lines changed

5 files changed

+60
-2
lines changed

README.rst

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ PDAL and Python:
187187
).pipeline(clamped)
188188
print(pipeline.execute()) # 387 points
189189
190-
Iterating Streamable Pipelines
190+
Executing Streamable Pipelines
191191
................................................................................
192192
Streamable pipelines (pipelines that consist exclusively of streamable PDAL
193193
stages) can be executed in streaming mode via ``Pipeline.iterator()``. This
@@ -207,6 +207,12 @@ returns an iterator object that yields Numpy arrays of up to ``chunk_size`` size
207207
to allow prefetching up to to this number of arrays in parallel and buffering
208208
them until they are yielded to the caller.
209209

210+
If you just want to execute a streamable pipeline in streaming mode and don't
211+
need to access the data points (typically when the pipeline has Writer stage(s)),
212+
you can use the ``Pipeline.execute_streaming(chunk_size)`` method instead. This
213+
is functionally equivalent to ``sum(map(len, pipeline.iterator(chunk_size)))``
214+
but more efficient as it avoids allocating and filling any arrays in memory.
215+
210216
Accessing Mesh Data
211217
................................................................................
212218

pdal/PyPipeline.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ namespace pdal
4444
namespace python
4545
{
4646

47+
48+
void CountPointTable::reset()
49+
{
50+
for (PointId idx = 0; idx < numPoints(); idx++)
51+
if (!skip(idx))
52+
m_count++;
53+
FixedPointTable::reset();
54+
}
55+
56+
4757
PipelineExecutor::PipelineExecutor(
4858
std::string const& json, std::vector<std::shared_ptr<Array>> arrays, int level)
4959
{
@@ -70,6 +80,14 @@ point_count_t PipelineExecutor::execute()
7080
}
7181

7282

83+
point_count_t PipelineExecutor::executeStream(point_count_t streamLimit)
84+
{
85+
CountPointTable table(streamLimit);
86+
m_manager.executeStream(table);
87+
m_executed = true;
88+
return table.count();
89+
}
90+
7391
const PointViewSet& PipelineExecutor::views() const
7492
{
7593
if (!m_executed)

pdal/PyPipeline.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PDAL_DLL PipelineExecutor {
5454
virtual ~PipelineExecutor() = default;
5555

5656
point_count_t execute();
57+
point_count_t executeStream(point_count_t streamLimit);
5758

5859
const PointViewSet& views() const;
5960
std::string getPipeline() const;
@@ -73,5 +74,18 @@ class PDAL_DLL PipelineExecutor {
7374
std::stringstream m_logStream;
7475
};
7576

77+
class CountPointTable : public FixedPointTable
78+
{
79+
public:
80+
CountPointTable(point_count_t capacity) : FixedPointTable(capacity), m_count(0) {}
81+
point_count_t count() const { return m_count; }
82+
83+
protected:
84+
virtual void reset();
85+
86+
private:
87+
point_count_t m_count;
88+
};
89+
7690
} // namespace python
7791
} // namespace pdal

pdal/libpdalpython.pyx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ cdef extern from "PyPipeline.hpp" namespace "pdal::python":
9292
cdef cppclass PipelineExecutor:
9393
PipelineExecutor(string, vector[shared_ptr[Array]], int) except +
9494
int execute() except +
95+
int executeStream(int) except +
9596
string getPipeline() except +
9697
string getMetadata() except +
9798
string getSchema() except +
@@ -185,6 +186,9 @@ cdef class Pipeline(PipelineResultsMixin):
185186
def execute(self):
186187
return self._get_executor().execute()
187188

189+
def execute_streaming(self, int chunk_size=10000):
190+
return self._get_executor().executeStream(chunk_size)
191+
188192
def iterator(self, int chunk_size=10000, int prefetch = 0):
189193
it = PipelineIterator()
190194
it.set_executor(new StreamableExecutor(

test/test_pipeline.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,28 @@ def test_invalid_json(self, pipeline):
5656
pdal.Pipeline(pipeline)
5757

5858
@pytest.mark.parametrize("filename", ["sort.json", "sort.py"])
59-
def test_execution(self, filename):
59+
def test_execute(self, filename):
6060
"""Can we execute a PDAL pipeline"""
6161
r = get_pipeline(filename)
6262
r.execute()
6363
assert len(r.pipeline) > 200
6464

65+
@pytest.mark.parametrize("filename", ["range.json", "range.py"])
66+
def test_execute_streaming(self, filename):
67+
r = get_pipeline(filename)
68+
assert r.streamable
69+
count = r.execute()
70+
count2 = r.execute_streaming(chunk_size=100)
71+
assert count == count2
72+
73+
@pytest.mark.parametrize("filename", ["sort.json", "sort.py"])
74+
def test_execute_streaming_non_streamable(self, filename):
75+
r = get_pipeline(filename)
76+
assert not r.streamable
77+
with pytest.raises(RuntimeError) as info:
78+
r.execute_streaming()
79+
assert "Attempting to use stream mode" in str(info.value)
80+
6581
@pytest.mark.parametrize("filename", ["bad.json", "bad.py"])
6682
def test_validate(self, filename):
6783
"""Do we complain with bad pipelines"""

0 commit comments

Comments
 (0)