1+ import time
12from collections .abc import Mapping
23from concurrent .futures import ThreadPoolExecutor , as_completed
34from typing import Any
45
56import pandas as pd
67
8+ from parcllabs .common import PARCL_PROPERTY_IDS , PARCL_PROPERTY_IDS_LIMIT
79from parcllabs .enums import RequestLimits
810from parcllabs .schemas .schemas import PropertyV2RetrieveParamCategories , PropertyV2RetrieveParams
911from parcllabs .services .parcllabs_service import ParclLabsService
def __init__(self, *args: object, **kwargs: object) -> None:
    """Initialize the service and bind the shared boolean validator.

    All positional/keyword arguments are forwarded unchanged to the
    ``ParclLabsService`` base class.
    """
    super().__init__(*args, **kwargs)
    # Bound once here so filter-builders can call it without repeating
    # the long Validators attribute path.
    self.simple_bool_validator = Validators.validate_input_bool_param_simple
20+ @staticmethod
21+ def _raise_http_error (chunk_num : int , status_code : int , response_preview : str ) -> None :
22+ error_msg = f"Chunk { chunk_num } failed: HTTP { status_code } "
23+ raise RuntimeError (f"{ error_msg } \n Response content: { response_preview } ..." )
24+
25+ @staticmethod
26+ def _raise_empty_response_error (chunk_num : int ) -> None :
27+ raise RuntimeError (f"Chunk { chunk_num } failed: Empty response from API" )
28+
1829 def _fetch_post (self , params : dict [str , Any ], data : dict [str , Any ]) -> list [dict ]:
1930 """Fetch data using POST request with pagination support."""
2031 response = self ._post (url = self .full_post_url , data = data , params = params )
2132 result = response .json ()
2233
34+ all_data = [result ]
2335 pagination = result .get ("pagination" )
2436 metadata = result .get ("metadata" )
25- all_data = [result ]
26-
27- returned_count = metadata .get ("results" , {}).get ("returned_count" , 0 )
2837
2938 if pagination :
3039 limit = pagination .get ("limit" )
40+ returned_count = metadata .get ("results" , {}).get ("returned_count" , 0 )
3141 if returned_count < limit : # if we got fewer results than requested, don't paginate
3242 return all_data
3343
3444 # If we need to paginate, use concurrent requests
3545 if pagination .get ("has_more" ):
3646 print ("More pages to fetch, paginating additional pages..." )
3747 offset = pagination .get ("offset" )
38- total_count = metadata .get ("results" , {}).get ("total_available" , 0 )
48+ total_available = metadata .get ("results" , {}).get ("total_available" , 0 )
3949
4050 # Calculate how many more pages we need
41- remaining_pages = (total_count - limit ) // limit
42- if (total_count - limit ) % limit > 0 :
51+ remaining_pages = (total_available - limit ) // limit
52+ if (total_available - limit ) % limit > 0 :
4353 remaining_pages += 1
4454
4555 # Generate all the URLs we need to fetch
@@ -66,6 +76,103 @@ def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict
6676
6777 return all_data
6878
def _fetch_post_parcl_property_ids(
    self,
    params: dict[str, Any],
    data: dict[str, Any],
) -> list[dict]:
    """Fetch data using POST, chunking oversized parcl_property_ids requests.

    When the ``parcl_property_ids`` list exceeds ``PARCL_PROPERTY_IDS_LIMIT``
    the request is split into chunks fetched concurrently (3 workers);
    otherwise a single paginated request is made via ``_fetch_post``.

    Args:
        params: Dictionary of parameters to pass to the request.
        data: Dictionary of data to pass to the request.

    Returns:
        List of dictionaries containing the data from the request.

    Raises:
        RuntimeError: If any chunk fails — HTTP error, empty body,
            invalid JSON, or any unexpected exception from the worker.
    """
    # Guard against a missing/None key so len() below cannot blow up.
    parcl_property_ids = data.get(PARCL_PROPERTY_IDS) or []
    num_ids = len(parcl_property_ids)
    if num_ids <= PARCL_PROPERTY_IDS_LIMIT:
        return self._fetch_post(params=params, data=data)

    # If we exceed PARCL_PROPERTY_IDS_LIMIT, chunk the request
    parcl_property_ids_chunks = [
        parcl_property_ids[i : i + PARCL_PROPERTY_IDS_LIMIT]
        for i in range(0, num_ids, PARCL_PROPERTY_IDS_LIMIT)
    ]
    num_chunks = len(parcl_property_ids_chunks)

    print(f"Fetching {num_chunks} chunks...")

    all_data = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        future_to_chunk = {}
        for idx, chunk in enumerate(parcl_property_ids_chunks):
            # Copy the payload per chunk so concurrent workers never
            # share (and mutate) the same dict.
            chunk_data = data.copy()
            chunk_data[PARCL_PROPERTY_IDS] = chunk

            future = executor.submit(
                self._post,
                url=self.full_post_url,
                data=chunk_data,
                params=params,
            )
            future_to_chunk[future] = idx + 1

            # Small delay between submissions to avoid rate limiting;
            # no need to sleep after the last submission.
            if idx < num_chunks - 1:
                time.sleep(0.1)

        # Collect results as they complete. Keep each try body narrow
        # and the except clauses targeted, instead of one catch-all
        # with an isinstance() re-raise check.
        for future in as_completed(future_to_chunk):
            chunk_num = future_to_chunk[future]
            try:
                result = future.result()
            except Exception as exc:
                # Unexpected worker failure: wrap with chunk context.
                raise RuntimeError(
                    f"Chunk {chunk_num} failed with unexpected error: {exc} "
                    f"(Exception type: {type(exc).__name__})"
                ) from exc

            # Check HTTP status code
            if result.status_code != 200:
                response_preview = (
                    result.text[:200] if result.text else "No response content"
                )
                self._raise_http_error(chunk_num, result.status_code, response_preview)

            # Check if response has content
            if not result.text.strip():
                self._raise_empty_response_error(chunk_num)

            # Try to parse JSON
            try:
                response = result.json()
            except ValueError as json_exc:
                response_preview = (
                    result.text[:200] if result.text else "No response content"
                )
                raise RuntimeError(
                    f"Chunk {chunk_num} failed: Invalid JSON - {json_exc}\n"
                    f"Response content: {response_preview}..."
                ) from json_exc

            all_data.append(response)
            print(f"Completed chunk {chunk_num} of {num_chunks}")

    print(f"All {num_chunks} chunks completed successfully.")
    return all_data
175+
69176 def _as_pd_dataframe (self , data : list [Mapping [str , Any ]]) -> pd .DataFrame :
70177 """
71178 Convert API response data to a pandas DataFrame with events as rows
@@ -163,7 +270,7 @@ def _build_search_criteria(
163270 data ["parcl_ids" ] = parcl_ids
164271
165272 if parcl_property_ids :
166- data ["parcl_property_ids" ] = parcl_property_ids
273+ data [PARCL_PROPERTY_IDS ] = parcl_property_ids
167274
168275 if geo_coordinates :
169276 data ["geo_coordinates" ] = geo_coordinates
@@ -208,6 +315,10 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
208315 """Build boolean property filters."""
209316 filters = {}
210317
318+ if params .include_property_details is not None :
319+ filters ["include_property_details" ] = self .simple_bool_validator (
320+ params .include_property_details
321+ )
211322 if params .current_on_market_flag is not None :
212323 filters ["current_on_market_flag" ] = self .simple_bool_validator (
213324 params .current_on_market_flag
@@ -216,9 +327,17 @@ def _build_boolean_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
216327 filters ["current_on_market_rental_flag" ] = self .simple_bool_validator (
217328 params .current_on_market_rental_flag
218329 )
219- if params .include_property_details is not None :
220- filters ["include_property_details" ] = self .simple_bool_validator (
221- params .include_property_details
330+ if params .current_new_construction_flag is not None :
331+ filters ["current_new_construction_flag" ] = self .simple_bool_validator (
332+ params .current_new_construction_flag
333+ )
334+ if params .current_owner_occupied_flag is not None :
335+ filters ["current_owner_occupied_flag" ] = self .simple_bool_validator (
336+ params .current_owner_occupied_flag
337+ )
338+ if params .current_investor_owned_flag is not None :
339+ filters ["current_investor_owned_flag" ] = self .simple_bool_validator (
340+ params .current_investor_owned_flag
222341 )
223342
224343 return filters
@@ -242,9 +361,13 @@ def _build_property_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
242361 property_type .upper () for property_type in params .property_types
243362 ]
244363
364+ # Handle current entity owner name
365+ if params .current_entity_owner_name is not None :
366+ property_filters ["current_entity_owner_name" ] = params .current_entity_owner_name
367+
245368 return property_filters
246369
247- def _build_event_filters (self , params : PropertyV2RetrieveParams ) -> dict [str , Any ]:
370+ def _build_event_filters (self , params : PropertyV2RetrieveParams ) -> dict [str , Any ]: # noqa: C901
248371 """Build event filters from validated Pydantic schema."""
249372 event_filters = {}
250373
@@ -271,6 +394,12 @@ def _build_event_filters(self, params: PropertyV2RetrieveParams) -> dict[str, An
271394 event_filters ["is_new_construction" ] = self .simple_bool_validator (
272395 params .is_new_construction
273396 )
397+ if params .include_events is not None :
398+ event_filters ["include_events" ] = self .simple_bool_validator (params .include_events )
399+ if params .include_full_event_history is not None :
400+ event_filters ["include_full_event_history" ] = self .simple_bool_validator (
401+ params .include_full_event_history
402+ )
274403
275404 return event_filters
276405
@@ -356,6 +485,12 @@ def retrieve(
356485 is_owner_occupied : bool | None = None ,
357486 current_on_market_flag : bool | None = None ,
358487 current_on_market_rental_flag : bool | None = None ,
488+ current_new_construction_flag : bool | None = None ,
489+ current_owner_occupied_flag : bool | None = None ,
490+ current_investor_owned_flag : bool | None = None ,
491+ current_entity_owner_name : str | None = None ,
492+ include_events : bool | None = None ,
493+ include_full_event_history : bool | None = None ,
359494 limit : int | None = None ,
360495 params : Mapping [str , Any ] | None = None ,
361496 ) -> tuple [pd .DataFrame , dict [str , Any ]]:
@@ -393,6 +528,12 @@ def retrieve(
393528 is_owner_occupied: Whether to filter by owner occupied.
394529 current_on_market_flag: Whether to filter by current_on_market flag.
395530 current_on_market_rental_flag: Whether to filter by current_on_market_rental flag.
531+ current_new_construction_flag: Whether to filter by current_new_construction flag.
532+ current_owner_occupied_flag: Whether to filter by current_owner_occupied flag.
533+ current_investor_owned_flag: Whether to filter by current_investor_owned flag.
534+ current_entity_owner_name: Current entity owner name to filter by.
535+ include_events: Whether to include events in the response.
536+ include_full_event_history: Whether to include full event history in the response.
396537 limit: Number of results to return.
397538 params: Additional parameters to pass to the request.
398539 Returns:
@@ -431,6 +572,12 @@ def retrieve(
431572 is_owner_occupied = is_owner_occupied ,
432573 current_on_market_flag = current_on_market_flag ,
433574 current_on_market_rental_flag = current_on_market_rental_flag ,
575+ current_new_construction_flag = current_new_construction_flag ,
576+ current_owner_occupied_flag = current_owner_occupied_flag ,
577+ current_investor_owned_flag = current_investor_owned_flag ,
578+ current_entity_owner_name = current_entity_owner_name ,
579+ include_events = include_events ,
580+ include_full_event_history = include_full_event_history ,
434581 limit = limit ,
435582 params = params or {},
436583 )
@@ -452,10 +599,14 @@ def retrieve(
452599
453600 # Set limit
454601 request_params = input_params .params .copy ()
455- request_params ["limit" ] = self ._validate_limit (input_params .limit )
456602
457603 # Make request with params
458- results = self ._fetch_post (params = request_params , data = data )
604+ if data .get (PARCL_PROPERTY_IDS ):
605+ request_params ["limit" ] = PARCL_PROPERTY_IDS_LIMIT
606+ results = self ._fetch_post_parcl_property_ids (params = request_params , data = data )
607+ else :
608+ request_params ["limit" ] = self ._validate_limit (input_params .limit )
609+ results = self ._fetch_post (params = request_params , data = data )
459610
460611 # Get metadata from results
461612 metadata = self ._get_metadata (results )
0 commit comments