3434
3535def _extract_record_metadata (
3636 record : DynamoDBRecord ,
37- ) -> tuple [str , str , Optional [Dict [str , Any ]], Optional [Dict [str , Any ]], str ]:
37+ ) -> tuple [
38+ str ,
39+ str ,
40+ Optional [Dict [str , Any ]],
41+ Optional [Dict [str , Any ]],
42+ str ,
43+ str ,
44+ Dict [str , Any ],
45+ ]:
3846 """Extract metadata from stream record."""
3947 event_source_arn = record .event_source_arn or ""
4048 full_table_name = extract_table_name_from_arn (event_source_arn )
@@ -45,49 +53,52 @@ def _extract_record_metadata(
4553 new_image = record .dynamodb .new_image
4654
4755 record_id = keys .get ("id" )
56+ # Field name for composite key: "document" for gp org entities, may differ for pharmacy
57+ field_name = keys .get ("field" , "document" )
4858 event_name = record .get ("eventName" , "MODIFY" )
4959
50- LOGGER .log (
51- VersionHistoryLogBase .VH_PROCESSOR_006 ,
52- entity_name = entity_name ,
53- record_id = record_id ,
54- field_name = "document" ,
55- has_old_image = old_image is not None ,
56- has_new_image = new_image is not None ,
57- keys = keys ,
58- )
60+ return entity_name , event_name , old_image , new_image , record_id , field_name , keys
5961
60- LOGGER .log (
61- VersionHistoryLogBase .VH_PROCESSOR_003 ,
62- entity_name = entity_name ,
63- record_id = record_id ,
64- field_name = "document" ,
65- event_type = event_name ,
66- )
67-
68- return entity_name , event_name , old_image , new_image , record_id
6962
70-
71- def _extract_document_values (
63+ def _extract_field_values (
7264 old_image : Optional [Dict [str , Any ]],
7365 new_image : Optional [Dict [str , Any ]],
66+ field_name : str ,
7467) -> tuple [Optional [Dict [str , Any ]], Optional [Dict [str , Any ]]]:
75- """Extract document values, excluding system fields."""
76- old_value = (
77- {k : v for k , v in old_image .items () if k not in SYSTEM_FIELDS }
78- if old_image
79- else None
80- )
81- new_value = (
82- {k : v for k , v in new_image .items () if k not in SYSTEM_FIELDS }
83- if new_image
84- else None
85- )
68+ """Extract field values from DynamoDB images.
69+
70+ For 'document' field: extracts entire record excluding system fields.
71+ For specific fields: could extract individual field values.
72+
73+ Args:
74+ old_image: Previous DynamoDB image
75+ new_image: New DynamoDB image
76+ field_name: Name of field to extract (e.g., 'document')
77+
78+ Returns:
79+ Tuple of (old_value, new_value)
80+ """
81+ # For 'document' field type, extract entire record minus system fields
82+ if field_name == "document" :
83+ old_value = (
84+ {k : v for k , v in old_image .items () if k not in SYSTEM_FIELDS }
85+ if old_image
86+ else None
87+ )
88+ new_value = (
89+ {k : v for k , v in new_image .items () if k not in SYSTEM_FIELDS }
90+ if new_image
91+ else None
92+ )
93+ else :
94+ # For specific field tracking, extract just that field
95+ old_value = old_image .get (field_name ) if old_image else None
96+ new_value = new_image .get (field_name ) if new_image else None
8697
8798 LOGGER .log (
8899 VersionHistoryLogBase .VH_PROCESSOR_017 ,
89- field_name = "document" ,
90- is_document_field = True ,
100+ field_name = field_name ,
101+ is_document_field = ( field_name == "document" ) ,
91102 system_fields_count = len (SYSTEM_FIELDS ),
92103 )
93104
@@ -128,6 +139,7 @@ def _should_skip_update(
128139 field_delta : Dict [str , Any ],
129140 entity_name : str ,
130141 record_id : str ,
142+ field_name : str ,
131143) -> bool :
132144 """Check if UPDATE should be skipped due to no meaningful changes."""
133145 if change_type != "UPDATE" :
@@ -139,7 +151,7 @@ def _should_skip_update(
139151 VersionHistoryLogBase .VH_PROCESSOR_015 ,
140152 entity_name = entity_name ,
141153 record_id = record_id ,
142- field_name = "document" ,
154+ field_name = field_name ,
143155 )
144156 return True
145157 else :
@@ -158,6 +170,7 @@ def _create_version_item(
158170 field_delta : Dict [str , Any ],
159171 new_image : Optional [Dict [str , Any ]],
160172 old_image : Optional [Dict [str , Any ]],
173+ field_name : str ,
161174) -> Dict [str , Any ]:
162175 """Build version history item for DynamoDB."""
163176 changed_by = extract_changed_by (new_image or old_image or {})
@@ -169,7 +182,7 @@ def _create_version_item(
169182 changed_by_display = changed_by .get ("display" ),
170183 )
171184
172- entity_id = f"{ entity_name } | { record_id } |details "
185+ entity_id = f"{ entity_name } # { record_id } # { field_name } "
173186 timestamp = datetime .now (UTC )
174187
175188 LOGGER .log (
@@ -182,7 +195,7 @@ def _create_version_item(
182195 "entity_id" : entity_id ,
183196 "timestamp" : timestamp .isoformat (),
184197 "change_type" : change_type ,
185- "changed_fields" : {"details" : field_delta },
198+ "changed_fields" : {field_name : field_delta },
186199 "changed_by" : changed_by ,
187200 }
188201
@@ -192,24 +205,44 @@ def process_stream_record(
192205 version_history_table : "Table" ,
193206) -> None :
194207 """Process DynamoDB stream record and write to version history."""
195- entity_name , event_name , old_image , new_image , record_id = _extract_record_metadata (
196- record
208+ entity_name , event_name , old_image , new_image , record_id , field_name , keys = (
209+ _extract_record_metadata (record )
210+ )
211+
212+ LOGGER .log (
213+ VersionHistoryLogBase .VH_PROCESSOR_006 ,
214+ entity_name = entity_name ,
215+ record_id = record_id ,
216+ field_name = field_name ,
217+ has_old_image = old_image is not None ,
218+ has_new_image = new_image is not None ,
219+ keys = keys ,
220+ )
221+
222+ LOGGER .log (
223+ VersionHistoryLogBase .VH_PROCESSOR_003 ,
224+ entity_name = entity_name ,
225+ record_id = record_id ,
226+ field_name = field_name ,
227+ event_type = event_name ,
197228 )
198229
199- old_value , new_value = _extract_document_values (old_image , new_image )
230+ old_value , new_value = _extract_field_values (old_image , new_image , field_name )
200231 change_type = _determine_change_type (event_name , old_value , new_value )
201232 field_delta = compute_field_delta (old_value , new_value )
202233
203234 LOGGER .log (
204235 VersionHistoryLogBase .VH_PROCESSOR_008 ,
205- field_name = "document" ,
236+ field_name = field_name ,
206237 delta_keys = list (field_delta .keys ()),
207238 has_diff = "diff" in field_delta ,
208239 delta = field_delta ,
209240 values_equal = field_delta .get ("old" ) == field_delta .get ("new" ),
210241 )
211242
212- if _should_skip_update (change_type , field_delta , entity_name , record_id ):
243+ if _should_skip_update (
244+ change_type , field_delta , entity_name , record_id , field_name
245+ ):
213246 return
214247
215248 LOGGER .log (
@@ -225,6 +258,7 @@ def process_stream_record(
225258 field_delta ,
226259 new_image ,
227260 old_image ,
261+ field_name ,
228262 )
229263
230264 LOGGER .log (
@@ -238,5 +272,5 @@ def process_stream_record(
238272 VersionHistoryLogBase .VH_PROCESSOR_002 ,
239273 entity_id = version_item ["entity_id" ],
240274 change_type = change_type ,
241- changed_fields = ["details" ],
275+ changed_fields = [field_name ],
242276 )
0 commit comments