55
66import pandas as pd
77
8- from parcllabs .common import PARCL_PROPERTY_IDS_LIMIT , PARCL_PROPERTY_IDS
8+ from parcllabs .common import PARCL_PROPERTY_IDS , PARCL_PROPERTY_IDS_LIMIT
99from parcllabs .enums import RequestLimits
1010from parcllabs .schemas .schemas import PropertyV2RetrieveParamCategories , PropertyV2RetrieveParams
1111from parcllabs .services .parcllabs_service import ParclLabsService
@@ -17,6 +17,15 @@ def __init__(self, *args: object, **kwargs: object) -> None:
1717 super ().__init__ (* args , ** kwargs )
1818 self .simple_bool_validator = Validators .validate_input_bool_param_simple
1919
20+ @staticmethod
21+ def _raise_http_error (chunk_num : int , status_code : int , response_preview : str ) -> None :
22+ error_msg = f"Chunk { chunk_num } failed: HTTP { status_code } "
23+ raise RuntimeError (f"{ error_msg } \n Response content: { response_preview } ..." )
24+
25+ @staticmethod
26+ def _raise_empty_response_error (chunk_num : int ) -> None :
27+ raise RuntimeError (f"Chunk { chunk_num } failed: Empty response from API" )
28+
2029 def _fetch_post (self , params : dict [str , Any ], data : dict [str , Any ]) -> list [dict ]:
2130 """Fetch data using POST request with pagination support."""
2231 response = self ._post (url = self .full_post_url , data = data , params = params )
@@ -67,7 +76,11 @@ def _fetch_post(self, params: dict[str, Any], data: dict[str, Any]) -> list[dict
6776
6877 return all_data
6978
70- def _fetch_post_parcl_property_ids (self , params : dict [str , Any ], data : dict [str , Any ]) -> list [dict ]:
79+ def _fetch_post_parcl_property_ids (
80+ self ,
81+ params : dict [str , Any ],
82+ data : dict [str , Any ],
83+ ) -> list [dict ]:
7184 """Fetch data using POST request with parcl_property_ids, chunking the request
7285
7386 Args:
@@ -84,13 +97,14 @@ def _fetch_post_parcl_property_ids(self, params: dict[str, Any], data: dict[str,
8497
8598 # If we exceed PARCL_PROPERTY_IDS_LIMIT, chunk the request
8699 parcl_property_ids_chunks = [
87- parcl_property_ids [i :i + PARCL_PROPERTY_IDS_LIMIT ] for i in range (0 , num_ids , PARCL_PROPERTY_IDS_LIMIT )
100+ parcl_property_ids [i : i + PARCL_PROPERTY_IDS_LIMIT ]
101+ for i in range (0 , num_ids , PARCL_PROPERTY_IDS_LIMIT )
88102 ]
89103 num_chunks = len (parcl_property_ids_chunks )
90104
91105 print (f"Fetching { num_chunks } chunks..." )
92106
93- all_data = []
107+ all_data = []
94108 with ThreadPoolExecutor (max_workers = 3 ) as executor :
95109 # Create a copy of data for each chunk to avoid race conditions
96110 future_to_chunk = {}
@@ -100,13 +114,20 @@ def _fetch_post_parcl_property_ids(self, params: dict[str, Any], data: dict[str,
100114 chunk_data [PARCL_PROPERTY_IDS ] = chunk
101115
102116 # Submit the task
103- future = executor .submit (self ._post , url = self .full_post_url , data = chunk_data , params = params )
117+ future = executor .submit (
118+ self ._post ,
119+ url = self .full_post_url ,
120+ data = chunk_data ,
121+ params = params ,
122+ )
104123 future_to_chunk [future ] = idx + 1
105124
106125 # Small delay between submissions to avoid rate limiting
107126 if idx < len (parcl_property_ids_chunks ) - 1 : # Don't delay after the last one
108127 time .sleep (0.1 )
109128
129+ # Helper functions to abstract raise statements
130+
110131 # Collect results as they complete
111132 for future in as_completed (future_to_chunk ):
112133 chunk_num = future_to_chunk [future ]
@@ -115,32 +136,39 @@ def _fetch_post_parcl_property_ids(self, params: dict[str, Any], data: dict[str,
115136
116137 # Check HTTP status code
117138 if result .status_code != 200 :
118- error_msg = f"Chunk { chunk_num } failed: HTTP { result .status_code } "
119- response_preview = result .text [:200 ] if result .text else "No response content"
120- raise RuntimeError (f"{ error_msg } \n Response content: { response_preview } ..." )
139+ response_preview = (
140+ result .text [:200 ] if result .text else "No response content"
141+ )
142+ self ._raise_http_error (chunk_num , result .status_code , response_preview )
121143
122144 # Check if response has content
123145 if not result .text .strip ():
124- raise RuntimeError ( f"Chunk { chunk_num } failed: Empty response from API" )
146+ self . _raise_empty_response_error ( chunk_num )
125147
126148 # Try to parse JSON
127149 try :
128150 response = result .json ()
129151 all_data .append (response )
130152 print (f"Completed chunk { chunk_num } of { num_chunks } " )
131153 except ValueError as json_exc :
132- response_preview = result .text [:200 ] if result .text else "No response content"
133- raise RuntimeError (f"Chunk { chunk_num } failed: Invalid JSON - { json_exc } \n "
134- f"Response content: { response_preview } ..." )
154+ response_preview = (
155+ result .text [:200 ] if result .text else "No response content"
156+ )
157+ raise RuntimeError (
158+ f"Chunk { chunk_num } failed: Invalid JSON - { json_exc } \n "
159+ f"Response content: { response_preview } ..."
160+ ) from json_exc
135161
136162 except Exception as exc :
137163 # If it's already a RuntimeError from above, re-raise it
138164 if isinstance (exc , RuntimeError ):
139165 raise
140166
141167 # For any other unexpected errors, wrap and raise
142- raise RuntimeError (f"Chunk { chunk_num } failed with unexpected error: { exc } "
143- f"(Exception type: { type (exc ).__name__ } )" )
168+ raise RuntimeError (
169+ f"Chunk { chunk_num } failed with unexpected error: { exc } "
170+ f"(Exception type: { type (exc ).__name__ } )"
171+ ) from exc
144172
145173 print (f"All { num_chunks } chunks completed successfully." )
146174 return all_data
@@ -339,7 +367,7 @@ def _build_property_filters(self, params: PropertyV2RetrieveParams) -> dict[str,
339367
340368 return property_filters
341369
342- def _build_event_filters (self , params : PropertyV2RetrieveParams ) -> dict [str , Any ]:
370+ def _build_event_filters (self , params : PropertyV2RetrieveParams ) -> dict [str , Any ]: # noqa: C901
343371 """Build event filters from validated Pydantic schema."""
344372 event_filters = {}
345373
0 commit comments