Skip to content

Commit a44bd24

Browse files
authored
elimination of redundant mappers (#45)
* elimination of redundant mappers * metadata (domain vs. avro vs. pydantic) fixes * events / admin events fix * unification of api (result -> skip/has more,..) * kafka C-level concurrency bug fix * ruff fixes, + extra doc about lock in producer
1 parent 0b1ef49 commit a44bd24

File tree

88 files changed

+662
-1933
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+662
-1933
lines changed

backend/app/api/routes/admin/events.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,7 @@
1212
from app.domain.enums.events import EventType
1313
from app.infrastructure.mappers import (
1414
AdminReplayApiMapper,
15-
EventDetailMapper,
1615
EventFilterMapper,
17-
EventMapper,
18-
EventStatisticsMapper,
19-
ReplaySessionMapper,
2016
)
2117
from app.schemas_pydantic.admin_events import (
2218
EventBrowseRequest,
@@ -50,9 +46,8 @@ async def browse_events(request: EventBrowseRequest, service: FromDishka[AdminEv
5046
sort_order=request.sort_order,
5147
)
5248

53-
event_mapper = EventMapper()
5449
return EventBrowseResponse(
55-
events=[jsonable_encoder(event_mapper.to_dict(event)) for event in result.events],
50+
events=[jsonable_encoder(event) for event in result.events],
5651
total=result.total,
5752
skip=result.skip,
5853
limit=result.limit,
@@ -69,8 +64,7 @@ async def get_event_stats(
6964
) -> EventStatsResponse:
7065
try:
7166
stats = await service.get_event_stats(hours=hours)
72-
stats_mapper = EventStatisticsMapper()
73-
return EventStatsResponse(**stats_mapper.to_dict(stats))
67+
return EventStatsResponse.model_validate(stats)
7468

7569
except Exception as e:
7670
raise HTTPException(status_code=500, detail=str(e))
@@ -147,12 +141,10 @@ async def get_event_detail(event_id: str, service: FromDishka[AdminEventsService
147141
if not result:
148142
raise HTTPException(status_code=404, detail="Event not found")
149143

150-
detail_mapper = EventDetailMapper()
151-
serialized_result = jsonable_encoder(detail_mapper.to_dict(result))
152144
return EventDetailResponse(
153-
event=serialized_result["event"],
154-
related_events=serialized_result["related_events"],
155-
timeline=serialized_result["timeline"],
145+
event=jsonable_encoder(result.event),
146+
related_events=[jsonable_encoder(e) for e in result.related_events],
147+
timeline=[jsonable_encoder(e) for e in result.timeline],
156148
)
157149

158150
except HTTPException:
@@ -209,8 +201,7 @@ async def get_replay_status(session_id: str, service: FromDishka[AdminEventsServ
209201
if not status:
210202
raise HTTPException(status_code=404, detail="Replay session not found")
211203

212-
replay_mapper = ReplaySessionMapper()
213-
return EventReplayStatusResponse(**replay_mapper.status_detail_to_dict(status))
204+
return EventReplayStatusResponse.model_validate(status)
214205

215206
except HTTPException:
216207
raise

backend/app/api/routes/admin/users.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@
99
from app.domain.enums.user import UserRole
1010
from app.domain.rate_limit import UserRateLimit
1111
from app.domain.user import UserUpdate as DomainUserUpdate
12-
from app.infrastructure.mappers import AdminOverviewApiMapper, UserMapper
13-
from app.schemas_pydantic.admin_user_overview import AdminUserOverview
12+
from app.schemas_pydantic.admin_user_overview import (
13+
AdminUserOverview,
14+
DerivedCounts,
15+
RateLimitSummary,
16+
)
17+
from app.schemas_pydantic.events import EventResponse, EventStatistics
1418
from app.schemas_pydantic.user import (
1519
DeleteUserResponse,
1620
MessageResponse,
@@ -48,17 +52,20 @@ async def list_users(
4852
role=role,
4953
)
5054

51-
user_mapper = UserMapper()
5255
summaries = await rate_limit_service.get_user_rate_limit_summaries([u.user_id for u in result.users])
5356
user_responses: list[UserResponse] = []
5457
for user in result.users:
55-
user_dict = user_mapper.to_response_dict(user)
58+
user_response = UserResponse.model_validate(user)
5659
summary = summaries.get(user.user_id)
5760
if summary:
58-
user_dict["bypass_rate_limit"] = summary.bypass_rate_limit
59-
user_dict["global_multiplier"] = summary.global_multiplier
60-
user_dict["has_custom_limits"] = summary.has_custom_limits
61-
user_responses.append(UserResponse(**user_dict))
61+
user_response = user_response.model_copy(
62+
update={
63+
"bypass_rate_limit": summary.bypass_rate_limit,
64+
"global_multiplier": summary.global_multiplier,
65+
"has_custom_limits": summary.has_custom_limits,
66+
}
67+
)
68+
user_responses.append(user_response)
6269

6370
return UserListResponse(
6471
users=user_responses,
@@ -80,8 +87,7 @@ async def create_user(
8087
domain_user = await admin_user_service.create_user(admin_username=admin.username, user_data=user_data)
8188
except ValueError as ve:
8289
raise HTTPException(status_code=400, detail=str(ve))
83-
user_mapper = UserMapper()
84-
return UserResponse(**user_mapper.to_response_dict(domain_user))
90+
return UserResponse.model_validate(domain_user)
8591

8692

8793
@router.get("/{user_id}", response_model=UserResponse)
@@ -94,8 +100,7 @@ async def get_user(
94100
if not user:
95101
raise HTTPException(status_code=404, detail="User not found")
96102

97-
user_mapper = UserMapper()
98-
return UserResponse(**user_mapper.to_response_dict(user))
103+
return UserResponse.model_validate(user)
99104

100105

101106
@router.get("/{user_id}/overview", response_model=AdminUserOverview)
@@ -109,8 +114,13 @@ async def get_user_overview(
109114
domain = await admin_user_service.get_user_overview(user_id=user_id, hours=24)
110115
except ValueError:
111116
raise HTTPException(status_code=404, detail="User not found")
112-
mapper = AdminOverviewApiMapper()
113-
return mapper.to_response(domain)
117+
return AdminUserOverview(
118+
user=UserResponse.model_validate(domain.user),
119+
stats=EventStatistics.model_validate(domain.stats),
120+
derived_counts=DerivedCounts.model_validate(domain.derived_counts),
121+
rate_limit_summary=RateLimitSummary.model_validate(domain.rate_limit_summary),
122+
recent_events=[EventResponse.model_validate(e).model_dump() for e in domain.recent_events],
123+
)
114124

115125

116126
@router.put("/{user_id}", response_model=UserResponse)
@@ -141,8 +151,7 @@ async def update_user(
141151
if not updated_user:
142152
raise HTTPException(status_code=500, detail="Failed to update user")
143153

144-
user_mapper = UserMapper()
145-
return UserResponse(**user_mapper.to_response_dict(updated_user))
154+
return UserResponse.model_validate(updated_user)
146155

147156

148157
@router.delete("/{user_id}", response_model=DeleteUserResponse)

backend/app/api/routes/dlq.py

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,8 @@ async def get_dlq_messages(
5959
status=status, topic=topic, event_type=event_type, limit=limit, offset=offset
6060
)
6161

62-
# Convert domain messages to response models
63-
messages = [
64-
DLQMessageResponse(
65-
event_id=msg.event_id or "unknown",
66-
event_type=msg.event_type,
67-
original_topic=msg.original_topic,
68-
error=msg.error,
69-
retry_count=msg.retry_count,
70-
failed_at=msg.failed_at or datetime(1970, 1, 1, tzinfo=timezone.utc),
71-
status=DLQMessageStatus(msg.status),
72-
age_seconds=msg.age_seconds,
73-
details={
74-
"producer_id": msg.producer_id,
75-
"dlq_offset": msg.dlq_offset,
76-
"dlq_partition": msg.dlq_partition,
77-
"last_error": msg.last_error,
78-
"next_retry_at": msg.next_retry_at,
79-
},
80-
)
81-
for msg in result.messages
82-
]
62+
# Convert domain messages to response models using model_validate
63+
messages = [DLQMessageResponse.model_validate(msg) for msg in result.messages]
8364

8465
return DLQMessagesResponse(messages=messages, total=result.total, offset=result.offset, limit=result.limit)
8566

@@ -163,15 +144,4 @@ async def discard_dlq_message(
163144
@router.get("/topics", response_model=List[DLQTopicSummaryResponse])
164145
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> List[DLQTopicSummaryResponse]:
165146
topics = await repository.get_topics_summary()
166-
return [
167-
DLQTopicSummaryResponse(
168-
topic=topic.topic,
169-
total_messages=topic.total_messages,
170-
status_breakdown=topic.status_breakdown,
171-
oldest_message=topic.oldest_message,
172-
newest_message=topic.newest_message,
173-
avg_retry_count=topic.avg_retry_count,
174-
max_retry_count=topic.max_retry_count,
175-
)
176-
for topic in topics
177-
]
147+
return [DLQTopicSummaryResponse.model_validate(topic) for topic in topics]

backend/app/api/routes/events.py

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
from app.core.utils import get_client_ip
1313
from app.domain.enums.common import SortOrder
1414
from app.domain.events.event_models import EventFilter
15-
from app.infrastructure.kafka.events.metadata import EventMetadata
16-
from app.infrastructure.mappers import EventMapper, EventStatisticsMapper
15+
from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
1716
from app.schemas_pydantic.events import (
1817
DeleteEventResponse,
1918
EventAggregationRequest,
@@ -39,21 +38,30 @@ async def get_execution_events(
3938
current_user: Annotated[UserResponse, Depends(current_user)],
4039
event_service: FromDishka[EventService],
4140
include_system_events: bool = Query(False, description="Include system-generated events"),
41+
limit: int = Query(100, ge=1, le=1000),
42+
skip: int = Query(0, ge=0),
4243
) -> EventListResponse:
43-
mapper = EventMapper()
44-
events = await event_service.get_execution_events(
44+
result = await event_service.get_execution_events(
4545
execution_id=execution_id,
4646
user_id=current_user.user_id,
4747
user_role=current_user.role,
4848
include_system_events=include_system_events,
49+
limit=limit,
50+
skip=skip,
4951
)
5052

51-
if events is None:
53+
if result is None:
5254
raise HTTPException(status_code=403, detail="Access denied")
5355

54-
event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
56+
event_responses = [EventResponse.model_validate(event) for event in result.events]
5557

56-
return EventListResponse(events=event_responses, total=len(event_responses), limit=1000, skip=0, has_more=False)
58+
return EventListResponse(
59+
events=event_responses,
60+
total=result.total,
61+
limit=limit,
62+
skip=skip,
63+
has_more=result.has_more,
64+
)
5765

5866

5967
@router.get("/user", response_model=EventListResponse)
@@ -68,7 +76,6 @@ async def get_user_events(
6876
sort_order: SortOrder = Query(SortOrder.DESC),
6977
) -> EventListResponse:
7078
"""Get events for the current user"""
71-
mapper = EventMapper()
7279
result = await event_service.get_user_events_paginated(
7380
user_id=current_user.user_id,
7481
event_types=event_types,
@@ -79,7 +86,7 @@ async def get_user_events(
7986
sort_order=sort_order,
8087
)
8188

82-
event_responses = [EventResponse(**mapper.to_dict(event)) for event in result.events]
89+
event_responses = [EventResponse.model_validate(event) for event in result.events]
8390

8491
return EventListResponse(
8592
events=event_responses, total=result.total, limit=limit, skip=skip, has_more=result.has_more
@@ -92,7 +99,6 @@ async def query_events(
9299
filter_request: EventFilterRequest,
93100
event_service: FromDishka[EventService],
94101
) -> EventListResponse:
95-
mapper = EventMapper()
96102
event_filter = EventFilter(
97103
event_types=[str(et) for et in filter_request.event_types] if filter_request.event_types else None,
98104
aggregate_id=filter_request.aggregate_id,
@@ -116,7 +122,7 @@ async def query_events(
116122
if result is None:
117123
raise HTTPException(status_code=403, detail="Cannot query other users' events")
118124

119-
event_responses = [EventResponse(**mapper.to_dict(event)) for event in result.events]
125+
event_responses = [EventResponse.model_validate(event) for event in result.events]
120126

121127
return EventListResponse(
122128
events=event_responses, total=result.total, limit=result.limit, skip=result.skip, has_more=result.has_more
@@ -130,43 +136,57 @@ async def get_events_by_correlation(
130136
event_service: FromDishka[EventService],
131137
include_all_users: bool = Query(False, description="Include events from all users (admin only)"),
132138
limit: int = Query(100, ge=1, le=1000),
139+
skip: int = Query(0, ge=0),
133140
) -> EventListResponse:
134-
mapper = EventMapper()
135-
events = await event_service.get_events_by_correlation(
141+
result = await event_service.get_events_by_correlation(
136142
correlation_id=correlation_id,
137143
user_id=current_user.user_id,
138144
user_role=current_user.role,
139145
include_all_users=include_all_users,
140146
limit=limit,
147+
skip=skip,
141148
)
142149

143-
event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
150+
event_responses = [EventResponse.model_validate(event) for event in result.events]
144151

145-
return EventListResponse(events=event_responses, total=len(event_responses), limit=limit, skip=0, has_more=False)
152+
return EventListResponse(
153+
events=event_responses,
154+
total=result.total,
155+
limit=limit,
156+
skip=skip,
157+
has_more=result.has_more,
158+
)
146159

147160

148161
@router.get("/current-request", response_model=EventListResponse)
149162
async def get_current_request_events(
150163
current_user: Annotated[UserResponse, Depends(current_user)],
151164
event_service: FromDishka[EventService],
152165
limit: int = Query(100, ge=1, le=1000),
166+
skip: int = Query(0, ge=0),
153167
) -> EventListResponse:
154-
mapper = EventMapper()
155168
correlation_id = CorrelationContext.get_correlation_id()
156169
if not correlation_id:
157-
return EventListResponse(events=[], total=0, limit=limit, skip=0, has_more=False)
170+
return EventListResponse(events=[], total=0, limit=limit, skip=skip, has_more=False)
158171

159-
events = await event_service.get_events_by_correlation(
172+
result = await event_service.get_events_by_correlation(
160173
correlation_id=correlation_id,
161174
user_id=current_user.user_id,
162175
user_role=current_user.role,
163176
include_all_users=False,
164177
limit=limit,
178+
skip=skip,
165179
)
166180

167-
event_responses = [EventResponse(**mapper.to_dict(event)) for event in events]
181+
event_responses = [EventResponse.model_validate(event) for event in result.events]
168182

169-
return EventListResponse(events=event_responses, total=len(event_responses), limit=limit, skip=0, has_more=False)
183+
return EventListResponse(
184+
events=event_responses,
185+
total=result.total,
186+
limit=limit,
187+
skip=skip,
188+
has_more=result.has_more,
189+
)
170190

171191

172192
@router.get("/statistics", response_model=EventStatistics)
@@ -190,20 +210,18 @@ async def get_event_statistics(
190210
include_all_users=include_all_users,
191211
)
192212

193-
stats_mapper = EventStatisticsMapper()
194-
return EventStatistics(**stats_mapper.to_dict(stats))
213+
return EventStatistics.model_validate(stats)
195214

196215

197216
@router.get("/{event_id}", response_model=EventResponse)
198217
async def get_event(
199218
event_id: str, current_user: Annotated[UserResponse, Depends(current_user)], event_service: FromDishka[EventService]
200219
) -> EventResponse:
201220
"""Get a specific event by ID"""
202-
mapper = EventMapper()
203221
event = await event_service.get_event(event_id=event_id, user_id=current_user.user_id, user_role=current_user.role)
204222
if event is None:
205223
raise HTTPException(status_code=404, detail="Event not found")
206-
return EventResponse(**mapper.to_dict(event))
224+
return EventResponse.model_validate(event)
207225

208226

209227
@router.post("/publish", response_model=PublishEventResponse)

0 commit comments

Comments
 (0)