1 change: 1 addition & 0 deletions CHANGES.md
@@ -11,6 +11,7 @@

 - Add listing of GDAL data types and subtypes to `read_info` (#556).
 - Add support to read list fields without arrow (#558).
+- Improve performance of `read_dataframe`, especially if a filter is used (#577).
 
 ### Bug fixes
 
147 changes: 103 additions & 44 deletions pyogrio/_io.pyx
@@ -1076,55 +1076,59 @@ cdef get_features(
     uint8_t return_fids,
     bint datetime_as_string
 ):
 
     cdef OGRFeatureH ogr_feature = NULL
     cdef int n_fields
     cdef int i
     cdef int field_index
 
-    # make sure layer is read from beginning
+    # Make sure the layer is read from the beginning.
     OGR_L_ResetReading(ogr_layer)
 
     if skip_features > 0:
         apply_skip_features(ogr_layer, skip_features)
 
-    if return_fids:
-        fid_data = np.empty(shape=(num_features), dtype=np.int64)
-        fid_view = fid_data[:]
-    else:
-        fid_data = None
-
-    if read_geometry:
-        geometries = np.empty(shape=(num_features, ), dtype="object")
-        geom_view = geometries[:]
+    # We don't (always) know beforehand how many features will be read, so
+    # allocate output arrays as needed (in chunks), and return the concatenated results.
+    last_chunk_size = 1_000
+    max_chunk_size = 50_000

-    else:
-        geometries = None
+    if num_features >= 0:
+        last_chunk_size = num_features
 
+    fid_chunks = []
+    geom_chunks = []
     n_fields = fields.shape[0]
+    field_indexes = fields[:, 0]
+    field_ogr_types = fields[:, 1]
+    field_data_chunks: list[list[np.ndarray]] = []
 
-    field_data = []
-    for field_index in range(n_fields):
-        if datetime_as_string and fields[field_index, 3].startswith("datetime"):
-            dtype = "object"
-        elif fields[field_index, 3].startswith("list"):
-            dtype = "object"
-        else:
-            dtype = fields[field_index, 3]
-
-        field_data.append(np.empty(shape=(num_features, ), dtype=dtype))
+    def add_data_chunk(size):
+        """Add a new data chunk to the output array lists."""
+        if return_fids:
+            fid_chunks.append(np.empty(shape=(size, ), dtype=np.int64))
+        if read_geometry:
+            geom_chunks.append(np.empty(shape=(size, ), dtype="object"))
+
+        field_data_chunk: list[np.ndarray] = []
+        for field_index in range(n_fields):
+            if datetime_as_string and fields[field_index, 3].startswith("datetime"):
+                dtype = "object"
+            elif fields[field_index, 3].startswith("list"):
+                dtype = "object"
+            else:
+                dtype = fields[field_index, 3]
+
-    field_data_view = [field_data[field_index][:] for field_index in range(n_fields)]
+            field_data_chunk.append(np.empty(shape=(size, ), dtype=dtype))
 
-    if num_features == 0:
-        return fid_data, geometries, field_data
+        field_data_chunks.append(field_data_chunk)
 
     i = 0
+    i_total = 0
+    last_chunk_index = -1
-    field_indexes = fields[:, 0]
-    field_ogr_types = fields[:, 1]
 
     while True:
         try:
-            if num_features > 0 and i == num_features:
+            if num_features >= 0 and i_total == num_features:
                 break
 
             try:
@@ -1135,9 +1139,12 @@
                 break
 
             except CPLE_BaseError as exc:
+                if "failed to prepare SQL" in str(exc):
+                    raise ValueError(f"Invalid SQL query: {str(exc)}") from None
+
                 raise FeatureError(str(exc))
 
-            if i >= num_features:
+            if num_features > 0 and i_total >= num_features:
                 raise FeatureError(
                     "GDAL returned more records than expected based on the count of "
                     "records that may meet your combination of filters against this "
@@ -1146,34 +1153,81 @@
"encountering this error."
) from None

if i_total == 0 or i == last_chunk_size:
# No chunk yet or last chunk filled up: add a new chunk.
i = 0
last_chunk_index += 1
if num_features < 0 and last_chunk_size < max_chunk_size:
# Double chunk size for next chunk, up to max_chunk_size
last_chunk_size = min(last_chunk_size * 2, max_chunk_size)

add_data_chunk(last_chunk_size)
if return_fids:
fid_view = fid_chunks[last_chunk_index][:]
if read_geometry:
geom_view = geom_chunks[last_chunk_index][:]
field_data_view = [
field_data_chunks[last_chunk_index][field_index][:]
for field_index in range(n_fields)
]

if return_fids:
fid_view[i] = OGR_F_GetFID(ogr_feature)

if read_geometry:
process_geometry(ogr_feature, i, geom_view, force_2d)

process_fields(
ogr_feature, i, n_fields, field_data, field_data_view,
field_indexes, field_ogr_types, encoding, datetime_as_string
ogr_feature, i, n_fields, field_data_chunks[last_chunk_index],
field_data_view, field_indexes, field_ogr_types, encoding,
datetime_as_string
)

i += 1
i_total += 1

finally:
if ogr_feature != NULL:
OGR_F_Destroy(ogr_feature)
ogr_feature = NULL

# There may be fewer rows available than expected from OGR_L_GetFeatureCount,
# such as features with bounding boxes that intersect the bbox
# but do not themselves intersect the bbox.
# Empty rows are dropped.
if i < num_features:
# If no features were read, add an empty chunk
if i_total == 0:
add_data_chunk(0)
last_chunk_size = 0
last_chunk_index = 0

# If the last chunk wasn't completely full, drop empty rows
if i < last_chunk_size:
if return_fids:
fid_data = fid_data[:i]
fid_chunks[last_chunk_index] = fid_chunks[last_chunk_index][:i]
if read_geometry:
geometries = geometries[:i]
field_data = [data_field[:i] for data_field in field_data]
geom_chunks[last_chunk_index] = geom_chunks[last_chunk_index][:i]
for field_index in range(n_fields):
field_data_chunks[last_chunk_index][field_index] = (
field_data_chunks[last_chunk_index][field_index][:i]
)

if last_chunk_index == 0:
# Only one chunk, so avoid concatenation
fid_data = fid_chunks[0] if return_fids else None
geom_data = geom_chunks[0] if read_geometry else None

return fid_data, geom_data, field_data_chunks[0]

else:
# Concatenate all chunks
fid_data = np.concatenate(fid_chunks) if return_fids else None
geom_data = np.concatenate(geom_chunks) if read_geometry else None

field_data = []
for field_index in range(n_fields):
data_to_concat = []
for chunk in range(last_chunk_index + 1):
data_to_concat.append(field_data_chunks[chunk][field_index])
field_data.append(np.concatenate(data_to_concat))

return fid_data, geometries, field_data
return fid_data, geom_data, field_data


@cython.boundscheck(False) # Deactivate bounds checking
@@ -1490,9 +1544,14 @@ def ogr_read(
             apply_geometry_filter(ogr_layer, mask)
 
         # Limit feature range to available range
-        skip_features, num_features = validate_feature_range(
-            ogr_layer, skip_features, max_features
-        )
+        num_features = -1
+        if max_features > 0:
+            num_features = max_features
+
+        if skip_features > 0:
+            skip_features, num_features = validate_feature_range(
+                ogr_layer, skip_features, max_features
+            )
 
         fid_data, geometries, field_data = get_features(
             ogr_layer,
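The core of this PR is visible in the `get_features` hunks above: instead of pre-allocating every output array from an upfront feature count (which is what made filtered reads expensive), output arrays are grown chunk by chunk and concatenated once reading finishes. Below is a minimal pure-Python/NumPy sketch of that strategy for a single output column; the `read_feature` callable is hypothetical and stands in for the `OGR_L_GetNextFeature`/`process_fields` machinery, and the exact point at which the chunk size doubles is assumed from the test arithmetic further down (first chunk of 1,000 rows).

```python
import numpy as np

def read_chunked(read_feature, num_features=-1,
                 initial_chunk=1_000, max_chunk=50_000):
    """Collect feature values into doubling chunks, then concatenate.

    read_feature() returns one value per feature and None when the
    layer is exhausted; num_features=-1 means the count is unknown.
    """
    # A known feature count allows one exact allocation up front.
    chunk_size = num_features if num_features >= 0 else initial_chunk
    chunks = []
    chunk = np.empty(chunk_size, dtype="object")
    i = i_total = 0  # index within the current chunk / across all chunks

    while True:
        if 0 <= num_features == i_total:
            break  # read exactly the requested number of features
        value = read_feature()
        if value is None:
            break  # layer exhausted
        if i == chunk_size:
            # Current chunk is full: seal it and start a larger one.
            chunks.append(chunk)
            chunk_size = min(chunk_size * 2, max_chunk)
            chunk = np.empty(chunk_size, dtype="object")
            i = 0
        chunk[i] = value
        i += 1
        i_total += 1

    chunks.append(chunk[:i])  # drop the unused tail of the last chunk
    return chunks[0] if len(chunks) == 1 else np.concatenate(chunks)

# 3,300 features with an unknown count -> chunks of 1,000, 2,000 and 300.
it = iter(range(3_300))
result = read_chunked(lambda: next(it, None))
assert len(result) == 3_300
```

In the Cython code the same bookkeeping (`i`, `i_total`, `last_chunk_index`) drives one array per output column (FIDs, geometries, and each field) via `add_data_chunk`, and the `ogr_read` hunk makes the matching caller-side change: `validate_feature_range`, which forces a potentially expensive feature count, is now only invoked when `skip_features > 0`.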
22 changes: 22 additions & 0 deletions pyogrio/tests/test_geopandas_io.py
@@ -2071,6 +2071,28 @@ def test_read_invalid_poly_ring(tmp_path, use_arrow, on_invalid, message, expected_wkt):
     assert df.geometry.iloc[0].wkt == expected_wkt
 
 
+def test_read_multi_chunks(tmp_path):
+    """Test reading a file where multiple chunks are used.
+
+    `ogr_read` reads features in chunks. Read a sufficient number of
+    features in this test so that multiple chunks will be used.
+    """
+    # Create test file with enough features to require multiple chunks.
+    # > 3000 features will result in 3 chunks.
+    nb_features = 3300
+    df = gp.GeoDataFrame(
+        {"col": [1.0] * nb_features},
+        geometry=[Point(1, 1)] * nb_features,
+        crs="EPSG:4326",
+    )
+    test_path = tmp_path / "test.gpkg"
+    write_dataframe(df, test_path)
+
+    # Read the test file and compare to original dataframe
+    result = read_dataframe(test_path, use_arrow=False)
+    assert_geodataframe_equal(result, df)
+
+
 def test_read_multisurface(multisurface_file, use_arrow):
     if use_arrow:
         # TODO: revisit once https://github.com/geopandas/pyogrio/issues/478
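A note on the `> 3000 features will result in 3 chunks` comment above: with a 1,000-row first chunk that doubles for each subsequent chunk, 1,000 + 2,000 = 3,000 features exactly fill two chunks, so 3,300 features spill into a third. A quick sanity check of that arithmetic (chunk sizes assumed from the `_io.pyx` constants; `n_chunks` is a hypothetical helper, not pyogrio API):

```python
def n_chunks(n_features, initial=1_000, maximum=50_000):
    """Chunks needed for n_features rows, assuming the first chunk holds
    `initial` rows and each subsequent chunk doubles up to `maximum`."""
    chunks, size, remaining = 0, initial, n_features
    while remaining > 0 or chunks == 0:  # an empty read still gets one (empty) chunk
        remaining -= size
        chunks += 1
        size = min(size * 2, maximum)
    return chunks

assert n_chunks(3_300) == 3   # 1,000 + 2,000 + 300
assert n_chunks(3_000) == 2   # exactly fills the first two chunks
assert n_chunks(0) == 1       # matches the empty-chunk fallback in get_features
```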