Skip to content

Commit 65306e3

Browse files
author
Bob Strahan
committed
Fix reporting database schema handling and table naming
1 parent 4090930 commit 65306e3

File tree

3 files changed

+47
-46
lines changed

3 files changed

+47
-46
lines changed

docs/reporting-database.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ The document sections tables store the actual extracted data from document secti
111111

112112
### Dynamic Section Tables
113113

114-
Document sections are stored in dynamically created tables based on the section classification. Each section type gets its own table (e.g., `invoice`, `receipt`, `bank_statement`, etc.) with the following characteristics:
114+
Document sections are stored in dynamically created tables based on the section classification. Each section type gets its own table (e.g., `document_sections_invoice`, `document_sections_receipt`, `document_sections_bank_statement`) with the following characteristics:
115115

116116
**Common Metadata Columns:**
117117
| Column | Type | Description |

lib/idp_common_pkg/idp_common/reporting/save_reporting_data.py

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -179,49 +179,39 @@ def _flatten_json_data(
179179
def _create_dynamic_schema(self, records: List[Dict[str, Any]]) -> pa.Schema:
180180
"""
181181
Create a PyArrow schema dynamically from a list of records.
182+
Uses conservative typing - all fields default to string unless whitelisted.
183+
This prevents Athena type compatibility issues.
182184
183185
Args:
184186
records: List of dictionaries to analyze
185187
186188
Returns:
187-
PyArrow schema
189+
PyArrow schema with conservative string typing
188190
"""
191+
# Define fields that should maintain specific types
192+
TIMESTAMP_FIELDS = {
193+
"timestamp",
194+
"evaluation_date",
195+
}
196+
189197
if not records:
190198
# Return a minimal schema with just section_id
191199
return pa.schema([("section_id", pa.string())])
192200

193-
# Collect all unique keys and their types
194-
field_types = {}
195-
201+
# Collect all unique field names
202+
all_fields = set()
196203
for record in records:
197-
for key, value in record.items():
198-
if key not in field_types:
199-
field_types[key] = []
200-
field_types[key].append(self._infer_pyarrow_type(value))
204+
all_fields.update(record.keys())
201205

202-
# Create schema fields
206+
# Create schema with conservative typing
203207
schema_fields = []
204-
for field_name, types in field_types.items():
205-
# Use the most common type, defaulting to string if mixed types
206-
type_counts = {}
207-
for pa_type in types:
208-
type_str = str(pa_type)
209-
type_counts[type_str] = type_counts.get(type_str, 0) + 1
210-
211-
# Get the most frequent type
212-
most_common_type_str = max(type_counts, key=type_counts.get)
213-
214-
# Convert back to PyArrow type (simplified approach)
215-
if most_common_type_str == "string":
216-
pa_type = pa.string()
217-
elif most_common_type_str == "int64":
218-
pa_type = pa.int64()
219-
elif most_common_type_str == "float64":
220-
pa_type = pa.float64()
221-
elif most_common_type_str == "bool":
222-
pa_type = pa.bool_()
208+
for field_name in sorted(all_fields): # Sort for consistent ordering
209+
if field_name in TIMESTAMP_FIELDS:
210+
# Keep timestamps as timestamps for proper time-based queries
211+
pa_type = pa.timestamp("ms")
223212
else:
224-
pa_type = pa.string() # Default to string
213+
# Default everything else to string to prevent type conflicts
214+
pa_type = pa.string()
225215

226216
schema_fields.append((field_name, pa_type))
227217

@@ -758,6 +748,7 @@ def save_document_sections(self, document: Document) -> Optional[Dict[str, Any]]
758748
schema = self._create_dynamic_schema(section_records)
759749

760750
# Ensure all records conform to the schema by filling missing fields and converting types
751+
# With conservative typing, most fields will be strings to prevent type conflicts
761752
for record in section_records:
762753
for field in schema:
763754
field_name = field.name
@@ -768,19 +759,28 @@ def save_document_sections(self, document: Document) -> Optional[Dict[str, Any]]
768759
value = record[field_name]
769760
if value is not None:
770761
if field.type == pa.string():
762+
# Convert all values to strings for consistency
763+
record[field_name] = str(value)
764+
elif field.type == pa.timestamp("ms"):
765+
# Keep timestamps as datetime objects
766+
if isinstance(value, datetime.datetime):
767+
record[field_name] = value
768+
else:
769+
# Try to parse string timestamps
770+
try:
771+
if isinstance(value, str):
772+
record[field_name] = (
773+
datetime.datetime.fromisoformat(
774+
value.replace("Z", "+00:00")
775+
)
776+
)
777+
else:
778+
record[field_name] = None
779+
except (ValueError, TypeError):
780+
record[field_name] = None
781+
else:
782+
# For any other types, convert to string as fallback
771783
record[field_name] = str(value)
772-
elif field.type == pa.int64():
773-
try:
774-
record[field_name] = int(value)
775-
except (ValueError, TypeError):
776-
record[field_name] = None
777-
elif field.type == pa.float64():
778-
try:
779-
record[field_name] = float(value)
780-
except (ValueError, TypeError):
781-
record[field_name] = None
782-
elif field.type == pa.bool_():
783-
record[field_name] = bool(value)
784784

785785
# Create S3 key with separate tables for each section type
786786
# document_sections/{section_type}/date={date}/{escaped_doc_id}_section_{section_id}.parquet

template.yaml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,7 +1585,7 @@ Resources:
15851585
Type: AWS::Glue::Crawler
15861586
Properties:
15871587
Name: !Sub "${AWS::StackName}-document-sections-crawler"
1588-
Description: "Crawler to discover document section tables in the reporting bucket"
1588+
Description: "Crawler to discover document section tables in the reporting bucket with conservative schema handling"
15891589
Role: !GetAtt DocumentSectionsCrawlerRole.Arn
15901590
DatabaseName: !Ref ReportingDatabase
15911591
Targets:
@@ -1605,9 +1605,10 @@ Resources:
16051605
"Version": 1.0,
16061606
"CrawlerOutput": {
16071607
"Partitions": { "AddOrUpdateBehavior": "InheritFromTable" },
1608-
"Tables": { "AddOrUpdateBehavior": "MergeNewColumns" },
1609-
"Grouping": { "TableLevelConfiguration": 2 }
1610-
}
1608+
"Tables": { "AddOrUpdateBehavior": "MergeNewColumns" }
1609+
},
1610+
"Grouping": { "TableLevelConfiguration": 2 },
1611+
"CreatePartitionIndex": true
16111612
}
16121613
Schedule: !If
16131614
- DocumentSectionsCrawlerScheduleEnabled

0 commit comments

Comments
 (0)