Skip to content

Commit 28fb1de

Browse files
Standardize input-data region aggregation (tract) (#301)
* Changed summary-stats GeoParquet filepaths from output to intermediate.
* Changed tract aggregation to match the block/county approach.
* Reverted config.
* Updated run command.
* Fixed a typo.
1 parent 0c8e613 commit 28fb1de

File tree

1 file changed

+27
-93
lines changed

1 file changed

+27
-93
lines changed

ocr/input_datasets/vector/census_tiger.py

Lines changed: 27 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313

1414
import coiled
1515
import geopandas as gpd
16+
import pandas as pd
17+
from tqdm import tqdm
1618

1719
from ocr.console import console
1820
from ocr.input_datasets.base import BaseDatasetProcessor, InputDatasetConfig
19-
from ocr.utils import apply_s3_creds, install_load_extensions
2021

2122
# FIPS codes for CONUS states + DC (excludes Alaska and Hawaii)
2223
FIPS_CODES: dict[str, str] = {
@@ -149,19 +150,9 @@ def s3_blocks_key(self) -> str:
149150
return f'{self.config.base_prefix}/vector/{self.dataset_name}/blocks/blocks.parquet'
150151

151152
@property
152-
def s3_tracts_base(self) -> str:
153-
"""S3 base path for tracts (contains FIPS/ subdirectory)."""
154-
return f'{self.config.base_prefix}/vector/{self.dataset_name}/tracts'
155-
156-
@property
157-
def s3_tracts_fips_prefix(self) -> str:
158-
"""S3 prefix for per-state tract files."""
159-
return f'{self.s3_tracts_base}/FIPS'
160-
161-
@property
162-
def s3_tracts_aggregated_key(self) -> str:
163-
"""S3 key for aggregated tracts parquet file."""
164-
return f'{self.s3_tracts_base}/tracts.parquet'
153+
def s3_tracts_key(self) -> str:
154+
"""S3 base path for tracts parquet file."""
155+
return f'{self.config.base_prefix}/vector/{self.dataset_name}/tracts/tracts.parquet'
165156

166157
@property
167158
def s3_counties_key(self) -> str:
@@ -182,9 +173,6 @@ def _process_blocks(
182173
dry_run: bool = False,
183174
) -> None:
184175
"""Process Census blocks for all states into a single GeoParquet file."""
185-
import pandas as pd
186-
from tqdm import tqdm
187-
188176
from ocr.console import console
189177

190178
console.log(f'Processing Census blocks (TIGER {blocks_version})...')
@@ -215,76 +203,42 @@ def _process_blocks(
215203
console.log(f'Successfully wrote blocks to {output_s3_uri}')
216204

217205
@staticmethod
218-
def _process_tracts_per_state(
206+
def _process_tracts(
219207
fips_codes: dict[str, str],
220208
tracts_version: str,
221-
output_prefix: str,
209+
output_s3_uri: str,
222210
dry_run: bool = False,
223211
) -> None:
224-
"""Process Census tracts for each state into separate GeoParquet files."""
225-
from tqdm import tqdm
212+
"""Process Census blocks for all states into a single GeoParquet file."""
226213

227214
from ocr.console import console
228215

229-
console.log(f'Processing Census tracts per state (TIGER {tracts_version})...')
216+
console.log(f'Processing Census tracts (TIGER {tracts_version})...')
230217

231218
if dry_run:
232219
console.log(f'[DRY RUN] Would download tracts for {len(fips_codes)} states')
233-
console.log(f'[DRY RUN] Would write per-state files to {output_prefix}/FIPS_*.parquet')
220+
console.log(f'[DRY RUN] Would write to {output_s3_uri}')
234221
return
235222

223+
gdfs = []
236224
for state, fips in tqdm(fips_codes.items(), desc='Processing tracts'):
237225
tract_url = f'https://www2.census.gov/geo/tiger/TIGER{tracts_version}/TRACT/tl_{tracts_version}_{fips}_tract.zip'
238-
output_path = f'{output_prefix}/FIPS_{fips}.parquet'
239-
240226
console.log(f'Reading {state} tracts from {tract_url}')
241227
gdf = gpd.read_file(tract_url)
228+
gdfs.append(gdf)
242229

243-
console.log(f'Writing to {output_path}')
244-
gdf.to_parquet(
245-
output_path,
246-
compression='zstd',
247-
geometry_encoding='WKB',
248-
write_covering_bbox=True,
249-
schema_version='1.1.0',
250-
)
251-
252-
console.log(f'Successfully wrote {len(fips_codes)} per-state tract files')
253-
254-
@staticmethod
255-
def _aggregate_tracts(
256-
input_glob: str,
257-
output_s3_uri: str,
258-
dry_run: bool = False,
259-
) -> None:
260-
"""Aggregate per-state tract files into a single GeoParquet file using DuckDB."""
261-
import duckdb
262-
263-
from ocr.console import console
264-
265-
console.log('Aggregating per-state tract files...')
266-
267-
if dry_run:
268-
console.log(f'[DRY RUN] Would aggregate {input_glob}')
269-
console.log(f'[DRY RUN] Would write to {output_s3_uri}')
270-
return
230+
console.log('Combining all tracts into single GeoDataFrame.')
231+
combined_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
271232

272-
install_load_extensions()
273-
apply_s3_creds()
274-
275-
console.log(f'Aggregating {input_glob} -> {output_s3_uri}')
276-
duckdb.query(
277-
f"""
278-
COPY (
279-
SELECT * FROM read_parquet('{input_glob}')
280-
) TO '{output_s3_uri}' (
281-
FORMAT 'parquet',
282-
COMPRESSION 'zstd',
283-
OVERWRITE_OR_IGNORE true
284-
)
285-
"""
233+
console.log(f'Writing {len(combined_gdf)} tracts to {output_s3_uri}')
234+
combined_gdf.to_parquet(
235+
output_s3_uri,
236+
compression='zstd',
237+
geometry_encoding='WKB',
238+
write_covering_bbox=True,
239+
schema_version='1.1.0',
286240
)
287-
console.log('Successfully aggregated tracts')
241+
console.log(f'Successfully wrote tracts to {output_s3_uri}')
288242

289243
@staticmethod
290244
def _process_counties(
@@ -355,38 +309,18 @@ def process(self) -> None:
355309

356310
if self.geography_type in ('tracts', 'all'):
357311
if client:
358-
console.log('Submitting tract processing to Coiled cluster...')
359-
# Step 1: Process per-state tract files
312+
console.log('Submitting tracts processing to Coiled cluster...')
360313
future = client.submit(
361-
self._process_tracts_per_state,
362-
fips_codes=fips_codes,
314+
self._process_tracts,
363315
tracts_version=self.version,
364-
output_prefix=f's3://{self.config.s3_bucket}/{self.s3_tracts_fips_prefix}',
316+
output_s3_uri=f's3://{self.config.s3_bucket}/{self.s3_tracts_key}',
365317
dry_run=self.dry_run,
366318
)
367319
future.result()
368320
else:
369-
self._process_tracts_per_state(
370-
fips_codes=fips_codes,
321+
self._process_tracts(
371322
tracts_version=self.version,
372-
output_prefix=f's3://{self.config.s3_bucket}/{self.s3_tracts_fips_prefix}',
373-
dry_run=self.dry_run,
374-
)
375-
376-
# Step 2: Aggregate all tract files
377-
if client:
378-
console.log('Submitting tract aggregation to Coiled cluster...')
379-
future = client.submit(
380-
self._aggregate_tracts,
381-
input_glob=f's3://{self.config.s3_bucket}/{self.s3_tracts_fips_prefix}/*.parquet',
382-
output_s3_uri=f's3://{self.config.s3_bucket}/{self.s3_tracts_aggregated_key}',
383-
dry_run=self.dry_run,
384-
)
385-
future.result()
386-
else:
387-
self._aggregate_tracts(
388-
input_glob=f's3://{self.config.s3_bucket}/{self.s3_tracts_fips_prefix}/*.parquet',
389-
output_s3_uri=f's3://{self.config.s3_bucket}/{self.s3_tracts_aggregated_key}',
323+
output_s3_uri=f's3://{self.config.s3_bucket}/{self.s3_tracts_key}',
390324
dry_run=self.dry_run,
391325
)
392326

0 commit comments

Comments
 (0)