Commit d8508ae

API change: replace Pipeline.__iter__ with iterator(chunk_size, prefetch) and add new PipelineIterator class

1 parent 27103a9 commit d8508ae

3 files changed: +114 -105 lines
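In practice, the change looks like this — a minimal sketch of the new streaming API, where the reader path, filter, and spec are placeholder assumptions, not part of this commit:

    import pdal

    # Placeholder spec: the reader path and filter are assumptions.
    spec = """
    [
        "input.las",
        {"type": "filters.range", "limits": "Intensity[0:255]"}
    ]
    """

    pipeline = pdal.Pipeline(spec)

    # Old (removed): pipeline.chunk_size = 100; for array in pipeline: ...
    # New: chunking is configured per call, and each call returns an
    # independent PipelineIterator backed by its own StreamableExecutor.
    for array in pipeline.iterator(chunk_size=100, prefetch=0):
        print(len(array))  # each chunk arrives as a numpy structured array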

pdal/libpdalpython.pyx (73 additions, 58 deletions)
@@ -1,8 +1,10 @@
 # distutils: language = c++
 # cython: c_string_type=unicode, c_string_encoding=utf8
 
+import json
 from types import SimpleNamespace
 
+cimport cython
 from cython.operator cimport dereference as deref
 from cpython.ref cimport Py_DECREF
 from libcpp cimport bool
@@ -121,12 +123,33 @@ cdef extern from "PyPipeline.hpp" namespace "pdal::python":
     np.PyArrayObject* meshToNumpyArray(const TriangularMesh*) except +
 
 
-cdef class Pipeline:
-    cdef unique_ptr[StreamableExecutor] _executor
+@cython.internal
+cdef class PipelineResultsMixin:
+    @property
+    def log(self):
+        return self._get_executor().getLog()
+
+    @property
+    def schema(self):
+        return json.loads(self._get_executor().getSchema())
+
+    @property
+    def pipeline(self):
+        return self._get_executor().getPipeline()
+
+    @property
+    def metadata(self):
+        return self._get_executor().getMetadata()
+
+    cdef PipelineExecutor* _get_executor(self) except NULL:
+        raise NotImplementedError("Abstract method")
+
+
+
+cdef class Pipeline(PipelineResultsMixin):
+    cdef unique_ptr[PipelineExecutor] _executor
     cdef vector[shared_ptr[Array]] _inputs
     cdef int _loglevel
-    cdef int _chunk_size
-    cdef int _prefetch
 
     #========= writeable properties to be set before execution ===========================
 
@@ -150,44 +173,8 @@ cdef class Pipeline:
         self._loglevel = value
         self._del_executor()
 
-    @property
-    def chunk_size(self):
-        return self._chunk_size
-
-    @chunk_size.setter
-    def chunk_size(self, value):
-        assert value > 0
-        self._chunk_size = value
-        self._del_executor()
-
-    @property
-    def prefetch(self):
-        return self._prefetch
-
-    @prefetch.setter
-    def prefetch(self, value):
-        assert value >= 0
-        self._prefetch = value
-        self._del_executor()
-
     #========= readable properties to be read after execution ============================
 
-    @property
-    def log(self):
-        return self._get_executor().getLog()
-
-    @property
-    def schema(self):
-        return self._get_executor().getSchema()
-
-    @property
-    def pipeline(self):
-        return self._get_executor().getPipeline()
-
-    @property
-    def metadata(self):
-        return self._get_executor().getMetadata()
-
     @property
     def arrays(self):
         cdef PipelineExecutor* executor = self._get_executor()
@@ -215,18 +202,14 @@
     def execute(self):
         return self._get_executor().execute()
 
-    def __iter__(self):
-        cdef StreamableExecutor* executor = self._get_executor()
-        try:
-            while True:
-                arr_ptr = executor.executeNext()
-                if arr_ptr is NULL:
-                    break
-                arr = <object>arr_ptr
-                Py_DECREF(arr)
-                yield arr
-        finally:
-            executor.stop()
+    def iterator(self, int chunk_size=10000, int prefetch = 0):
+        cdef StreamableExecutor* executor = new StreamableExecutor(
+            self._get_json(), chunk_size, prefetch
+        )
+        self._configure_executor(executor)
+        it = PipelineIterator()
+        it.set_executor(executor)
+        return it
 
     #========= non-public properties & methods ===========================================
 
@@ -243,12 +226,44 @@
     def _del_executor(self):
         self._executor.reset()
 
-    cdef StreamableExecutor* _get_executor(self) except NULL:
+    cdef PipelineExecutor* _get_executor(self) except NULL:
         if not self._executor:
-            executor = new StreamableExecutor(self._get_json(),
-                                              self._chunk_size, self._prefetch)
-            executor.setLogLevel(self._loglevel)
-            executor.read()
-            addArrayReaders(executor, self._inputs)
+            executor = new PipelineExecutor(self._get_json())
+            self._configure_executor(executor)
             self._executor.reset(executor)
         return self._executor.get()
+
+    cdef _configure_executor(self, PipelineExecutor* executor):
+        executor.setLogLevel(self._loglevel)
+        executor.read()
+        addArrayReaders(executor, self._inputs)
+
+
+cdef class PipelineIterator(PipelineResultsMixin):
+    cdef unique_ptr[StreamableExecutor] _executor
+
+    cdef set_executor(self, StreamableExecutor* executor):
+        self._executor.reset(executor)
+
+    def __dealloc__(self):
+        self._executor.get().stop()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        cdef StreamableExecutor* executor = self._executor.get()
+        if executor.executed():
+            raise StopIteration
+
+        arr_ptr = executor.executeNext()
+        if arr_ptr is NULL:
+            executor.stop()
+            raise StopIteration
+
+        arr = <object> arr_ptr
+        Py_DECREF(arr)
+        return arr
+
+    cdef PipelineExecutor* _get_executor(self) except NULL:
+        return self._executor.get()
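Since both Pipeline and PipelineIterator now derive from PipelineResultsMixin, the execution results (log, schema, pipeline, metadata) read the same way from either object, and schema now comes back as parsed JSON rather than a string. A sketch of reading results off an exhausted iterator, with the spec as a placeholder assumption:

    import pdal

    # Placeholder spec: any streamable pipeline works (an assumption).
    spec = '["input.las", {"type": "filters.range", "limits": "Intensity[0:255]"}]'

    pipeline = pdal.Pipeline(spec)
    it = pipeline.iterator(chunk_size=100)
    for _ in it:
        pass  # drain the stream

    # Results come from the iterator itself via PipelineResultsMixin;
    # an executed Pipeline exposes the same properties.
    metadata = it.metadata
    schema = it.schema  # parsed JSON now, courtesy of the mixin
    print(it.log)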

pdal/pipeline.py (0 additions, 8 deletions)
@@ -28,8 +28,6 @@ def __init__(
         spec: Union[None, str, Sequence[Stage]] = None,
         arrays: Sequence[np.ndarray] = (),
         loglevel: int = logging.ERROR,
-        chunk_size: int = 10000,
-        prefetch: int = 0,
     ):
         super().__init__()
         self._stages: List[Stage] = []
@@ -39,8 +37,6 @@
             self |= stage
         self.inputs = arrays
         self.loglevel = loglevel
-        self.chunk_size = chunk_size
-        self.prefetch = prefetch
 
     @property
     def stages(self) -> List[Stage]:
@@ -50,10 +46,6 @@ def stages(self) -> List[Stage]:
     def streamable(self) -> bool:
         return all(stage.streamable for stage in self._stages)
 
-    @property
-    def schema(self) -> Any:
-        return json.loads(super().schema)
-
     @property
     def loglevel(self) -> int:
         return LogLevelFromPDAL[super().loglevel]
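At the pure-Python layer the constructor loses chunk_size and prefetch, and the json.loads schema wrapper is dropped because the Cython mixin now parses it. A rough migration sketch, with the spec and input array as placeholder assumptions:

    import numpy as np
    import pdal

    # Placeholder inputs (assumptions, not from this commit): one structured
    # point array and a single-filter pipeline spec.
    dtype = [("X", float), ("Y", float), ("Z", float), ("Intensity", np.uint16)]
    input_arrays = [np.array([(0.0, 0.0, 0.0, 42)], dtype=dtype)]
    spec = '[{"type": "filters.range", "limits": "Intensity[0:255]"}]'

    # Before (removed by this commit):
    #     p = pdal.Pipeline(spec, input_arrays, chunk_size=100, prefetch=2)
    #     chunks = list(p)
    # After: chunking moves from the constructor to iterator().
    p = pdal.Pipeline(spec, arrays=input_arrays)
    chunks = list(p.iterator(chunk_size=100, prefetch=2))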

test/test_pipeline.py (41 additions, 39 deletions)
@@ -2,7 +2,6 @@
 import logging
 import os
 import sys
-from functools import partial
 
 import numpy as np
 import pytest
@@ -12,16 +11,12 @@
 DATADIRECTORY = os.path.join(os.path.dirname(__file__), "data")
 
 
-def get_pipeline(filename, *, chunk_size=None, prefetch=None):
+def get_pipeline(filename):
     with open(os.path.join(DATADIRECTORY, filename), "r") as f:
         if filename.endswith(".json"):
             pipeline = pdal.Pipeline(f.read())
         elif filename.endswith(".py"):
             pipeline = eval(f.read(), vars(pdal))
-        if chunk_size is not None:
-            pipeline.chunk_size = chunk_size
-        if prefetch is not None:
-            pipeline.prefetch = prefetch
     return pipeline
 
 
@@ -36,11 +31,6 @@ class TestPipeline:
     def test_construction(self, filename):
         """Can we construct a PDAL pipeline"""
         assert isinstance(get_pipeline(filename), pdal.Pipeline)
-        assert isinstance(get_pipeline(filename, chunk_size=100), pdal.Pipeline)
-        assert isinstance(get_pipeline(filename, prefetch=3), pdal.Pipeline)
-        assert isinstance(
-            get_pipeline(filename, chunk_size=100, prefetch=3), pdal.Pipeline
-        )
 
         # construct Pipeline from a sequence of stages
         r = pdal.Reader("r")
@@ -421,48 +411,51 @@ class TestPipelineIterator:
 
     def test_array(self):
         """Can we fetch PDAL data as numpy arrays"""
-        ri = get_pipeline("range.json", chunk_size=100)
-        arrays = list(ri)
-        assert len(arrays) == 11
-        concat_array = np.concatenate(arrays)
-
         r = get_pipeline("range.json")
         count = r.execute()
         arrays = r.arrays
         assert len(arrays) == 1
         array = arrays[0]
         assert count == len(array)
 
-        np.testing.assert_array_equal(array, concat_array)
+        for _ in range(10):
+            arrays = list(r.iterator(chunk_size=100))
+            assert len(arrays) == 11
+            concat_array = np.concatenate(arrays)
+            np.testing.assert_array_equal(array, concat_array)
+
+    def test_StopIteration(self):
+        """Is StopIteration raised when the iterator is exhausted"""
+        r = get_pipeline("range.json")
+        it = r.iterator(chunk_size=100)
+        for array in it:
+            assert isinstance(array, np.ndarray)
+        with pytest.raises(StopIteration):
+            next(it)
+        assert next(it, None) is None
 
     def test_metadata(self):
         """Can we fetch PDAL metadata"""
-        ri = get_pipeline("range.json", chunk_size=100)
-        with pytest.raises(RuntimeError):
-            ri.metadata
-        list(ri)
-
         r = get_pipeline("range.json")
-        with pytest.raises(RuntimeError):
-            r.metadata
         r.execute()
 
-        assert ri.metadata == r.metadata
+        it = r.iterator(chunk_size=100)
+        for _ in it:
+            pass
+
+        assert r.metadata == it.metadata
 
     @pytest.mark.xfail
     def test_schema(self):
         """Fetching a schema works"""
-        ri = get_pipeline("range.json", chunk_size=100)
-        with pytest.raises(RuntimeError):
-            ri.schema
-        list(ri)
-
         r = get_pipeline("range.json")
-        with pytest.raises(RuntimeError):
-            r.schema
         r.execute()
 
-        assert ri.schema == r.schema
+        it = r.iterator(chunk_size=100)
+        for _ in it:
+            pass
+
+        assert r.schema == it.schema
 
     def test_merged_arrays(self):
         """Can we load data from a list of arrays to PDAL"""
@@ -480,8 +473,7 @@ def test_merged_arrays(self):
         p.execute()
         non_streaming_array = np.concatenate(p.arrays)
         for chunk_size in range(5, 100, 5):
-            streaming_arrays = list(pdal.Pipeline(filter_intensity, arrays,
-                                                  chunk_size=chunk_size))
+            streaming_arrays = list(p.iterator(chunk_size=chunk_size))
             np.testing.assert_array_equal(np.concatenate(streaming_arrays),
                                           non_streaming_array)
 
@@ -492,7 +484,17 @@ def test_premature_exit(self):
         assert len(r.arrays) == 1
         array = r.arrays[0]
 
-        ri = get_pipeline("range.json", chunk_size=100)
-        for array2 in ri:
-            np.testing.assert_array_equal(array2, array[:len(array2)])
-            break
+        for _ in range(10):
+            for array2 in r.iterator(chunk_size=100):
+                np.testing.assert_array_equal(array2, array[:len(array2)])
+                break
+
+    def test_multiple_iterators(self):
+        """Can we create multiple independent iterators"""
+        r = get_pipeline("range.json")
+        it1 = r.iterator(chunk_size=100)
+        it2 = r.iterator(chunk_size=100)
+        for a1, a2 in zip(it1, it2):
+            np.testing.assert_array_equal(a1, a2)
+        assert next(it1, None) is None
+        assert next(it2, None) is None
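The tests above also pin down the cleanup semantics: __next__ stops the executor once the stream is exhausted, and __dealloc__ stops it when an iterator is discarded early. A sketch of the early-exit pattern exercised by test_premature_exit, with the spec again a placeholder assumption:

    import pdal

    spec = '["input.las"]'  # placeholder reader-only pipeline (an assumption)
    p = pdal.Pipeline(spec)

    # Breaking out early is safe: PipelineIterator.__dealloc__ stops the
    # underlying StreamableExecutor when the iterator is garbage-collected.
    for chunk in p.iterator(chunk_size=100):
        first_chunk = chunk
        break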
