Skip to content

Commit 01458d7

Browse files
authored
Reset DuckDB connection between operations to prevent OOM in building partitioning (#313)
1 parent 6e9bb73 commit 01458d7

File tree

1 file changed

+34
-30
lines changed

1 file changed

+34
-30
lines changed

ocr/pipeline/partition.py

Lines changed: 34 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -18,55 +18,59 @@ def partition_buildings_by_geography(config: OCRConfig):
1818
install_load_extensions(aws=needs_s3, spatial=True, httpfs=True, con=connection)
1919
apply_s3_creds(region='us-west-2', con=connection)
2020

21-
if config.debug:
22-
console.log(f'Loading buildings data from: {path}')
23-
24-
connection.execute(f"""
25-
SET preserve_insertion_order=false;
26-
CREATE TEMP TABLE buildings_temp AS
27-
SELECT
28-
*,
29-
SUBSTRING(GEOID, 1, 2) AS state_fips,
30-
SUBSTRING(GEOID, 3, 3) AS county_fips
31-
FROM '{path}';
32-
""")
21+
consolidated_buildings_parquet = (
22+
f'{config.vector.building_geoparquet_uri.parent / "consolidated-buildings.parquet"}'
23+
)
3324

3425
if config.debug:
35-
console.log(f'Partitioning geoparquet regions to: {output_path}')
26+
console.log(f'Creating a consolidated parquet file at: {consolidated_buildings_parquet}')
3627

3728
connection.execute(f"""
38-
COPY buildings_temp
39-
TO '{output_path}' (
29+
SET preserve_insertion_order=false;
30+
COPY (
31+
SELECT *
32+
FROM '{path}'
33+
)
34+
TO '{consolidated_buildings_parquet}'
35+
(
4036
FORMAT 'parquet',
41-
PARTITION_BY (state_fips, county_fips),
4237
COMPRESSION 'zstd',
4338
OVERWRITE_OR_IGNORE true
4439
);""")
4540

4641
if config.debug:
47-
console.log(f'Partitioned buildings written to: {output_path}')
42+
console.log(f'Consolidated buildings written to: {consolidated_buildings_parquet}')
4843

49-
consolidated_buildings_parquet = (
50-
f'{config.vector.building_geoparquet_uri.parent / "consolidated-buildings.parquet"}'
51-
)
44+
connection.close()
5245

5346
if config.debug:
54-
console.log(f'Creating a consolidated parquet file at: {consolidated_buildings_parquet}')
47+
console.log('Reconnecting to database for partitioned parquet creation')
48+
49+
connection = duckdb.connect(database=':memory:')
50+
install_load_extensions(aws=needs_s3, spatial=True, httpfs=True, con=connection)
51+
apply_s3_creds(region='us-west-2', con=connection)
52+
53+
if config.debug:
54+
console.log(f'Partitioning geoparquet regions from: {path}')
5555

5656
connection.execute(f"""
57+
SET preserve_insertion_order=false;
5758
COPY (
58-
SELECT * EXCLUDE (state_fips, county_fips)
59-
FROM buildings_temp
59+
SELECT *
60+
FROM (
61+
SELECT
62+
*,
63+
SUBSTRING(GEOID, 1, 2) AS state_fips,
64+
SUBSTRING(GEOID, 3, 3) AS county_fips
65+
FROM '{path}'
6066
)
61-
TO '{consolidated_buildings_parquet}'
62-
(
67+
)
68+
TO '{output_path}' (
6369
FORMAT 'parquet',
70+
PARTITION_BY (state_fips, county_fips),
6471
COMPRESSION 'zstd',
65-
OVERWRITE_OR_IGNORE true,
66-
ROW_GROUP_SIZE 10000000
72+
OVERWRITE_OR_IGNORE true
6773
);""")
6874

6975
if config.debug:
70-
console.log(f'Consolidated buildings written to: {consolidated_buildings_parquet}')
71-
72-
connection.execute('DROP TABLE buildings_temp')
76+
console.log(f'Partitioned buildings written to: {output_path}')

0 commit comments

Comments
 (0)