2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -98,6 +98,6 @@ jobs:
       with:
         python-version: 3.11
     - name: Install code quality tools
-      run: pip install black
+      run: pip install black==25.1.0
     - name: Check code formatting with black
       run: black --check .
1 change: 1 addition & 0 deletions .gitignore
@@ -15,3 +15,4 @@ coverage.xml
 gde/
 upg/
 docker/
+/examples/output/output-*
2 changes: 1 addition & 1 deletion examples/Dockerfile
@@ -1,6 +1,6 @@
 FROM nycplanning/docker-geosupport:latest
 
-RUN pip install pandas nyc-parser
+RUN pip install pandas nyc-parser tqdm
 
 WORKDIR /examples
 
139 changes: 103 additions & 36 deletions examples/pandas_multiprocessing.py
@@ -1,34 +1,42 @@
 # from: https://gist.github.com/ishiland/824ddd386fcd0b90fc55aea573a28b22
 # written by ishiland: https://github.com/ishiland
 # Minor edits by torreyma: https://github.com/torreyma
 #
-from geosupport import Geosupport, GeosupportError
-from nycparser import Parser
+"""
+Example of how to use python-geosupport, Pandas and Multiprocessing to speed up geocoding workflows.
+"""
+
+import os
 import pandas as pd
+from typing import Callable
 from multiprocessing import Pool, cpu_count
 from functools import partial
-import numpy as np
+from tqdm import tqdm  # Progress bar
 
-"""
-Example of how to use python-geosupport, Pandas and Multiprocessing to speed up geocoding workflows.
-"""
+from geosupport import Geosupport, GeosupportError
+from nycparser import Parser
+
+# Determine reasonable CPU count (avoid memory issues)
+cpus = min(cpu_count(), 8)
+
+# Proper path handling relative to script location
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+INPUT_CSV = os.path.join(SCRIPT_DIR, "data/input.csv")
+OUTPUT_CSV = os.path.join(SCRIPT_DIR, "data/output-pandas-multiprocessing.csv")
 
 
+# Create a single global instance
 g = Geosupport()
 p = Parser()
 
-cpus = cpu_count()
-
-INPUT_CSV = "/examples/data/input.csv"
-OUTPUT_CSV = "/examples/data/output-pandas-multiprocessing.csv"
-
-
-def geo_by_address(row):
+def geo_by_address(row: pd.Series) -> pd.Series:
     """
-    Geocodes a pandas row containing address atributes.
+    Geocodes a pandas row containing address attributes.
 
-    :param row: Pandas Series
-    :return: Pandas Series with lat, lon & Geosupport message.
+    Args:
+        row: Pandas Series with 'Address' and 'Borough' columns
+
+    Returns:
+        Pandas Series with lat, lon & Geosupport message.
     """
     try:
         # parse the address to separate PHN and street
         parsed = p.address(row["Address"])
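Side note on the `# Create a single global instance` change above: with Linux's default fork start method, every Pool worker inherits the module-level `Geosupport()` and `Parser()` objects, so nothing is re-initialized per task; under spawn (macOS, Windows) the module is re-imported in each worker, which recreates them there as well. If per-worker setup ever needs to be explicit, `Pool`'s `initializer` hook is the usual pattern. A minimal sketch, assuming only the `Geosupport()` constructor seen above (the names `_geo`, `init_worker`, and `geocode_one` are illustrative, not part of this PR):

```python
from multiprocessing import Pool

from geosupport import Geosupport

_geo = None  # per-process handle, populated once per worker


def init_worker():
    # Runs exactly once in each worker process, so the (expensive)
    # Geosupport initialization is not repeated for every task.
    global _geo
    _geo = Geosupport()


def geocode_one(address):
    # Call Geosupport here as in the example above, e.g. _geo.address(...);
    # the exact keyword arguments are not shown in this diff, so the call
    # is elided and the input is passed through unchanged.
    return address


if __name__ == "__main__":
    rows = ["100 Gold St, Manhattan", "253 Broadway, Manhattan"]
    with Pool(processes=2, initializer=init_worker) as pool:
        print(pool.map(geocode_one, rows))
```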
@@ -40,38 +48,97 @@ def geo_by_address(row):
         )
         lat = result.get("Latitude")
         lon = result.get("Longitude")
-        msg = result.get("Message")
+        msg = result.get("Message", "")
     except GeosupportError as ge:
         lat = ""
         lon = ""
         msg = str(ge)
+    except Exception as e:
+        lat = ""
+        lon = ""
+        msg = f"Error: {str(e)}"
 
     return pd.Series([lat, lon, msg])
 
 
-def parallelize(data, func, num_of_processes=cpus):
-    data_split = np.array_split(data, num_of_processes)
-    pool = Pool(num_of_processes)
-    data = pd.concat(pool.map(func, data_split))
-    pool.close()
-    pool.join()
-    return data
+def run_on_subset(func: Callable, data_subset: pd.DataFrame) -> pd.DataFrame:
+    """Apply a function to each row of a dataframe subset"""
+    return data_subset.apply(func, axis=1)
 
 
-def run_on_subset(func, data_subset):
-    return data_subset.apply(func, axis=1)
+def parallelize(
+    data: pd.DataFrame, func: Callable, num_of_processes: int = cpus
+) -> pd.DataFrame:
+    """
+    Split dataframe and apply function in parallel
+
+    Args:
+        data: Input DataFrame
+        func: Function to apply to each chunk
+        num_of_processes: Number of parallel processes
+
+    Returns:
+        DataFrame with results
+    """
+    # Create roughly equal sized chunks using pandas methods
+    splits = []
+    chunk_size = max(1, len(data) // num_of_processes)
+
+    # Create chunks without using numpy arrays
+    for i in range(0, len(data), chunk_size):
+        splits.append(data.iloc[i : min(i + chunk_size, len(data))].copy())
+
+    # Use tqdm for progress tracking
+    with Pool(num_of_processes) as pool:
+        results = list(
+            tqdm(pool.imap(func, splits), total=len(splits), desc="Geocoding")
+        )
 
-
-def parallelize_on_rows(data, func, num_of_processes=cpus):
-    return parallelize(data, partial(run_on_subset, func), num_of_processes)
+    return pd.concat(results)
 
 
 if __name__ == "__main__":
+    print(f"Starting geocoding with {cpus} processes")
 
-    # read in csv
-    df = pd.read_csv(INPUT_CSV)
+    # Process in batches for large datasets
+    batch_size = 100000
+
+    # Check if input file exists
+    if not os.path.exists(INPUT_CSV):
+        print(f"Error: Input file not found: {INPUT_CSV}")
+        exit(1)
 
-    # add 3 Geosupport columns - Latitude, Longitude and Geosupport message
-    df[["lat", "lon", "msg"]] = parallelize_on_rows(df, geo_by_address)
+    # Create output directory if it doesn't exist
+    os.makedirs(os.path.dirname(OUTPUT_CSV), exist_ok=True)
+
+    # Read the input csv
+    df = pd.read_csv(INPUT_CSV)
+    total_rows = len(df)
+    print(f"Processing {total_rows} addresses")
+
+    # Process in batches if large dataset
+    if total_rows > batch_size:
+        for i in range(0, total_rows, batch_size):
+            print(
+                f"Processing batch {i//batch_size + 1}/{(total_rows-1)//batch_size + 1}"
+            )
+            batch = df.iloc[i : i + batch_size].copy()
+
+            # Geocode the batch
+            batch[["lat", "lon", "msg"]] = parallelize(
+                batch, partial(run_on_subset, geo_by_address), num_of_processes=cpus
+            )
+
+            # Write each batch (append mode after first batch)
+            mode = "w" if i == 0 else "a"
+            header = i == 0
+            batch.to_csv(OUTPUT_CSV, mode=mode, header=header, index=False)
+            print(f"Batch {i//batch_size + 1} complete")
+    else:
+        # For small datasets, process all at once
+        df[["lat", "lon", "msg"]] = parallelize(
+            df, partial(run_on_subset, geo_by_address), num_of_processes=cpus
+        )
+        df.to_csv(OUTPUT_CSV, index=False)
 
-    # output to csv with the 3 new columns.
-    df.to_csv(OUTPUT_CSV)
+    print(f"Geocoding complete! Results saved to {OUTPUT_CSV}")
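For reference, the chunk-then-`Pool.imap`-with-`tqdm` pattern the rewritten script is built around can be exercised without Geosupport at all. A minimal, runnable sketch of the same structure, where `fake_geocode` is a hypothetical stand-in for `geo_by_address`:

```python
# Self-contained sketch of the chunked parallel-apply pattern used above.
# fake_geocode stands in for geo_by_address; the rest mirrors the
# structure of run_on_subset()/parallelize() in this PR.
from functools import partial
from multiprocessing import Pool, cpu_count

import pandas as pd
from tqdm import tqdm


def fake_geocode(row: pd.Series) -> pd.Series:
    # Stand-in for geo_by_address: return dummy lat/lon/msg values.
    return pd.Series(["40.7", "-74.0", f"ok: {row['Address']}"])


def run_on_subset(func, subset: pd.DataFrame) -> pd.DataFrame:
    # Apply the row function to one chunk; runs inside a worker process.
    return subset.apply(func, axis=1)


def parallelize(data: pd.DataFrame, func, processes: int) -> pd.DataFrame:
    # Split into roughly equal chunks, map them across workers with a
    # progress bar, and reassemble the results in order.
    chunk = max(1, len(data) // processes)
    splits = [data.iloc[i : i + chunk].copy() for i in range(0, len(data), chunk)]
    with Pool(processes) as pool:
        parts = list(tqdm(pool.imap(func, splits), total=len(splits)))
    return pd.concat(parts)


if __name__ == "__main__":
    df = pd.DataFrame({"Address": ["100 Gold St", "253 Broadway"] * 50})
    df[["lat", "lon", "msg"]] = parallelize(
        df, partial(run_on_subset, fake_geocode), min(cpu_count(), 4)
    )
    print(df.head())
```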
2 changes: 1 addition & 1 deletion setup.py
@@ -51,7 +51,7 @@
     extras_require={
         "dev": [
             "coverage",
-            "black",
+            "black==25.1.0",
         ]
     },
 )