Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/backend-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ jobs:
SCHEMA_SUBJECT_PREFIX: "ci.${{ github.run_id }}."
run: |
cd backend
uv run pytest tests/integration -v --cov=app --cov-branch --cov-report=xml --cov-report=term
uv run pytest tests/integration -v -rs --cov=app --cov-branch --cov-report=xml --cov-report=term

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/mypy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ jobs:
SECRET_KEY: ${{ secrets.TEST_SECRET_KEY }}
run: |
cd backend
uv run mypy --config-file pyproject.toml .
uv run mypy --config-file pyproject.toml --strict .
10 changes: 2 additions & 8 deletions backend/app/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,12 @@


@inject
async def current_user(
request: Request,
auth_service: FromDishka[AuthService]
) -> UserResponse:
async def current_user(request: Request, auth_service: FromDishka[AuthService]) -> UserResponse:
"""Get authenticated user."""
return await auth_service.get_current_user(request)


@inject
async def admin_user(
request: Request,
auth_service: FromDishka[AuthService]
) -> UserResponse:
async def admin_user(request: Request, auth_service: FromDishka[AuthService]) -> UserResponse:
"""Get authenticated admin user."""
return await auth_service.get_admin(request)
169 changes: 75 additions & 94 deletions backend/app/api/routes/admin/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,12 @@
from app.services.admin import AdminEventsService

router = APIRouter(
prefix="/admin/events",
tags=["admin-events"],
route_class=DishkaRoute,
dependencies=[Depends(admin_user)]
prefix="/admin/events", tags=["admin-events"], route_class=DishkaRoute, dependencies=[Depends(admin_user)]
)


@router.post("/browse")
async def browse_events(
request: EventBrowseRequest,
service: FromDishka[AdminEventsService]
) -> EventBrowseResponse:
async def browse_events(request: EventBrowseRequest, service: FromDishka[AdminEventsService]) -> EventBrowseResponse:
try:
event_filter = EventFilterMapper.from_admin_pydantic(request.filters)

Expand All @@ -53,15 +47,15 @@ async def browse_events(
skip=request.skip,
limit=request.limit,
sort_by=request.sort_by,
sort_order=request.sort_order
sort_order=request.sort_order,
)

event_mapper = EventMapper()
return EventBrowseResponse(
events=[jsonable_encoder(event_mapper.to_dict(event)) for event in result.events],
total=result.total,
skip=result.skip,
limit=result.limit
limit=result.limit,
)

except Exception as e:
Expand All @@ -70,8 +64,8 @@ async def browse_events(

@router.get("/stats")
async def get_event_stats(
service: FromDishka[AdminEventsService],
hours: int = Query(default=24, le=168),
service: FromDishka[AdminEventsService],
hours: int = Query(default=24, le=168),
) -> EventStatsResponse:
try:
stats = await service.get_event_stats(hours=hours)
Expand All @@ -82,11 +76,71 @@ async def get_event_stats(
raise HTTPException(status_code=500, detail=str(e))


@router.get("/export/csv")
async def export_events_csv(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
start_time: datetime | None = Query(None, description="Start time"),
end_time: datetime | None = Query(None, description="End time"),
limit: int = Query(default=10000, le=50000),
) -> StreamingResponse:
try:
export_filter = EventFilterMapper.from_admin_pydantic(
AdminEventFilter(
event_types=event_types,
start_time=start_time,
end_time=end_time,
)
)
result = await service.export_events_csv_content(filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.file_name}"},
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.get("/export/json")
async def export_events_json(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
aggregate_id: str | None = Query(None, description="Aggregate ID filter"),
correlation_id: str | None = Query(None, description="Correlation ID filter"),
user_id: str | None = Query(None, description="User ID filter"),
service_name: str | None = Query(None, description="Service name filter"),
start_time: datetime | None = Query(None, description="Start time"),
end_time: datetime | None = Query(None, description="End time"),
limit: int = Query(default=10000, le=50000),
) -> StreamingResponse:
"""Export events as JSON with comprehensive filtering."""
try:
export_filter = EventFilterMapper.from_admin_pydantic(
AdminEventFilter(
event_types=event_types,
aggregate_id=aggregate_id,
correlation_id=correlation_id,
user_id=user_id,
service_name=service_name,
start_time=start_time,
end_time=end_time,
)
)
result = await service.export_events_json_content(filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.file_name}"},
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.get("/{event_id}")
async def get_event_detail(
event_id: str,
service: FromDishka[AdminEventsService]
) -> EventDetailResponse:
async def get_event_detail(event_id: str, service: FromDishka[AdminEventsService]) -> EventDetailResponse:
try:
result = await service.get_event_detail(event_id)

Expand All @@ -98,7 +152,7 @@ async def get_event_detail(
return EventDetailResponse(
event=serialized_result["event"],
related_events=serialized_result["related_events"],
timeline=serialized_result["timeline"]
timeline=serialized_result["timeline"],
)

except HTTPException:
Expand All @@ -109,9 +163,7 @@ async def get_event_detail(

@router.post("/replay")
async def replay_events(
request: EventReplayRequest,
background_tasks: BackgroundTasks,
service: FromDishka[AdminEventsService]
request: EventReplayRequest, background_tasks: BackgroundTasks, service: FromDishka[AdminEventsService]
) -> EventReplayResponse:
try:
replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}"
Expand Down Expand Up @@ -150,10 +202,7 @@ async def replay_events(


@router.get("/replay/{session_id}/status")
async def get_replay_status(
session_id: str,
service: FromDishka[AdminEventsService]
) -> EventReplayStatusResponse:
async def get_replay_status(session_id: str, service: FromDishka[AdminEventsService]) -> EventReplayStatusResponse:
try:
status = await service.get_replay_status(session_id)

Expand All @@ -171,84 +220,16 @@ async def get_replay_status(

@router.delete("/{event_id}")
async def delete_event(
event_id: str,
admin: Annotated[UserResponse, Depends(admin_user)],
service: FromDishka[AdminEventsService]
event_id: str, admin: Annotated[UserResponse, Depends(admin_user)], service: FromDishka[AdminEventsService]
) -> EventDeleteResponse:
try:
deleted = await service.delete_event(event_id=event_id, deleted_by=admin.email)
if not deleted:
raise HTTPException(status_code=500, detail="Failed to delete event")

return EventDeleteResponse(
message="Event deleted and archived",
event_id=event_id
)
return EventDeleteResponse(message="Event deleted and archived", event_id=event_id)

except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.get("/export/csv")
async def export_events_csv(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
start_time: datetime | None = Query(None, description="Start time"),
end_time: datetime | None = Query(None, description="End time"),
limit: int = Query(default=10000, le=50000),
) -> StreamingResponse:
try:
export_filter = EventFilterMapper.from_admin_pydantic(
AdminEventFilter(
event_types=event_types,
start_time=start_time,
end_time=end_time,
)
)
result = await service.export_events_csv_content(filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.filename}"},
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.get("/export/json")
async def export_events_json(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
aggregate_id: str | None = Query(None, description="Aggregate ID filter"),
correlation_id: str | None = Query(None, description="Correlation ID filter"),
user_id: str | None = Query(None, description="User ID filter"),
service_name: str | None = Query(None, description="Service name filter"),
start_time: datetime | None = Query(None, description="Start time"),
end_time: datetime | None = Query(None, description="End time"),
limit: int = Query(default=10000, le=50000),
) -> StreamingResponse:
"""Export events as JSON with comprehensive filtering."""
try:
export_filter = EventFilterMapper.from_admin_pydantic(
AdminEventFilter(
event_types=event_types,
aggregate_id=aggregate_id,
correlation_id=correlation_id,
user_id=user_id,
service_name=service_name,
start_time=start_time,
end_time=end_time,
)
)
result = await service.export_events_json_content(filter=export_filter, limit=limit)
return StreamingResponse(
iter([result.content]),
media_type=result.media_type,
headers={"Content-Disposition": f"attachment; filename={result.filename}"},
)

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
24 changes: 9 additions & 15 deletions backend/app/api/routes/admin/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@
from app.services.admin import AdminSettingsService

router = APIRouter(
prefix="/admin/settings",
tags=["admin", "settings"],
route_class=DishkaRoute,
dependencies=[Depends(admin_user)]
prefix="/admin/settings", tags=["admin", "settings"], route_class=DishkaRoute, dependencies=[Depends(admin_user)]
)


@router.get("/", response_model=SystemSettings)
async def get_system_settings(
admin: Annotated[UserResponse, Depends(admin_user)],
service: FromDishka[AdminSettingsService],
admin: Annotated[UserResponse, Depends(admin_user)],
service: FromDishka[AdminSettingsService],
) -> SystemSettings:
try:
domain_settings = await service.get_system_settings(admin.username)
Expand All @@ -35,18 +32,15 @@ async def get_system_settings(

@router.put("/", response_model=SystemSettings)
async def update_system_settings(
admin: Annotated[UserResponse, Depends(admin_user)],
settings: SystemSettings,
service: FromDishka[AdminSettingsService],
admin: Annotated[UserResponse, Depends(admin_user)],
settings: SystemSettings,
service: FromDishka[AdminSettingsService],
) -> SystemSettings:
try:
settings_mapper = SettingsMapper()
domain_settings = settings_mapper.system_settings_from_pydantic(settings.model_dump())
except (ValueError, ValidationError, KeyError) as e:
raise HTTPException(
status_code=422,
detail=f"Invalid settings: {str(e)}"
)
raise HTTPException(status_code=422, detail=f"Invalid settings: {str(e)}")
except Exception:
raise HTTPException(status_code=400, detail="Invalid settings format")

Expand All @@ -68,8 +62,8 @@ async def update_system_settings(

@router.post("/reset", response_model=SystemSettings)
async def reset_system_settings(
admin: Annotated[UserResponse, Depends(admin_user)],
service: FromDishka[AdminSettingsService],
admin: Annotated[UserResponse, Depends(admin_user)],
service: FromDishka[AdminSettingsService],
) -> SystemSettings:
try:
reset_domain_settings = await service.reset_system_settings(admin.username, admin.user_id)
Expand Down
Loading
Loading