diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5f9cace..15a4bd4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -98,6 +98,6 @@ jobs:
         with:
           python-version: 3.11
       - name: Install code quality tools
-        run: pip install black
+        run: pip install black==25.1.0
      - name: Check code formatting with black
        run: black --check .
diff --git a/.gitignore b/.gitignore
index a185a55..79d3cf1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,4 @@ coverage.xml
 gde/
 upg/
 docker/
+/examples/output/output-*
diff --git a/examples/Dockerfile b/examples/Dockerfile
index d58145c..da2cb8f 100644
--- a/examples/Dockerfile
+++ b/examples/Dockerfile
@@ -1,6 +1,6 @@
 FROM nycplanning/docker-geosupport:latest
 
-RUN pip install pandas nyc-parser
+RUN pip install pandas nyc-parser tqdm
 
 WORKDIR /examples
 
diff --git a/examples/pandas_multiprocessing.py b/examples/pandas_multiprocessing.py
index 1fbf660..6ef3b8b 100644
--- a/examples/pandas_multiprocessing.py
+++ b/examples/pandas_multiprocessing.py
@@ -1,34 +1,42 @@
-# from: https://gist.github.com/ishiland/824ddd386fcd0b90fc55aea573a28b22
-# written by ishiland: https://github.com/ishiland
-# Minor edits by torreyma: https://github.com/torreyma
-#
-from geosupport import Geosupport, GeosupportError
-from nycparser import Parser
+"""
+Example of how to use python-geosupport, Pandas and Multiprocessing to speed up geocoding workflows.
+"""
+
+import os
 import pandas as pd
+from typing import Callable
 from multiprocessing import Pool, cpu_count
 from functools import partial
-import numpy as np
+from tqdm import tqdm  # Progress bar
 
-"""
-Example of how to use python-geosupport, Pandas and Multiprocessing to speed up geocoding workflows.
-"""
+from geosupport import Geosupport, GeosupportError
+from nycparser import Parser
+
+# Determine reasonable CPU count (avoid memory issues)
+cpus = min(cpu_count(), 8)
+
+# Proper path handling relative to script location
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+INPUT_CSV = os.path.join(SCRIPT_DIR, "data/input.csv")
+OUTPUT_CSV = os.path.join(SCRIPT_DIR, "data/output-pandas-multiprocessing.csv")
+
+# Create a single global instance
 g = Geosupport()
 p = Parser()
 
-cpus = cpu_count()
-INPUT_CSV = "/examples/data/input.csv"
-OUTPUT_CSV = "/examples/data/output-pandas-multiprocessing.csv"
+def geo_by_address(row: pd.Series) -> pd.Series:
+    """
+    Geocodes a pandas row containing address attributes.
 
+    Args:
+        row: Pandas Series with 'Address' and 'Borough' columns
 
-def geo_by_address(row):
+    Returns:
+        Pandas Series with lat, lon & Geosupport message.
     """
-    Geocodes a pandas row containing address atributes.
-    :param row: Pandas Series
-    :return: Pandas Series with lat, lon & Geosupport message.
-    """
     try:
         # parse the address to separate PHN and street
         parsed = p.address(row["Address"])
@@ -40,38 +48,97 @@ def geo_by_address(row):
         )
         lat = result.get("Latitude")
         lon = result.get("Longitude")
-        msg = result.get("Message")
+        msg = result.get("Message", "")
     except GeosupportError as ge:
         lat = ""
         lon = ""
         msg = str(ge)
+    except Exception as e:
+        lat = ""
+        lon = ""
+        msg = f"Error: {str(e)}"
+
     return pd.Series([lat, lon, msg])
 
 
-def parallelize(data, func, num_of_processes=cpus):
-    data_split = np.array_split(data, num_of_processes)
-    pool = Pool(num_of_processes)
-    data = pd.concat(pool.map(func, data_split))
-    pool.close()
-    pool.join()
-    return data
+def run_on_subset(func: Callable, data_subset: pd.DataFrame) -> pd.DataFrame:
+    """Apply a function to each row of a dataframe subset"""
+    return data_subset.apply(func, axis=1)
 
 
-def run_on_subset(func, data_subset):
-    return data_subset.apply(func, axis=1)
+def parallelize(
+    data: pd.DataFrame, func: Callable, num_of_processes: int = cpus
+) -> pd.DataFrame:
+    """
+    Split dataframe and apply function in parallel
+
+    Args:
+        data: Input DataFrame
+        func: Function to apply to each chunk
+        num_of_processes: Number of parallel processes
+
+    Returns:
+        DataFrame with results
+    """
+    # Create roughly equal sized chunks using pandas methods
+    splits = []
+    chunk_size = max(1, len(data) // num_of_processes)
+
+    # Create chunks without using numpy arrays
+    for i in range(0, len(data), chunk_size):
+        splits.append(data.iloc[i : min(i + chunk_size, len(data))].copy())
+
+    # Use tqdm for progress tracking
+    with Pool(num_of_processes) as pool:
+        results = list(
+            tqdm(pool.imap(func, splits), total=len(splits), desc="Geocoding")
+        )
 
-def parallelize_on_rows(data, func, num_of_processes=cpus):
-    return parallelize(data, partial(run_on_subset, func), num_of_processes)
+    return pd.concat(results)
 
 
 if __name__ == "__main__":
+    print(f"Starting geocoding with {cpus} processes")
 
-    # read in csv
-    df = pd.read_csv(INPUT_CSV)
+    # Process in batches for large datasets
+    batch_size = 100000
+
+    # Check if input file exists
+    if not os.path.exists(INPUT_CSV):
+        print(f"Error: Input file not found: {INPUT_CSV}")
+        exit(1)
 
-    # add 3 Geosupport columns - Latitude, Longitude and Geosupport message
-    df[["lat", "lon", "msg"]] = parallelize_on_rows(df, geo_by_address)
+    # Create output directory if it doesn't exist
+    os.makedirs(os.path.dirname(OUTPUT_CSV), exist_ok=True)
+
+    # Read the input csv
+    df = pd.read_csv(INPUT_CSV)
+    total_rows = len(df)
+    print(f"Processing {total_rows} addresses")
+
+    # Process in batches if large dataset
+    if total_rows > batch_size:
+        for i in range(0, total_rows, batch_size):
+            print(
+                f"Processing batch {i//batch_size + 1}/{(total_rows-1)//batch_size + 1}"
+            )
+            batch = df.iloc[i : i + batch_size].copy()
+
+            # Geocode the batch
+            batch[["lat", "lon", "msg"]] = parallelize(
+                batch, partial(run_on_subset, geo_by_address), num_of_processes=cpus
+            )
+
+            # Write each batch (append mode after first batch)
+            mode = "w" if i == 0 else "a"
+            header = i == 0
+            batch.to_csv(OUTPUT_CSV, mode=mode, header=header, index=False)
+            print(f"Batch {i//batch_size + 1} complete")
+    else:
+        # For small datasets, process all at once
+        df[["lat", "lon", "msg"]] = parallelize(
+            df, partial(run_on_subset, geo_by_address), num_of_processes=cpus
+        )
+        df.to_csv(OUTPUT_CSV, index=False)
 
-    # output to csv with the 3 new columns.
-    df.to_csv(OUTPUT_CSV)
+    print(f"Geocoding complete! Results saved to {OUTPUT_CSV}")
diff --git a/setup.py b/setup.py
index 4c19eb7..c5d4a4a 100644
--- a/setup.py
+++ b/setup.py
@@ -51,7 +51,7 @@
     extras_require={
        "dev": [
            "coverage",
-           "black",
+           "black==25.1.0",
        ]
    },
 )