diff --git a/CHANGES.md b/CHANGES.md index ee4a01f5..ad9af7d8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ - Add listing of GDAL data types and subtypes to `read_info` (#556). - Add support to read list fields without arrow (#558). +- Improve performance of `read_dataframe`, especially if a filter is used (#577). ### Bug fixes diff --git a/pyogrio/_io.pyx b/pyogrio/_io.pyx index a4aed843..67d939da 100644 --- a/pyogrio/_io.pyx +++ b/pyogrio/_io.pyx @@ -1076,55 +1076,59 @@ cdef get_features( uint8_t return_fids, bint datetime_as_string ): - cdef OGRFeatureH ogr_feature = NULL cdef int n_fields cdef int i cdef int field_index - # make sure layer is read from beginning + # Make sure the layer is read from the beginning. OGR_L_ResetReading(ogr_layer) if skip_features > 0: apply_skip_features(ogr_layer, skip_features) - if return_fids: - fid_data = np.empty(shape=(num_features), dtype=np.int64) - fid_view = fid_data[:] - else: - fid_data = None - - if read_geometry: - geometries = np.empty(shape=(num_features, ), dtype="object") - geom_view = geometries[:] + # We don't (always) know beforehand how many features there will be read, so + # allocate output arrays as needed (in chunks), and return the concatenated results. + last_chunk_size = 1_000 + max_chunk_size = 50_000 - else: - geometries = None + if num_features >= 0: + last_chunk_size = num_features + fid_chunks = [] + geom_chunks = [] n_fields = fields.shape[0] - field_indexes = fields[:, 0] - field_ogr_types = fields[:, 1] + field_data_chunks: list[list[np.ndarray]] = [] - field_data = [] - for field_index in range(n_fields): - if datetime_as_string and fields[field_index, 3].startswith("datetime"): - dtype = "object" - elif fields[field_index, 3].startswith("list"): - dtype = "object" - else: - dtype = fields[field_index, 3] - - field_data.append(np.empty(shape=(num_features, ), dtype=dtype)) + def add_data_chunk(size): + """Add a new data chunk to the output array lists.""" + if return_fids: + fid_chunks.append(np.empty(shape=(size, ), dtype=np.int64)) + if read_geometry: + geom_chunks.append(np.empty(shape=(size, ), dtype="object")) + + field_data_chunk: list[np.ndarray] = [] + for field_index in range(n_fields): + if datetime_as_string and fields[field_index, 3].startswith("datetime"): + dtype = "object" + elif fields[field_index, 3].startswith("list"): + dtype = "object" + else: + dtype = fields[field_index, 3] - field_data_view = [field_data[field_index][:] for field_index in range(n_fields)] + field_data_chunk.append(np.empty(shape=(size, ), dtype=dtype)) - if num_features == 0: - return fid_data, geometries, field_data + field_data_chunks.append(field_data_chunk) i = 0 + i_total = 0 + last_chunk_index = -1 + field_indexes = fields[:, 0] + field_ogr_types = fields[:, 1] + while True: try: - if num_features > 0 and i == num_features: + if num_features >= 0 and i_total == num_features: break try: @@ -1135,9 +1139,12 @@ cdef get_features( break except CPLE_BaseError as exc: + if "failed to prepare SQL" in str(exc): + raise ValueError(f"Invalid SQL query: {str(exc)}") from None + raise FeatureError(str(exc)) - if i >= num_features: + if num_features > 0 and i_total >= num_features: raise FeatureError( "GDAL returned more records than expected based on the count of " "records that may meet your combination of filters against this " @@ -1146,6 +1153,24 @@ cdef get_features( "encountering this error." ) from None + if i_total == 0 or i == last_chunk_size: + # No chunk yet or last chunk filled up: add a new chunk. + i = 0 + last_chunk_index += 1 + if num_features < 0 and last_chunk_size < max_chunk_size: + # Double chunk size for next chunk, up to max_chunk_size + last_chunk_size = min(last_chunk_size * 2, max_chunk_size) + + add_data_chunk(last_chunk_size) + if return_fids: + fid_view = fid_chunks[last_chunk_index][:] + if read_geometry: + geom_view = geom_chunks[last_chunk_index][:] + field_data_view = [ + field_data_chunks[last_chunk_index][field_index][:] + for field_index in range(n_fields) + ] + if return_fids: fid_view[i] = OGR_F_GetFID(ogr_feature) @@ -1153,27 +1178,56 @@ cdef get_features( process_geometry(ogr_feature, i, geom_view, force_2d) process_fields( - ogr_feature, i, n_fields, field_data, field_data_view, - field_indexes, field_ogr_types, encoding, datetime_as_string + ogr_feature, i, n_fields, field_data_chunks[last_chunk_index], + field_data_view, field_indexes, field_ogr_types, encoding, + datetime_as_string ) + i += 1 + i_total += 1 + finally: if ogr_feature != NULL: OGR_F_Destroy(ogr_feature) ogr_feature = NULL - # There may be fewer rows available than expected from OGR_L_GetFeatureCount, - # such as features with bounding boxes that intersect the bbox - # but do not themselves intersect the bbox. - # Empty rows are dropped. - if i < num_features: + # If no features were read, add an empty chunk + if i_total == 0: + add_data_chunk(0) + last_chunk_size = 0 + last_chunk_index = 0 + + # If the last chunk wasn't completely full, drop empty rows + if i < last_chunk_size: if return_fids: - fid_data = fid_data[:i] + fid_chunks[last_chunk_index] = fid_chunks[last_chunk_index][:i] if read_geometry: - geometries = geometries[:i] - field_data = [data_field[:i] for data_field in field_data] + geom_chunks[last_chunk_index] = geom_chunks[last_chunk_index][:i] + for field_index in range(n_fields): + field_data_chunks[last_chunk_index][field_index] = ( + field_data_chunks[last_chunk_index][field_index][:i] + ) + + if last_chunk_index == 0: + # Only one chunk, so avoid concatenation + fid_data = fid_chunks[0] if return_fids else None + geom_data = geom_chunks[0] if read_geometry else None + + return fid_data, geom_data, field_data_chunks[0] + + else: + # Concatenate all chunks + fid_data = np.concatenate(fid_chunks) if return_fids else None + geom_data = np.concatenate(geom_chunks) if read_geometry else None + + field_data = [] + for field_index in range(n_fields): + data_to_concat = [] + for chunk in range(last_chunk_index + 1): + data_to_concat.append(field_data_chunks[chunk][field_index]) + field_data.append(np.concatenate(data_to_concat)) - return fid_data, geometries, field_data + return fid_data, geom_data, field_data @cython.boundscheck(False) # Deactivate bounds checking @@ -1490,9 +1544,14 @@ def ogr_read( apply_geometry_filter(ogr_layer, mask) # Limit feature range to available range - skip_features, num_features = validate_feature_range( - ogr_layer, skip_features, max_features - ) + num_features = -1 + if max_features > 0: + num_features = max_features + + if skip_features > 0: + skip_features, num_features = validate_feature_range( + ogr_layer, skip_features, max_features + ) fid_data, geometries, field_data = get_features( ogr_layer, diff --git a/pyogrio/tests/test_geopandas_io.py b/pyogrio/tests/test_geopandas_io.py index e9a71aac..714174c2 100644 --- a/pyogrio/tests/test_geopandas_io.py +++ b/pyogrio/tests/test_geopandas_io.py @@ -2071,6 +2071,28 @@ def test_read_invalid_poly_ring(tmp_path, use_arrow, on_invalid, message, expect assert df.geometry.iloc[0].wkt == expected_wkt +def test_read_multi_chunks(tmp_path): + """Test reading a file where multiple chunks are used. + + `ogr_read` reads features in chunks of features. Read a suffucient number of + featuers in this test so multiple chunks will be used. + """ + # Create test file with enough features to require multiple chunks. + # > 3000 features will result in 3 chunks. + nb_features = 3300 + df = gp.GeoDataFrame( + {"col": [1.0] * nb_features}, + geometry=[Point(1, 1)] * nb_features, + crs="EPSG:4326", + ) + test_path = tmp_path / "test.gpkg" + write_dataframe(df, test_path) + + # Read the test file and compare to original dataframe + result = read_dataframe(test_path, use_arrow=False) + assert_geodataframe_equal(result, df) + + def test_read_multisurface(multisurface_file, use_arrow): if use_arrow: # TODO: revisit once https://github.com/geopandas/pyogrio/issues/478