@@ -147,18 +147,47 @@ def _infer_pyarrow_type(self, value: Any) -> pa.DataType:
147147 else :
148148 return pa .string () # Default to string for unknown types
149149
150+ def _convert_value_to_string (self , value : Any ) -> Optional [str ]:
151+ """
152+ Convert any value to string, handling special cases for robust type compatibility.
153+
154+ Args:
155+ value: The value to convert
156+
157+ Returns:
158+ String representation of the value, or None if input is None
159+ """
160+ if value is None :
161+ return None
162+ elif isinstance (value , bytes ):
163+ # Handle binary data
164+ try :
165+ return value .decode ("utf-8" )
166+ except UnicodeDecodeError :
167+ # If can't decode, convert to hex string
168+ return value .hex ()
169+ elif isinstance (value , (list , dict )):
170+ return json .dumps (value )
171+ elif isinstance (value , datetime .datetime ):
172+ return value .isoformat ()
173+ elif isinstance (value , (int , float , bool )):
174+ return str (value )
175+ else :
176+ return str (value )
177+
150178 def _flatten_json_data (
151179 self , data : Dict [str , Any ], prefix : str = ""
152180 ) -> Dict [str , Any ]:
153181 """
154- Flatten nested JSON data with dot notation.
182+ Flatten nested JSON data with dot notation and convert all values to strings
183+ for robust type compatibility.
155184
156185 Args:
157186 data: The JSON data to flatten
158187 prefix: Prefix for nested keys
159188
160189 Returns:
161- Flattened dictionary
190+ Flattened dictionary with all values converted to strings
162191 """
163192 flattened = {}
164193
@@ -172,7 +201,8 @@ def _flatten_json_data(
172201 # Convert lists to JSON strings
173202 flattened [new_key ] = json .dumps (value ) if value else None
174203 else :
175- flattened [new_key ] = value
204+ # Convert all values to strings for type consistency
205+ flattened [new_key ] = self ._convert_value_to_string (value )
176206
177207 return flattened
178208
@@ -217,6 +247,64 @@ def _create_dynamic_schema(self, records: List[Dict[str, Any]]) -> pa.Schema:
217247
218248 return pa .schema (schema_fields )
219249
250+ def _sanitize_records_for_schema (
251+ self , records : List [Dict [str , Any ]], schema : pa .Schema
252+ ) -> List [Dict [str , Any ]]:
253+ """
254+ Sanitize records to ensure they conform to the schema and handle type compatibility issues.
255+
256+ Args:
257+ records: List of record dictionaries
258+ schema: PyArrow schema to conform to
259+
260+ Returns:
261+ List of sanitized records
262+ """
263+ sanitized_records = []
264+
265+ for record in records :
266+ sanitized_record = {}
267+
268+ # Process each field in the schema
269+ for field in schema :
270+ field_name = field .name
271+ value = record .get (field_name )
272+
273+ if value is None :
274+ sanitized_record [field_name ] = None
275+ elif field .type == pa .string ():
276+ # Convert all values to strings for string fields
277+ sanitized_record [field_name ] = self ._convert_value_to_string (value )
278+ elif field .type == pa .timestamp ("ms" ):
279+ # Handle timestamp fields
280+ if isinstance (value , datetime .datetime ):
281+ sanitized_record [field_name ] = value
282+ else :
283+ # Try to parse string timestamps
284+ try :
285+ if isinstance (value , str ):
286+ sanitized_record [field_name ] = (
287+ datetime .datetime .fromisoformat (
288+ value .replace ("Z" , "+00:00" )
289+ )
290+ )
291+ else :
292+ sanitized_record [field_name ] = None
293+ except (ValueError , TypeError ):
294+ sanitized_record [field_name ] = None
295+ else :
296+ # For any other types, convert to string as fallback
297+ sanitized_record [field_name ] = self ._convert_value_to_string (value )
298+
299+ # Add any fields from the record that aren't in the schema (shouldn't happen with dynamic schema)
300+ for field_name , value in record .items ():
301+ if field_name not in sanitized_record :
302+ sanitized_record [field_name ] = self ._convert_value_to_string (value )
303+
304+ sanitized_records .append (sanitized_record )
305+
306+ return sanitized_records
307+
220308 def save (self , document : Document , data_to_save : List [str ]) -> List [Dict [str , Any ]]:
221309 """
222310 Save document data based on the data_to_save list.
@@ -747,40 +835,10 @@ def save_document_sections(self, document: Document) -> Optional[Dict[str, Any]]
747835 # Create dynamic schema for this section's data
748836 schema = self ._create_dynamic_schema (section_records )
749837
750- # Ensure all records conform to the schema by filling missing fields and converting types
751- # With conservative typing, most fields will be strings to prevent type conflicts
752- for record in section_records :
753- for field in schema :
754- field_name = field .name
755- if field_name not in record :
756- record [field_name ] = None
757- else :
758- # Convert values to match the expected schema types
759- value = record [field_name ]
760- if value is not None :
761- if field .type == pa .string ():
762- # Convert all values to strings for consistency
763- record [field_name ] = str (value )
764- elif field .type == pa .timestamp ("ms" ):
765- # Keep timestamps as datetime objects
766- if isinstance (value , datetime .datetime ):
767- record [field_name ] = value
768- else :
769- # Try to parse string timestamps
770- try :
771- if isinstance (value , str ):
772- record [field_name ] = (
773- datetime .datetime .fromisoformat (
774- value .replace ("Z" , "+00:00" )
775- )
776- )
777- else :
778- record [field_name ] = None
779- except (ValueError , TypeError ):
780- record [field_name ] = None
781- else :
782- # For any other types, convert to string as fallback
783- record [field_name ] = str (value )
838+ # Sanitize all records to ensure robust type compatibility
839+ section_records = self ._sanitize_records_for_schema (
840+ section_records , schema
841+ )
784842
785843 # Create S3 key with separate tables for each section type
786844 # document_sections/{section_type}/date={date}/{escaped_doc_id}_section_{section_id}.parquet
0 commit comments