1616 DomainUserSettings ,
1717 DomainUserSettingsUpdate ,
1818)
19- from app .infrastructure .kafka .events .user import EditorSettingsPayload , NotificationSettingsPayload
2019from app .services .event_bus import EventBusEvent , EventBusManager
2120from app .services .kafka_event_service import KafkaEventService
2221
@@ -120,9 +119,6 @@ async def update_user_settings(
120119 await self .repository .create_snapshot (new_settings )
121120 return new_settings
122121
123- # Mapping for nested settings → typed Avro payloads
124- _payload_types = {"notifications" : NotificationSettingsPayload , "editor" : EditorSettingsPayload }
125-
126122 async def _publish_settings_event (
127123 self , user_id : str , changes : dict [str , Any ], reason : str | None
128124 ) -> None :
@@ -134,7 +130,7 @@ async def _publish_settings_event(
134130 "user_id" : user_id ,
135131 "changed_fields" : list (changes .keys ()),
136132 "reason" : reason ,
137- ** { k : self . _payload_types [ k ]( ** v ) if k in self . _payload_types else v for k , v in changes . items ()} ,
133+ ** changes ,
138134 },
139135 metadata = None ,
140136 )
@@ -178,15 +174,14 @@ async def get_settings_history(self, user_id: str, limit: int = 50) -> List[Doma
178174 history : list [DomainSettingsHistoryEntry ] = []
179175 for event in events :
180176 changed_fields = event .payload .get ("changed_fields" , [])
181- changes = event .payload .get ("changes" , {})
182177 for field in changed_fields :
183178 history .append (
184179 DomainSettingsHistoryEntry (
185180 timestamp = event .timestamp ,
186181 event_type = event .event_type ,
187182 field = f"/{ field } " ,
188183 old_value = None ,
189- new_value = changes .get (field ),
184+ new_value = event . payload .get (field ),
190185 reason = event .payload .get ("reason" ),
191186 correlation_id = event .correlation_id ,
192187 )
@@ -207,14 +202,13 @@ async def restore_settings_to_point(self, user_id: str, timestamp: datetime) ->
207202 await self .repository .create_snapshot (settings )
208203 self ._add_to_cache (user_id , settings )
209204
210- # Publish restoration event
205+ # Publish restoration event (marker only, no field changes)
211206 await self .event_service .publish_event (
212207 event_type = EventType .USER_SETTINGS_UPDATED ,
213208 aggregate_id = f"user_settings_{ user_id } " ,
214209 payload = {
215210 "user_id" : user_id ,
216- "changed_fields" : ["restored" ],
217- "changes" : {"restored_to" : timestamp .isoformat ()},
211+ "changed_fields" : [],
218212 "reason" : f"Settings restored to { timestamp .isoformat ()} " ,
219213 },
220214 metadata = None ,
@@ -243,9 +237,13 @@ async def _get_settings_events(
243237 for e in raw
244238 ]
245239
240+ # Fields that are stored directly in event payload (not in nested 'changes')
241+ _settings_fields = {"theme" , "timezone" , "date_format" , "time_format" , "notifications" , "editor" }
242+
246243 def _apply_event (self , settings : DomainUserSettings , event : DomainSettingsEvent ) -> DomainUserSettings :
247244 """Apply a settings update event using TypeAdapter merge."""
248- changes = event .payload .get ("changes" , {})
245+ # Extract changes from typed fields in payload
246+ changes = {k : v for k , v in event .payload .items () if k in self ._settings_fields and v is not None }
249247 if not changes :
250248 return settings
251249
0 commit comments