Skip to content

Commit 799276a

Browse files
committed
Fix determining when the execution is really done in the presence of multiple readers
1 parent a9c962b commit 799276a

File tree

3 files changed

+15
-22
lines changed

3 files changed

+15
-22
lines changed

pdal/StreamableExecutor.cpp

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -199,25 +199,18 @@ void PythonPointTable::reset()
199199
if (np)
200200
{
201201
m_arrays.push(m_curArray);
202-
m_curArray = nullptr;
202+
m_curArray = py_createArray();
203+
m_producedCv.notify_one();
203204
}
204-
205-
bool done = numPoints() < m_limit;
206-
207-
// If we just pushed the last chunk, push a nullptr so that a reader knows.
208-
if (done)
209-
m_arrays.push(nullptr);
210-
m_producedCv.notify_one();
211-
212-
if (done)
213-
return;
214-
215205
while (m_arrays.size() > m_prefetch)
216206
m_consumedCv.wait(l);
217207
}
208+
}
218209

219-
// Make a new array for data.
220-
m_curArray = py_createArray();
210+
void PythonPointTable::done()
211+
{
212+
m_arrays.push(nullptr);
213+
m_producedCv.notify_one();
221214
}
222215

223216
PyArrayObject *PythonPointTable::fetchArray()
@@ -267,6 +260,7 @@ PyArrayObject *StreamableExecutor::executeNext()
267260
m_thread.reset(new std::thread([this]()
268261
{
269262
m_manager.executeStream(m_table);
263+
m_table.done();
270264
}));
271265
}
272266

pdal/StreamableExecutor.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class PythonPointTable : public StreamPointTable
5959
~PythonPointTable();
6060

6161
virtual void finalize();
62+
void done();
6263
PyArrayObject *fetchArray();
6364

6465
protected:

test/test_pipeline.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,6 @@ def test_schema(self):
464464

465465
assert ri.schema == r.schema
466466

467-
@pytest.mark.skip("segfaults")
468467
def test_merged_arrays(self):
469468
"""Can we load data from a list of arrays to PDAL"""
470469
data = np.load(os.path.join(DATADIRECTORY, "test3d.npy"))
@@ -477,12 +476,11 @@ def test_merged_arrays(self):
477476
}
478477
]
479478
}"""
480-
arrays1 = list(pdal.Pipeline(filter_intensity, arrays, chunk_size=100))
481-
482479
p = pdal.Pipeline(filter_intensity, arrays)
483480
p.execute()
484-
arrays2 = p.arrays
485-
486-
assert len(arrays1) == len(arrays2)
487-
for array1, array2 in zip(arrays1, arrays2):
488-
np.testing.assert_array_equal(array1, array2)
481+
non_streaming_array = np.concatenate(p.arrays)
482+
for chunk_size in range(5, 100, 5):
483+
streaming_arrays = list(pdal.Pipeline(filter_intensity, arrays,
484+
chunk_size=chunk_size))
485+
np.testing.assert_array_equal(np.concatenate(streaming_arrays),
486+
non_streaming_array)

0 commit comments

Comments
 (0)