Skip to content

Commit df21442

Browse files
authored
fix(rust/sedona,python/sedonadb): Ensure empty batches are not included in RecordBatchReader output (#207)
1 parent 5493b8e commit df21442

File tree

3 files changed, with 93 additions and 11 deletions.

python/sedonadb/src/reader.rs

Lines changed: 15 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -43,12 +43,21 @@ impl Iterator for PySedonaStreamReader {
4343
type Item = std::result::Result<RecordBatch, ArrowError>;
4444

4545
fn next(&mut self) -> Option<Self::Item> {
46-
match wait_for_future_from_rust(&self.runtime, self.stream.try_next()) {
47-
Ok(maybe_batch) => match maybe_batch {
48-
Ok(maybe_batch) => maybe_batch.map(Ok),
49-
Err(err) => Some(Err(ArrowError::ExternalError(Box::new(err)))),
50-
},
51-
Err(err) => Some(Err(ArrowError::ExternalError(Box::new(err)))),
46+
loop {
47+
match wait_for_future_from_rust(&self.runtime, self.stream.try_next()) {
48+
Ok(Ok(maybe_batch)) => match maybe_batch {
49+
Some(batch) => {
50+
if batch.num_rows() == 0 {
51+
continue;
52+
}
53+
54+
return Some(Ok(batch));
55+
}
56+
None => return None,
57+
},
58+
Ok(Err(df_err)) => return Some(Err(ArrowError::ExternalError(Box::new(df_err)))),
59+
Err(py_err) => return Some(Err(ArrowError::ExternalError(Box::new(py_err)))),
60+
}
5261
}
5362
}
5463
}

python/sedonadb/tests/test_dataframe.py

Lines changed: 34 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,15 +14,18 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
18+
import tempfile
19+
from pathlib import Path
20+
1721
import geoarrow.pyarrow as ga
1822
import geoarrow.types as gat
1923
import geopandas.testing
2024
import pandas as pd
21-
from pathlib import Path
2225
import pyarrow as pa
2326
import pytest
2427
import sedonadb
25-
import tempfile
28+
from sedonadb.testing import skip_if_not_exists
2629

2730

2831
def test_dataframe_from_dataframe(con):
@@ -255,6 +258,35 @@ def test_dataframe_to_arrow(con):
255258
df.to_arrow_table(schema=pa.schema({}))
256259

257260

261+
def test_dataframe_to_arrow_empty_batches(con, geoarrow_data):
262+
# It's difficult to trigger this with a simpler example
263+
# https://github.com/apache/sedona-db/issues/156
264+
path_water_junc = (
265+
geoarrow_data / "ns-water" / "files" / "ns-water_water-junc_geo.parquet"
266+
)
267+
path_water_point = (
268+
geoarrow_data / "ns-water" / "files" / "ns-water_water-point_geo.parquet"
269+
)
270+
skip_if_not_exists(path_water_junc)
271+
skip_if_not_exists(path_water_point)
272+
273+
con.read_parquet(path_water_junc).to_view("junc", overwrite=True)
274+
con.read_parquet(path_water_point).to_view("point", overwrite=True)
275+
con.sql("""SELECT geometry FROM junc WHERE "OBJECTID" = 1814""").to_view(
276+
"junc_filter", overwrite=True
277+
)
278+
279+
joined = con.sql("""
280+
SELECT "OBJECTID", "FEAT_CODE", point.geometry
281+
FROM point
282+
JOIN junc_filter ON ST_DWithin(junc_filter.geometry, point.geometry, 10000)
283+
""")
284+
285+
reader = pa.RecordBatchReader.from_stream(joined)
286+
batch_rows = [len(batch) for batch in reader]
287+
assert batch_rows == [24]
288+
289+
258290
def test_dataframe_to_pandas(con):
259291
# Check with a geometry column
260292
df_with_geo = con.sql("SELECT 1 as one, ST_GeomFromWKT('POINT (0 1)') as geom")

rust/sedona/src/reader.rs

Lines changed: 44 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -41,9 +41,20 @@ impl Iterator for SedonaStreamReader {
4141
type Item = std::result::Result<RecordBatch, ArrowError>;
4242

4343
fn next(&mut self) -> Option<Self::Item> {
44-
match self.runtime.block_on(self.stream.try_next()) {
45-
Ok(maybe_batch) => maybe_batch.map(Ok),
46-
Err(err) => Some(Err(ArrowError::ExternalError(Box::new(err)))),
44+
loop {
45+
match self.runtime.block_on(self.stream.try_next()) {
46+
Ok(maybe_batch) => match maybe_batch {
47+
Some(batch) => {
48+
if batch.num_rows() == 0 {
49+
continue;
50+
}
51+
52+
return Some(Ok(batch));
53+
}
54+
None => return None,
55+
},
56+
Err(err) => return Some(Err(ArrowError::ExternalError(Box::new(err)))),
57+
}
4758
}
4859
}
4960
}
@@ -57,7 +68,9 @@ impl RecordBatchReader for SedonaStreamReader {
5768
#[cfg(test)]
5869
mod test {
5970

71+
use arrow_array::record_batch;
6072
use arrow_schema::{DataType, Field, Schema};
73+
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
6174

6275
use crate::context::SedonaContext;
6376

@@ -82,4 +95,32 @@ mod test {
8295
assert_eq!(reader.next().unwrap().unwrap(), expected_batches[0]);
8396
assert!(reader.next().is_none());
8497
}
98+
99+
#[test]
100+
fn reader_empty_chunks() {
101+
let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
102+
103+
let batch0 = record_batch!(
104+
("a", Int32, [1, 2, 3]),
105+
("b", Float64, [Some(4.0), None, Some(5.0)])
106+
)
107+
.expect("created batch");
108+
let schema = batch0.schema();
109+
110+
let batch1 = RecordBatch::new_empty(schema.clone());
111+
let batch2 = batch0.clone();
112+
113+
let stream = futures::stream::iter(vec![
114+
Ok(batch0.clone()),
115+
Ok(batch1.clone()),
116+
Ok(batch2.clone()),
117+
]);
118+
let adapter = RecordBatchStreamAdapter::new(schema, stream);
119+
let batch_stream: SendableRecordBatchStream = Box::pin(adapter);
120+
121+
let mut reader = SedonaStreamReader::new(runtime, batch_stream);
122+
assert_eq!(reader.next().unwrap().unwrap(), batch0);
123+
assert_eq!(reader.next().unwrap().unwrap(), batch2);
124+
assert!(reader.next().is_none());
125+
}
85126
}

0 commit comments

Comments
 (0)