diff --git a/.gitignore b/.gitignore index a45ffc1..30e54df 100644 --- a/.gitignore +++ b/.gitignore @@ -176,3 +176,5 @@ cython_debug/ .python-version .DS_Store + +testing/ diff --git a/CHANGELOG.md b/CHANGELOG.md index c27daa5..92dda4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### v1.15.0 +- Update to reflect enhanced `property_v2.search` endpoint. +- Chunking logic for `property_v2.search` endpoint for parcl_property_id requests. + ### v1.14.5 - Update `property_v2.search` to fix pagination logic bug. diff --git a/parcllabs/__version__.py b/parcllabs/__version__.py index 0a028af..57d0a1b 100644 --- a/parcllabs/__version__.py +++ b/parcllabs/__version__.py @@ -1 +1 @@ -VERSION = "1.14.5" +VERSION = "1.15.0" diff --git a/parcllabs/common.py b/parcllabs/common.py index 3753913..29a580d 100644 --- a/parcllabs/common.py +++ b/parcllabs/common.py @@ -28,3 +28,5 @@ ZIP_CODE_LENGTH = 5 +PARCL_PROPERTY_IDS_LIMIT = 10000 +PARCL_PROPERTY_IDS = "parcl_property_ids" diff --git a/parcllabs/schemas/schemas.py b/parcllabs/schemas/schemas.py index cf62f8a..f727850 100644 --- a/parcllabs/schemas/schemas.py +++ b/parcllabs/schemas/schemas.py @@ -35,7 +35,6 @@ class PropertyV2RetrieveParams(BaseModel): default=None, description="Geographic coordinates with radius to filter by" ) - # Property filters property_types: list[str] | None = Field( default=None, description="List of property types to filter by" ) @@ -62,6 +61,22 @@ class PropertyV2RetrieveParams(BaseModel): ) # Event filters + include_events: bool | None = Field( + default=None, + description="""Boolean flag indicating if event data is included in the response. + 0 signifies that no event data is returned. + 1 signifies that event data is included, scoped + to events that match the event filters supplied in the request + """, + ) + include_full_event_history: bool | None = Field( + default=None, + description="""Boolean flag indicating if the full event history is returned for + each property (effective only when include_events is 1). + 0 signifies that only events matching the event filters are returned. + 1 signifies that the property's entire event history is returned + """, + ) event_names: list[str] | None = Field( default=None, description="List of event names to filter by" ) @@ -100,6 +115,18 @@ class PropertyV2RetrieveParams(BaseModel): current_on_market_rental_flag: bool | None = Field( default=None, description="Whether to filter by current on market rental flag" ) + current_new_construction_flag: bool | None = Field( + default=None, description="Whether to filter by current new construction flag" + ) + current_owner_occupied_flag: bool | None = Field( + default=None, description="Whether to filter by current owner occupied flag" + ) + current_investor_owned_flag: bool | None = Field( + default=None, description="Whether to filter by current investor owned flag" + ) + current_entity_owner_name: str | None = Field( + default=None, description="Current entity owner name to filter by" + ) # Pagination limit: int | None = Field( diff --git a/parcllabs/services/properties/property_v2.py b/parcllabs/services/properties/property_v2.py index 336fcab..47a4367 100644 --- a/parcllabs/services/properties/property_v2.py +++ b/parcllabs/services/properties/property_v2.py @@ -1,9 +1,11 @@ +import time from collections.abc import Mapping from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any import pandas as pd +from parcllabs.common import PARCL_PROPERTY_IDS, PARCL_PROPERTY_IDS_LIMIT from parcllabs.enums import RequestLimits from parcllabs.schemas.schemas import PropertyV2RetrieveParamCategories, PropertyV2RetrieveParams from parcllabs.services.parcllabs_service import ParclLabsService @@ -15,19 +17,27 @@ def __init__(self, *args: object, **kwargs: object) -> None: super().__init__(*args, **kwargs) self.simple_bool_validator = Validators.validate_input_bool_param_simple + @staticmethod + def _raise_http_error(chunk_num: int, status_code: int, response_preview: str) -> None: + error_msg = f"Chunk {chunk_num} failed: HTTP {status_code}" + raise RuntimeError(f"{error_msg}\nResponse content: {response_preview}...") + + @staticmethod + def _raise_empty_response_error(chunk_num: int) -> None: + raise RuntimeError(f"Chunk {chunk_num} failed: Empty response from API") + def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict]: """Fetch data using POST request with pagination support.""" response = self._post(url=self.full_post_url, data=data, params=params) result = response.json() + all_data = [result] pagination = result.get("pagination") metadata = result.get("metadata") - all_data = [result] - - returned_count = metadata.get("results", {}).get("returned_count", 0) if pagination: limit = pagination.get("limit") + returned_count = metadata.get("results", {}).get("returned_count", 0) if returned_count < limit: # if we got fewer results than requested, don't paginate return all_data @@ -35,11 +45,11 @@ def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict if pagination.get("has_more"): print("More pages to fetch, paginating additional pages...") offset = pagination.get("offset") - total_count = metadata.get("results", {}).get("total_available", 0) + total_available = metadata.get("results", {}).get("total_available", 0) # Calculate how many more pages we need - remaining_pages = (total_count - limit) // limit - if (total_count - limit) % limit > 0: + remaining_pages = (total_available - limit) // limit + if (total_available - limit) % limit > 0: remaining_pages += 1 # Generate all the URLs we need to fetch @@ -66,6 +76,103 @@ def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict return all_data + def _fetch_post_parcl_property_ids( + self, + params: dict[str, Any], + data: dict[str, Any], + ) -> list[dict]: + """Fetch data using POST request with parcl_property_ids, chunking the request + + Args: + params: Dictionary of parameters to pass to the request. + data: Dictionary of data to pass to the request. + + Returns: + List of dictionaries containing the data from the request. + """ + parcl_property_ids = data.get(PARCL_PROPERTY_IDS) + num_ids = len(parcl_property_ids) + if num_ids <= PARCL_PROPERTY_IDS_LIMIT: + return self._fetch_post(params=params, data=data) + + # If we exceed PARCL_PROPERTY_IDS_LIMIT, chunk the request + parcl_property_ids_chunks = [ + parcl_property_ids[i : i + PARCL_PROPERTY_IDS_LIMIT] + for i in range(0, num_ids, PARCL_PROPERTY_IDS_LIMIT) + ] + num_chunks = len(parcl_property_ids_chunks) + + print(f"Fetching {num_chunks} chunks...") + + all_data = [] + with ThreadPoolExecutor(max_workers=3) as executor: + # Create a copy of data for each chunk to avoid race conditions + future_to_chunk = {} + for idx, chunk in enumerate(parcl_property_ids_chunks): + # Create a copy of data with the specific chunk + chunk_data = data.copy() + chunk_data[PARCL_PROPERTY_IDS] = chunk + + # Submit the task + future = executor.submit( + self._post, + url=self.full_post_url, + data=chunk_data, + params=params, + ) + future_to_chunk[future] = idx + 1 + + # Small delay between submissions to avoid rate limiting + if idx < len(parcl_property_ids_chunks) - 1: # Don't delay after the last one + time.sleep(0.1) + + # Helper functions to abstract raise statements + + # Collect results as they complete + for future in as_completed(future_to_chunk): + chunk_num = future_to_chunk[future] + try: + result = future.result() + + # Check HTTP status code + if result.status_code != 200: + response_preview = ( + result.text[:200] if result.text else "No response content" + ) + self._raise_http_error(chunk_num, result.status_code, response_preview) + + # Check if response has content + if not result.text.strip(): + self._raise_empty_response_error(chunk_num) + + # Try to parse JSON + try: + response = result.json() + all_data.append(response) + print(f"Completed chunk {chunk_num} of {num_chunks}") + except ValueError as json_exc: + response_preview = ( + result.text[:200] if result.text else "No response content" + ) + raise RuntimeError( + f"Chunk {chunk_num} failed: Invalid JSON - {json_exc}\n" + f"Response content: {response_preview}..." + ) from json_exc + + except Exception as exc: + # If it's already a RuntimeError from above, re-raise it + if isinstance(exc, RuntimeError): + raise + + # For any other unexpected errors, wrap and raise + raise RuntimeError( + f"Chunk {chunk_num} failed with unexpected error: {exc} " + f"(Exception type: {type(exc).__name__})" + ) from exc + + print(f"All {num_chunks} chunks completed successfully.") + return all_data + def _as_pd_dataframe(self, data: list[Mapping[str, Any]]) -> pd.DataFrame: """ Convert API response data to a pandas DataFrame with events as rows @@ -163,7 +270,7 @@ def _build_search_criteria( data["parcl_ids"] = parcl_ids if parcl_property_ids: - data["parcl_property_ids"] = parcl_property_ids + data[PARCL_PROPERTY_IDS] = parcl_property_ids if geo_coordinates: data["geo_coordinates"] = geo_coordinates @@ -208,6 +315,10 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str, """Build boolean property filters.""" filters = {} + if params.include_property_details is not None: + filters["include_property_details"] = self.simple_bool_validator( + params.include_property_details + ) if params.current_on_market_flag is not None: filters["current_on_market_flag"] = self.simple_bool_validator( params.current_on_market_flag @@ -216,9 +327,17 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str, filters["current_on_market_rental_flag"] = self.simple_bool_validator( params.current_on_market_rental_flag ) - if params.include_property_details is not None: - filters["include_property_details"] = self.simple_bool_validator( - params.include_property_details + if params.current_new_construction_flag is not None: + filters["current_new_construction_flag"] = self.simple_bool_validator( + params.current_new_construction_flag + ) + if params.current_owner_occupied_flag is not None: + filters["current_owner_occupied_flag"] = self.simple_bool_validator( + params.current_owner_occupied_flag + ) + if params.current_investor_owned_flag is not None: + filters["current_investor_owned_flag"] = self.simple_bool_validator( + params.current_investor_owned_flag ) return filters @@ -242,9 +361,13 @@ def _build_property_filters(self, params: PropertyV2RetrieveParams) -> dict[str, property_type.upper() for property_type in params.property_types ] + # Handle current entity owner name + if params.current_entity_owner_name is not None: + property_filters["current_entity_owner_name"] = params.current_entity_owner_name + return property_filters - def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, Any]: + def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, Any]: # noqa: C901 """Build event filters from validated Pydantic schema.""" event_filters = {} @@ -271,6 +394,12 @@ def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, An event_filters["is_new_construction"] = self.simple_bool_validator( params.is_new_construction ) + if params.include_events is not None: + event_filters["include_events"] = self.simple_bool_validator(params.include_events) + if params.include_full_event_history is not None: + event_filters["include_full_event_history"] = self.simple_bool_validator( + params.include_full_event_history + ) return event_filters @@ -356,6 +485,12 @@ def retrieve( is_owner_occupied: bool | None = None, current_on_market_flag: bool | None = None, current_on_market_rental_flag: bool | None = None, + current_new_construction_flag: bool | None = None, + current_owner_occupied_flag: bool | None = None, + current_investor_owned_flag: bool | None = None, + current_entity_owner_name: str | None = None, + include_events: bool | None = None, + include_full_event_history: bool | None = None, limit: int | None = None, params: Mapping[str, Any] | None = None, ) -> tuple[pd.DataFrame, dict[str, Any]]: @@ -393,6 +528,12 @@ def retrieve( is_owner_occupied: Whether to filter by owner occupied. current_on_market_flag: Whether to filter by current_on_market flag. current_on_market_rental_flag: Whether to filter by current_on_market_rental flag. + current_new_construction_flag: Whether to filter by current_new_construction flag. + current_owner_occupied_flag: Whether to filter by current_owner_occupied flag. + current_investor_owned_flag: Whether to filter by current_investor_owned flag. + current_entity_owner_name: Current entity owner name to filter by. + include_events: Whether to include events in the response. + include_full_event_history: Whether to include full event history in the response. limit: Number of results to return. params: Additional parameters to pass to the request. Returns: @@ -431,6 +572,12 @@ def retrieve( is_owner_occupied=is_owner_occupied, current_on_market_flag=current_on_market_flag, current_on_market_rental_flag=current_on_market_rental_flag, + current_new_construction_flag=current_new_construction_flag, + current_owner_occupied_flag=current_owner_occupied_flag, + current_investor_owned_flag=current_investor_owned_flag, + current_entity_owner_name=current_entity_owner_name, + include_events=include_events, + include_full_event_history=include_full_event_history, limit=limit, params=params or {}, ) @@ -452,10 +599,14 @@ def retrieve( # Set limit request_params = input_params.params.copy() - request_params["limit"] = self._validate_limit(input_params.limit) # Make request with params - results = self._fetch_post(params=request_params, data=data) + if data.get(PARCL_PROPERTY_IDS): + request_params["limit"] = PARCL_PROPERTY_IDS_LIMIT + results = self._fetch_post_parcl_property_ids(params=request_params, data=data) + else: + request_params["limit"] = self._validate_limit(input_params.limit) + results = self._fetch_post(params=request_params, data=data) # Get metadata from results metadata = self._get_metadata(results) diff --git a/tests/test_property_v2.py b/tests/test_property_v2.py index e51a4a6..81e3d27 100644 --- a/tests/test_property_v2.py +++ b/tests/test_property_v2.py @@ -2,6 +2,7 @@ import pytest +from parcllabs.common import PARCL_PROPERTY_IDS from parcllabs.enums import RequestLimits from parcllabs.schemas.schemas import GeoCoordinates, PropertyV2RetrieveParams from parcllabs.services.properties.property_v2 import PropertyV2Service @@ -51,7 +52,7 @@ def test_build_search_criteria(property_v2_service: PropertyV2Service) -> None: # Test with parcl_property_ids criteria = property_v2_service._build_search_criteria(parcl_property_ids=[789, 101]) - assert criteria == {"parcl_property_ids": [789, 101]} + assert criteria == {PARCL_PROPERTY_IDS: [789, 101]} # Test with location geo_coordinates = {"latitude": 37.7749, "longitude": -122.4194, "radius": 5.0} @@ -64,7 +65,7 @@ def test_build_search_criteria(property_v2_service: PropertyV2Service) -> None: ) assert criteria == { "parcl_ids": [123], - "parcl_property_ids": [456], + PARCL_PROPERTY_IDS: [456], "geo_coordinates": geo_coordinates, }