2 changes: 2 additions & 0 deletions .gitignore
@@ -176,3 +176,5 @@ cython_debug/
.python-version

.DS_Store

testing/
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
### v1.15.0
- Update to reflect the enhanced `property_v2.search` endpoint.
- Add chunking logic to the `property_v2.search` endpoint for large `parcl_property_ids` requests.

### v1.14.5
- Update `property_v2.search` to fix pagination logic bug.

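For context on the two v1.15.0 bullets, a minimal usage sketch. The `ParclLabsClient` entry point and the `property_v2` attribute path are assumptions based on the SDK's existing interface; only the `retrieve` parameters shown are confirmed by this diff, and the ID list is hypothetical.

```python
# Illustrative only: client construction and attribute names are assumed,
# not introduced by this PR. The retrieve() parameters are the ones added here.
from parcllabs import ParclLabsClient  # assumed public entry point

client = ParclLabsClient(api_key="your-api-key")

# A request with more than 10,000 parcl_property_ids is split into chunks of
# PARCL_PROPERTY_IDS_LIMIT (10,000) and fetched concurrently by the service.
big_id_list = list(range(1, 25_001))  # hypothetical property IDs

df, metadata = client.property_v2.retrieve(
    parcl_property_ids=big_id_list,
    include_events=True,                 # new in v1.15.0
    include_full_event_history=False,    # new in v1.15.0
    current_new_construction_flag=True,  # new in v1.15.0
)
```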
2 changes: 1 addition & 1 deletion parcllabs/__version__.py
@@ -1 +1 @@
VERSION = "1.14.5"
VERSION = "1.15.0"
2 changes: 2 additions & 0 deletions parcllabs/common.py
@@ -28,3 +28,5 @@


ZIP_CODE_LENGTH = 5
PARCL_PROPERTY_IDS_LIMIT = 10000
PARCL_PROPERTY_IDS = "parcl_property_ids"
29 changes: 28 additions & 1 deletion parcllabs/schemas/schemas.py
@@ -35,7 +35,6 @@ class PropertyV2RetrieveParams(BaseModel):
default=None, description="Geographic coordinates with radius to filter by"
)

# Property filters
property_types: list[str] | None = Field(
default=None, description="List of property types to filter by"
)
@@ -62,6 +61,22 @@ class PropertyV2RetrieveParams(BaseModel):
)

# Event filters
include_events: bool | None = Field(
default=None,
description="""Boolean flag indicating if event data is included in the response.
0 signifies that no event data is returned.
1 signifies that event data is included, scoped
to events that match the event filters supplied in the request
""",
)
include_full_event_history: bool | None = Field(
default=None,
description="""Boolean flag indicating if the full event history is returned for
each property (effective only when include_events is 1).
0 signifies that only events matching the event filters are returned.
1 signifies that the property's entire event history is returned
""",
)
event_names: list[str] | None = Field(
default=None, description="List of event names to filter by"
)
@@ -100,6 +115,18 @@ class PropertyV2RetrieveParams(BaseModel):
current_on_market_rental_flag: bool | None = Field(
default=None, description="Whether to filter by current on market rental flag"
)
current_new_construction_flag: bool | None = Field(
default=None, description="Whether to filter by current new construction flag"
)
current_owner_occupied_flag: bool | None = Field(
default=None, description="Whether to filter by current owner occupied flag"
)
current_investor_owned_flag: bool | None = Field(
default=None, description="Whether to filter by current investor owned flag"
)
current_entity_owner_name: str | None = Field(
default=None, description="Current entity owner name to filter by"
)

# Pagination
limit: int | None = Field(
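For reference, a minimal sketch of how the new schema fields are populated. The class and field names come from the diff above; the values are hypothetical, and it assumes the remaining fields are optional, as all of the visible ones are.

```python
# Minimal sketch: construct the request schema directly to show that the new
# flags and the owner-name filter are optional and default to None.
from parcllabs.schemas.schemas import PropertyV2RetrieveParams

params = PropertyV2RetrieveParams(
    include_events=True,
    include_full_event_history=False,
    current_new_construction_flag=True,
    current_owner_occupied_flag=False,
    current_investor_owned_flag=True,
    current_entity_owner_name="ACME HOLDINGS LLC",  # hypothetical value
    limit=1000,
    params={},
)

# Unset fields stay None and are omitted from the request body by the service.
assert params.event_names is None
```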
177 changes: 164 additions & 13 deletions parcllabs/services/properties/property_v2.py
@@ -1,9 +1,11 @@
import time
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any

import pandas as pd

from parcllabs.common import PARCL_PROPERTY_IDS, PARCL_PROPERTY_IDS_LIMIT
from parcllabs.enums import RequestLimits
from parcllabs.schemas.schemas import PropertyV2RetrieveParamCategories, PropertyV2RetrieveParams
from parcllabs.services.parcllabs_service import ParclLabsService
@@ -15,31 +17,39 @@ def __init__(self, *args: object, **kwargs: object) -> None:
super().__init__(*args, **kwargs)
self.simple_bool_validator = Validators.validate_input_bool_param_simple

@staticmethod
def _raise_http_error(chunk_num: int, status_code: int, response_preview: str) -> None:
error_msg = f"Chunk {chunk_num} failed: HTTP {status_code}"
raise RuntimeError(f"{error_msg}\nResponse content: {response_preview}...")

@staticmethod
def _raise_empty_response_error(chunk_num: int) -> None:
raise RuntimeError(f"Chunk {chunk_num} failed: Empty response from API")

def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict]:
"""Fetch data using POST request with pagination support."""
response = self._post(url=self.full_post_url, data=data, params=params)
result = response.json()

all_data = [result]
pagination = result.get("pagination")
metadata = result.get("metadata")
all_data = [result]

returned_count = metadata.get("results", {}).get("returned_count", 0)

if pagination:
limit = pagination.get("limit")
returned_count = metadata.get("results", {}).get("returned_count", 0)
if returned_count < limit: # if we got fewer results than requested, don't paginate
return all_data

# If we need to paginate, use concurrent requests
if pagination.get("has_more"):
print("More pages to fetch, paginating additional pages...")
offset = pagination.get("offset")
total_count = metadata.get("results", {}).get("total_available", 0)
total_available = metadata.get("results", {}).get("total_available", 0)

# Calculate how many more pages we need
remaining_pages = (total_count - limit) // limit
if (total_count - limit) % limit > 0:
remaining_pages = (total_available - limit) // limit
if (total_available - limit) % limit > 0:
remaining_pages += 1

# Generate all the URLs we need to fetch
@@ -66,6 +76,103 @@ def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict

return all_data

def _fetch_post_parcl_property_ids(
self,
params: dict[str, Any],
data: dict[str, Any],
) -> list[dict]:
"""Fetch data using POST request with parcl_property_ids, chunking the request

Args:
params: Dictionary of parameters to pass to the request.
data: Dictionary of data to pass to the request.

Returns:
List of dictionaries containing the data from the request.
"""
parcl_property_ids = data.get(PARCL_PROPERTY_IDS)
num_ids = len(parcl_property_ids)
if num_ids <= PARCL_PROPERTY_IDS_LIMIT:
return self._fetch_post(params=params, data=data)

# If we exceed PARCL_PROPERTY_IDS_LIMIT, chunk the request
parcl_property_ids_chunks = [
parcl_property_ids[i : i + PARCL_PROPERTY_IDS_LIMIT]
for i in range(0, num_ids, PARCL_PROPERTY_IDS_LIMIT)
]
num_chunks = len(parcl_property_ids_chunks)

print(f"Fetching {num_chunks} chunks...")

all_data = []
with ThreadPoolExecutor(max_workers=3) as executor:
# Create a copy of data for each chunk to avoid race conditions
future_to_chunk = {}
for idx, chunk in enumerate(parcl_property_ids_chunks):
# Create a copy of data with the specific chunk
chunk_data = data.copy()
chunk_data[PARCL_PROPERTY_IDS] = chunk

# Submit the task
future = executor.submit(
self._post,
url=self.full_post_url,
data=chunk_data,
params=params,
)
future_to_chunk[future] = idx + 1

# Small delay between submissions to avoid rate limiting
if idx < len(parcl_property_ids_chunks) - 1: # Don't delay after the last one
time.sleep(0.1)

# Helper functions to abstract raise statements

# Collect results as they complete
for future in as_completed(future_to_chunk):
chunk_num = future_to_chunk[future]
try:
result = future.result()

# Check HTTP status code
if result.status_code != 200:
response_preview = (
result.text[:200] if result.text else "No response content"
)
self._raise_http_error(chunk_num, result.status_code, response_preview)

# Check if response has content
if not result.text.strip():
self._raise_empty_response_error(chunk_num)

# Try to parse JSON
try:
response = result.json()
all_data.append(response)
print(f"Completed chunk {chunk_num} of {num_chunks}")
except ValueError as json_exc:
response_preview = (
result.text[:200] if result.text else "No response content"
)
raise RuntimeError(
f"Chunk {chunk_num} failed: Invalid JSON - {json_exc}\n"
f"Response content: {response_preview}..."
) from json_exc

except Exception as exc:
# If it's already a RuntimeError from above, re-raise it
if isinstance(exc, RuntimeError):
raise

# For any other unexpected errors, wrap and raise
raise RuntimeError(
f"Chunk {chunk_num} failed with unexpected error: {exc} "
f"(Exception type: {type(exc).__name__})"
) from exc

print(f"All {num_chunks} chunks completed successfully.")
return all_data

def _as_pd_dataframe(self, data: list[Mapping[str, Any]]) -> pd.DataFrame:
"""
Convert API response data to a pandas DataFrame with events as rows
@@ -163,7 +270,7 @@ def _build_search_criteria(
data["parcl_ids"] = parcl_ids

if parcl_property_ids:
data["parcl_property_ids"] = parcl_property_ids
data[PARCL_PROPERTY_IDS] = parcl_property_ids

if geo_coordinates:
data["geo_coordinates"] = geo_coordinates
@@ -208,6 +315,10 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
"""Build boolean property filters."""
filters = {}

if params.include_property_details is not None:
filters["include_property_details"] = self.simple_bool_validator(
params.include_property_details
)
if params.current_on_market_flag is not None:
filters["current_on_market_flag"] = self.simple_bool_validator(
params.current_on_market_flag
Expand All @@ -216,9 +327,17 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
filters["current_on_market_rental_flag"] = self.simple_bool_validator(
params.current_on_market_rental_flag
)
if params.include_property_details is not None:
filters["include_property_details"] = self.simple_bool_validator(
params.include_property_details
if params.current_new_construction_flag is not None:
filters["current_new_construction_flag"] = self.simple_bool_validator(
params.current_new_construction_flag
)
if params.current_owner_occupied_flag is not None:
filters["current_owner_occupied_flag"] = self.simple_bool_validator(
params.current_owner_occupied_flag
)
if params.current_investor_owned_flag is not None:
filters["current_investor_owned_flag"] = self.simple_bool_validator(
params.current_investor_owned_flag
)

return filters
@@ -242,9 +361,13 @@ def _build_property_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
property_type.upper() for property_type in params.property_types
]

# Handle current entity owner name
if params.current_entity_owner_name is not None:
property_filters["current_entity_owner_name"] = params.current_entity_owner_name

return property_filters

def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, Any]:
def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, Any]: # noqa: C901
"""Build event filters from validated Pydantic schema."""
event_filters = {}

@@ -271,6 +394,12 @@ def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, An
event_filters["is_new_construction"] = self.simple_bool_validator(
params.is_new_construction
)
if params.include_events is not None:
event_filters["include_events"] = self.simple_bool_validator(params.include_events)
if params.include_full_event_history is not None:
event_filters["include_full_event_history"] = self.simple_bool_validator(
params.include_full_event_history
)

return event_filters

@@ -356,6 +485,12 @@ def retrieve(
is_owner_occupied: bool | None = None,
current_on_market_flag: bool | None = None,
current_on_market_rental_flag: bool | None = None,
current_new_construction_flag: bool | None = None,
current_owner_occupied_flag: bool | None = None,
current_investor_owned_flag: bool | None = None,
current_entity_owner_name: str | None = None,
include_events: bool | None = None,
include_full_event_history: bool | None = None,
limit: int | None = None,
params: Mapping[str, Any] | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
@@ -393,6 +528,12 @@ def retrieve(
is_owner_occupied: Whether to filter by owner occupied.
current_on_market_flag: Whether to filter by current_on_market flag.
current_on_market_rental_flag: Whether to filter by current_on_market_rental flag.
current_new_construction_flag: Whether to filter by current_new_construction flag.
current_owner_occupied_flag: Whether to filter by current_owner_occupied flag.
current_investor_owned_flag: Whether to filter by current_investor_owned flag.
current_entity_owner_name: Current entity owner name to filter by.
include_events: Whether to include events in the response.
include_full_event_history: Whether to include full event history in the response.
limit: Number of results to return.
params: Additional parameters to pass to the request.
Returns:
@@ -431,6 +572,12 @@ def retrieve(
is_owner_occupied=is_owner_occupied,
current_on_market_flag=current_on_market_flag,
current_on_market_rental_flag=current_on_market_rental_flag,
current_new_construction_flag=current_new_construction_flag,
current_owner_occupied_flag=current_owner_occupied_flag,
current_investor_owned_flag=current_investor_owned_flag,
current_entity_owner_name=current_entity_owner_name,
include_events=include_events,
include_full_event_history=include_full_event_history,
limit=limit,
params=params or {},
)
Expand All @@ -452,10 +599,14 @@ def retrieve(

# Set limit
request_params = input_params.params.copy()
request_params["limit"] = self._validate_limit(input_params.limit)

# Make request with params
results = self._fetch_post(params=request_params, data=data)
if data.get(PARCL_PROPERTY_IDS):
request_params["limit"] = PARCL_PROPERTY_IDS_LIMIT
results = self._fetch_post_parcl_property_ids(params=request_params, data=data)
else:
request_params["limit"] = self._validate_limit(input_params.limit)
results = self._fetch_post(params=request_params, data=data)

# Get metadata from results
metadata = self._get_metadata(results)
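Two bits of arithmetic in the service above are worth seeing in isolation: the ceiling-division page count used in `_fetch_post` and the fixed-size ID chunking used in `_fetch_post_parcl_property_ids`. A self-contained sketch with hypothetical numbers:

```python
# Illustrative math only; mirrors the logic in the diff with toy inputs.
PARCL_PROPERTY_IDS_LIMIT = 10_000  # same value as the new constant in parcllabs.common

# Pagination: with 2,500 total results and a page limit of 1,000, two more
# pages are needed after the first response (ceil((2500 - 1000) / 1000) == 2).
total_available, limit = 2_500, 1_000
remaining_pages = (total_available - limit) // limit
if (total_available - limit) % limit > 0:
    remaining_pages += 1
assert remaining_pages == 2

# Chunking: 25,000 parcl_property_ids split into fixed-size chunks of at most
# 10,000 IDs each, giving three requests (10,000 + 10,000 + 5,000).
parcl_property_ids = list(range(25_000))
chunks = [
    parcl_property_ids[i : i + PARCL_PROPERTY_IDS_LIMIT]
    for i in range(0, len(parcl_property_ids), PARCL_PROPERTY_IDS_LIMIT)
]
assert [len(chunk) for chunk in chunks] == [10_000, 10_000, 5_000]
```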
5 changes: 3 additions & 2 deletions tests/test_property_v2.py
@@ -2,6 +2,7 @@

import pytest

from parcllabs.common import PARCL_PROPERTY_IDS
from parcllabs.enums import RequestLimits
from parcllabs.schemas.schemas import GeoCoordinates, PropertyV2RetrieveParams
from parcllabs.services.properties.property_v2 import PropertyV2Service
@@ -51,7 +52,7 @@ def test_build_search_criteria(property_v2_service: PropertyV2Service) -> None:

# Test with parcl_property_ids
criteria = property_v2_service._build_search_criteria(parcl_property_ids=[789, 101])
assert criteria == {"parcl_property_ids": [789, 101]}
assert criteria == {PARCL_PROPERTY_IDS: [789, 101]}

# Test with location
geo_coordinates = {"latitude": 37.7749, "longitude": -122.4194, "radius": 5.0}
Expand All @@ -64,7 +65,7 @@ def test_build_search_criteria(property_v2_service: PropertyV2Service) -> None:
)
assert criteria == {
"parcl_ids": [123],
"parcl_property_ids": [456],
PARCL_PROPERTY_IDS: [456],
"geo_coordinates": geo_coordinates,
}
