Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,5 @@ cython_debug/
.python-version

.DS_Store

testing/
2 changes: 1 addition & 1 deletion parcllabs/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = "1.14.5"
VERSION = "1.15.0"
2 changes: 2 additions & 0 deletions parcllabs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@


# Expected number of characters in a US ZIP code.
ZIP_CODE_LENGTH = 5
# Maximum number of parcl_property_ids accepted in a single request;
# larger id lists are split into chunks of this size by the client.
PARCL_PROPERTY_IDS_LIMIT = 10000
# Canonical payload key used for lists of property identifiers.
PARCL_PROPERTY_IDS = "parcl_property_ids"
25 changes: 24 additions & 1 deletion parcllabs/schemas/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class PropertyV2RetrieveParams(BaseModel):
default=None, description="Geographic coordinates with radius to filter by"
)

# Property filters
property_types: list[str] | None = Field(
default=None, description="List of property types to filter by"
)
Expand All @@ -62,6 +61,18 @@ class PropertyV2RetrieveParams(BaseModel):
)

# Event filters
include_events: bool | None = Field(
default=None,
description="""Boolean flag indicating if event data is included in the response.
0 signifies that no event data is returned.
1 signifies that event data is included, scoped to events that match the event filters supplied in the request""",
)
include_full_event_history: bool | None = Field(
default=None,
description="""Boolean flag indicating if the full event history is returned for each property (effective only when include_events is 1).
0 signifies that only events matching the event filters are returned.
1 signifies that the property's entire event history is returned""",
)
event_names: list[str] | None = Field(
default=None, description="List of event names to filter by"
)
Expand Down Expand Up @@ -100,6 +111,18 @@ class PropertyV2RetrieveParams(BaseModel):
current_on_market_rental_flag: bool | None = Field(
default=None, description="Whether to filter by current on market rental flag"
)
current_new_construction_flag: bool | None = Field(
default=None, description="Whether to filter by current new construction flag"
)
current_owner_occupied_flag: bool | None = Field(
default=None, description="Whether to filter by current owner occupied flag"
)
current_investor_owned_flag: bool | None = Field(
default=None, description="Whether to filter by current investor owned flag"
)
current_entity_owner_name: str | None = Field(
default=None, description="Current entity owner name to filter by"
)

# Pagination
limit: int | None = Field(
Expand Down
147 changes: 135 additions & 12 deletions parcllabs/services/properties/property_v2.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import time
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any

import pandas as pd

from parcllabs.common import PARCL_PROPERTY_IDS_LIMIT, PARCL_PROPERTY_IDS
from parcllabs.enums import RequestLimits
from parcllabs.schemas.schemas import PropertyV2RetrieveParamCategories, PropertyV2RetrieveParams
from parcllabs.services.parcllabs_service import ParclLabsService
Expand All @@ -20,26 +22,25 @@ def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict
response = self._post(url=self.full_post_url, data=data, params=params)
result = response.json()

all_data = [result]
pagination = result.get("pagination")
metadata = result.get("metadata")
all_data = [result]

returned_count = metadata.get("results", {}).get("returned_count", 0)

if pagination:
limit = pagination.get("limit")
returned_count = metadata.get("results", {}).get("returned_count", 0)
if returned_count < limit: # if we got fewer results than requested, don't paginate
return all_data

# If we need to paginate, use concurrent requests
if pagination.get("has_more"):
print("More pages to fetch, paginating additional pages...")
offset = pagination.get("offset")
total_count = metadata.get("results", {}).get("total_available", 0)
total_available = metadata.get("results", {}).get("total_available", 0)

# Calculate how many more pages we need
remaining_pages = (total_count - limit) // limit
if (total_count - limit) % limit > 0:
remaining_pages = (total_available - limit) // limit
if (total_available - limit) % limit > 0:
remaining_pages += 1

# Generate all the URLs we need to fetch
Expand All @@ -66,6 +67,84 @@ def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict

return all_data

def _fetch_post_parcl_property_ids(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict]:
    """Fetch data via POST, chunking parcl_property_ids to respect the API limit.

    When ``data`` carries at most ``PARCL_PROPERTY_IDS_LIMIT`` ids the request
    is delegated unchanged to ``_fetch_post``. Otherwise the id list is split
    into chunks of at most ``PARCL_PROPERTY_IDS_LIMIT`` and each chunk is
    POSTed concurrently (3 workers). Results are returned in chunk order,
    regardless of which request completes first.

    Args:
        params: Dictionary of parameters to pass to the request.
        data: Dictionary of data to pass to the request; must contain the
            ``PARCL_PROPERTY_IDS`` key.

    Returns:
        List of dictionaries containing the parsed JSON response for each
        chunk, ordered by chunk index.

    Raises:
        RuntimeError: If any chunk returns a non-200 status, an empty body,
            invalid JSON, or fails with an unexpected error.
    """
    parcl_property_ids = data.get(PARCL_PROPERTY_IDS)
    num_ids = len(parcl_property_ids)
    if num_ids <= PARCL_PROPERTY_IDS_LIMIT:
        return self._fetch_post(params=params, data=data)

    # If we exceed PARCL_PROPERTY_IDS_LIMIT, chunk the request
    parcl_property_ids_chunks = [
        parcl_property_ids[i:i + PARCL_PROPERTY_IDS_LIMIT]
        for i in range(0, num_ids, PARCL_PROPERTY_IDS_LIMIT)
    ]
    num_chunks = len(parcl_property_ids_chunks)

    print(f"Fetching {num_chunks} chunks...")

    # Collect responses keyed by chunk number so the returned list has a
    # deterministic order even though requests complete out of order.
    results_by_chunk: dict[int, dict] = {}
    with ThreadPoolExecutor(max_workers=3) as executor:
        future_to_chunk = {}
        for idx, chunk in enumerate(parcl_property_ids_chunks):
            # Copy the payload per chunk so worker threads never share a
            # mutable dict (avoids race conditions on PARCL_PROPERTY_IDS).
            chunk_data = data.copy()
            chunk_data[PARCL_PROPERTY_IDS] = chunk

            future = executor.submit(
                self._post, url=self.full_post_url, data=chunk_data, params=params
            )
            future_to_chunk[future] = idx + 1

            # Small delay between submissions to avoid rate limiting
            # (no delay needed after the last submission).
            if idx < num_chunks - 1:
                time.sleep(0.1)

        for future in as_completed(future_to_chunk):
            chunk_num = future_to_chunk[future]

            # Narrow try: only the future itself can fail unexpectedly here;
            # chain the cause so the original traceback is preserved.
            try:
                result = future.result()
            except Exception as exc:
                raise RuntimeError(
                    f"Chunk {chunk_num} failed with unexpected error: {exc} "
                    f"(Exception type: {type(exc).__name__})"
                ) from exc

            # Check HTTP status code
            if result.status_code != 200:
                error_msg = f"Chunk {chunk_num} failed: HTTP {result.status_code}"
                response_preview = result.text[:200] if result.text else "No response content"
                raise RuntimeError(f"{error_msg}\nResponse content: {response_preview}...")

            # Check if response has content
            if not result.text.strip():
                raise RuntimeError(f"Chunk {chunk_num} failed: Empty response from API")

            # Try to parse JSON; chain the decode error as the cause.
            try:
                results_by_chunk[chunk_num] = result.json()
            except ValueError as json_exc:
                response_preview = result.text[:200] if result.text else "No response content"
                raise RuntimeError(
                    f"Chunk {chunk_num} failed: Invalid JSON - {json_exc}\n"
                    f"Response content: {response_preview}..."
                ) from json_exc

            print(f"Completed chunk {chunk_num} of {num_chunks}")

    print(f"All {num_chunks} chunks completed successfully.")
    # Return chunk results in submission (chunk-index) order.
    return [results_by_chunk[i] for i in sorted(results_by_chunk)]

def _as_pd_dataframe(self, data: list[Mapping[str, Any]]) -> pd.DataFrame:
"""
Convert API response data to a pandas DataFrame with events as rows
Expand Down Expand Up @@ -163,7 +242,7 @@ def _build_search_criteria(
data["parcl_ids"] = parcl_ids

if parcl_property_ids:
data["parcl_property_ids"] = parcl_property_ids
data[PARCL_PROPERTY_IDS] = parcl_property_ids

if geo_coordinates:
data["geo_coordinates"] = geo_coordinates
Expand Down Expand Up @@ -208,6 +287,10 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
"""Build boolean property filters."""
filters = {}

if params.include_property_details is not None:
filters["include_property_details"] = self.simple_bool_validator(
params.include_property_details
)
if params.current_on_market_flag is not None:
filters["current_on_market_flag"] = self.simple_bool_validator(
params.current_on_market_flag
Expand All @@ -216,9 +299,17 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
filters["current_on_market_rental_flag"] = self.simple_bool_validator(
params.current_on_market_rental_flag
)
if params.include_property_details is not None:
filters["include_property_details"] = self.simple_bool_validator(
params.include_property_details
if params.current_new_construction_flag is not None:
filters["current_new_construction_flag"] = self.simple_bool_validator(
params.current_new_construction_flag
)
if params.current_owner_occupied_flag is not None:
filters["current_owner_occupied_flag"] = self.simple_bool_validator(
params.current_owner_occupied_flag
)
if params.current_investor_owned_flag is not None:
filters["current_investor_owned_flag"] = self.simple_bool_validator(
params.current_investor_owned_flag
)

return filters
Expand All @@ -242,6 +333,10 @@ def _build_property_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
property_type.upper() for property_type in params.property_types
]

# Handle current entity owner name
if params.current_entity_owner_name is not None:
property_filters["current_entity_owner_name"] = params.current_entity_owner_name

return property_filters

def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, Any]:
Expand Down Expand Up @@ -271,6 +366,12 @@ def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, An
event_filters["is_new_construction"] = self.simple_bool_validator(
params.is_new_construction
)
if params.include_events is not None:
event_filters["include_events"] = self.simple_bool_validator(params.include_events)
if params.include_full_event_history is not None:
event_filters["include_full_event_history"] = self.simple_bool_validator(
params.include_full_event_history
)

return event_filters

Expand Down Expand Up @@ -356,6 +457,12 @@ def retrieve(
is_owner_occupied: bool | None = None,
current_on_market_flag: bool | None = None,
current_on_market_rental_flag: bool | None = None,
current_new_construction_flag: bool | None = None,
current_owner_occupied_flag: bool | None = None,
current_investor_owned_flag: bool | None = None,
current_entity_owner_name: str | None = None,
include_events: bool | None = None,
include_full_event_history: bool | None = None,
limit: int | None = None,
params: Mapping[str, Any] | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
Expand Down Expand Up @@ -393,6 +500,12 @@ def retrieve(
is_owner_occupied: Whether to filter by owner occupied.
current_on_market_flag: Whether to filter by current_on_market flag.
current_on_market_rental_flag: Whether to filter by current_on_market_rental flag.
current_new_construction_flag: Whether to filter by current_new_construction flag.
current_owner_occupied_flag: Whether to filter by current_owner_occupied flag.
current_investor_owned_flag: Whether to filter by current_investor_owned flag.
current_entity_owner_name: Current entity owner name to filter by.
include_events: Whether to include events in the response.
include_full_event_history: Whether to include full event history in the response.
limit: Number of results to return.
params: Additional parameters to pass to the request.
Returns:
Expand Down Expand Up @@ -431,6 +544,12 @@ def retrieve(
is_owner_occupied=is_owner_occupied,
current_on_market_flag=current_on_market_flag,
current_on_market_rental_flag=current_on_market_rental_flag,
current_new_construction_flag=current_new_construction_flag,
current_owner_occupied_flag=current_owner_occupied_flag,
current_investor_owned_flag=current_investor_owned_flag,
current_entity_owner_name=current_entity_owner_name,
include_events=include_events,
include_full_event_history=include_full_event_history,
limit=limit,
params=params or {},
)
Expand All @@ -452,10 +571,14 @@ def retrieve(

# Set limit
request_params = input_params.params.copy()
request_params["limit"] = self._validate_limit(input_params.limit)

# Make request with params
results = self._fetch_post(params=request_params, data=data)
if data.get(PARCL_PROPERTY_IDS):
request_params["limit"] = PARCL_PROPERTY_IDS_LIMIT
results = self._fetch_post_parcl_property_ids(params=request_params, data=data)
else:
request_params["limit"] = self._validate_limit(input_params.limit)
results = self._fetch_post(params=request_params, data=data)

# Get metadata from results
metadata = self._get_metadata(results)
Expand Down
5 changes: 3 additions & 2 deletions tests/test_property_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from parcllabs.common import PARCL_PROPERTY_IDS
from parcllabs.enums import RequestLimits
from parcllabs.schemas.schemas import GeoCoordinates, PropertyV2RetrieveParams
from parcllabs.services.properties.property_v2 import PropertyV2Service
Expand Down Expand Up @@ -51,7 +52,7 @@ def test_build_search_criteria(property_v2_service: PropertyV2Service) -> None:

# Test with parcl_property_ids
criteria = property_v2_service._build_search_criteria(parcl_property_ids=[789, 101])
assert criteria == {"parcl_property_ids": [789, 101]}
assert criteria == {PARCL_PROPERTY_IDS: [789, 101]}

# Test with location
geo_coordinates = {"latitude": 37.7749, "longitude": -122.4194, "radius": 5.0}
Expand All @@ -64,7 +65,7 @@ def test_build_search_criteria(property_v2_service: PropertyV2Service) -> None:
)
assert criteria == {
"parcl_ids": [123],
"parcl_property_ids": [456],
PARCL_PROPERTY_IDS: [456],
"geo_coordinates": geo_coordinates,
}

Expand Down
Loading