Skip to content

Commit 6280365

Browse files
committed
moved k8s tests to e2e, fixed inconsistencies in user settings
1 parent 25338cf commit 6280365

File tree

4 files changed

+78
-60
lines changed

4 files changed

+78
-60
lines changed

.github/actions/docker-cache/action.yml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ runs:
1212
- name: Generate cache key from images
1313
id: cache-key
1414
shell: bash
15+
env:
16+
IMAGES_INPUT: ${{ inputs.images }}
1517
run: |
1618
# Create a stable hash from the sorted image list
17-
IMAGES_HASH=$(echo "${{ inputs.images }}" | tr ' ' '\n' | sort | md5sum | cut -d' ' -f1)
19+
# Using env var to prevent script injection
20+
IMAGES_HASH=$(echo "$IMAGES_INPUT" | tr ' ' '\n' | sort | md5sum | cut -d' ' -f1)
1821
echo "key=docker-${{ runner.os }}-${IMAGES_HASH}" >> $GITHUB_OUTPUT
1922
2023
- name: Cache Docker images
@@ -38,17 +41,19 @@ runs:
3841
- name: Pull and save Docker images
3942
if: steps.docker-cache.outputs.cache-hit != 'true'
4043
shell: bash
44+
env:
45+
IMAGES_INPUT: ${{ inputs.images }}
4146
run: |
4247
mkdir -p /tmp/docker-cache
4348
4449
echo "Pulling images in parallel..."
45-
for img in ${{ inputs.images }}; do
50+
for img in $IMAGES_INPUT; do
4651
docker pull "$img" &
4752
done
4853
wait
4954
5055
echo "Saving images with zstd compression..."
51-
for img in ${{ inputs.images }}; do
56+
for img in $IMAGES_INPUT; do
5257
# Create filename from image name (replace special chars)
5358
filename=$(echo "$img" | tr '/:' '_')
5459
docker save "$img" | zstd -T0 -3 > "/tmp/docker-cache/${filename}.tar.zst" &

backend/app/infrastructure/kafka/events/user.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class UserSettingsUpdatedEvent(BaseEvent):
5151
topic: ClassVar[KafkaTopic] = KafkaTopic.USER_SETTINGS_EVENTS
5252
user_id: str
5353
settings_type: SettingsType
54-
changes: dict[str, str]
54+
updated: dict[str, str]
5555

5656

5757
class UserThemeChangedEvent(BaseEvent):

backend/app/services/user_settings_service.py

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import json
23
from datetime import datetime, timedelta, timezone
34
from typing import Any, List
45

@@ -182,17 +183,17 @@ async def update_user_settings(
182183
settings_type = SettingsType.DISPLAY
183184
else:
184185
settings_type = SettingsType.PREFERENCES
185-
# Flatten changes to string map for the generic event
186-
changes: dict[str, str] = {}
187-
for k, v in updated.items():
188-
changes[k] = str(v)
186+
# Stringify all values for Avro compatibility (nested dicts become JSON strings)
187+
updated_stringified: dict[str, str] = {
188+
k: json.dumps(v) if isinstance(v, dict) else str(v) for k, v in updated.items()
189+
}
189190
await self.event_service.publish_event(
190191
event_type=EventType.USER_SETTINGS_UPDATED,
191192
aggregate_id=f"user_settings_{user_id}",
192193
payload={
193194
"user_id": user_id,
194195
"settings_type": settings_type,
195-
"changes": changes,
196+
"updated": updated_stringified,
196197
"reason": reason,
197198
},
198199
metadata=None,
@@ -297,7 +298,7 @@ async def restore_settings_to_point(self, user_id: str, timestamp: datetime) ->
297298
payload={
298299
"user_id": user_id,
299300
"settings_type": SettingsType.PREFERENCES,
300-
"changes": {"restored_to": timestamp.isoformat()},
301+
"updated": {"restored_to": timestamp.isoformat()},
301302
},
302303
metadata=None,
303304
)
@@ -339,32 +340,22 @@ def _apply_event(self, settings: DomainUserSettings, event: DomainSettingsEvent)
339340
settings.theme = Theme(new_theme)
340341
return settings
341342

342-
upd = event.payload.get("updated")
343-
if not upd:
344-
return settings
345-
346-
# Top-level
347-
if "theme" in upd:
348-
settings.theme = Theme(upd["theme"])
349-
if "timezone" in upd:
350-
settings.timezone = upd["timezone"]
351-
if "date_format" in upd:
352-
settings.date_format = upd["date_format"]
353-
if "time_format" in upd:
354-
settings.time_format = upd["time_format"]
355-
# Nested
356-
if "notifications" in upd and isinstance(upd["notifications"], dict):
357-
n = upd["notifications"]
358-
channels: list[NotificationChannel] = [NotificationChannel(c) for c in n.get("channels", [])]
343+
if event.event_type == EventType.USER_NOTIFICATION_SETTINGS_UPDATED:
344+
n = event.payload.get("settings", {})
345+
channels_raw = event.payload.get("channels", [])
346+
channels: list[NotificationChannel] = [NotificationChannel(c) for c in channels_raw] if channels_raw else []
359347
settings.notifications = DomainNotificationSettings(
360348
execution_completed=n.get("execution_completed", settings.notifications.execution_completed),
361349
execution_failed=n.get("execution_failed", settings.notifications.execution_failed),
362350
system_updates=n.get("system_updates", settings.notifications.system_updates),
363351
security_alerts=n.get("security_alerts", settings.notifications.security_alerts),
364352
channels=channels or settings.notifications.channels,
365353
)
366-
if "editor" in upd and isinstance(upd["editor"], dict):
367-
e = upd["editor"]
354+
settings.updated_at = event.timestamp
355+
return settings
356+
357+
if event.event_type == EventType.USER_EDITOR_SETTINGS_UPDATED:
358+
e = event.payload.get("settings", {})
368359
settings.editor = DomainEditorSettings(
369360
theme=e.get("theme", settings.editor.theme),
370361
font_size=e.get("font_size", settings.editor.font_size),
@@ -373,8 +364,58 @@ def _apply_event(self, settings: DomainUserSettings, event: DomainSettingsEvent)
373364
word_wrap=e.get("word_wrap", settings.editor.word_wrap),
374365
show_line_numbers=e.get("show_line_numbers", settings.editor.show_line_numbers),
375366
)
376-
if "custom_settings" in upd and isinstance(upd["custom_settings"], dict):
377-
settings.custom_settings = upd["custom_settings"]
367+
settings.updated_at = event.timestamp
368+
return settings
369+
370+
upd = event.payload.get("updated")
371+
if not upd:
372+
return settings
373+
374+
# Helper to parse JSON strings or return dict as-is
375+
def parse_value(val: object) -> object:
376+
if isinstance(val, str):
377+
try:
378+
return json.loads(val)
379+
except (json.JSONDecodeError, ValueError):
380+
return val
381+
return val
382+
383+
# Top-level
384+
if "theme" in upd:
385+
settings.theme = Theme(str(upd["theme"]))
386+
if "timezone" in upd:
387+
settings.timezone = str(upd["timezone"])
388+
if "date_format" in upd:
389+
settings.date_format = str(upd["date_format"])
390+
if "time_format" in upd:
391+
settings.time_format = str(upd["time_format"])
392+
# Nested (may be JSON strings or dicts)
393+
if "notifications" in upd:
394+
n = parse_value(upd["notifications"])
395+
if isinstance(n, dict):
396+
channels: list[NotificationChannel] = [NotificationChannel(c) for c in n.get("channels", [])]
397+
settings.notifications = DomainNotificationSettings(
398+
execution_completed=n.get("execution_completed", settings.notifications.execution_completed),
399+
execution_failed=n.get("execution_failed", settings.notifications.execution_failed),
400+
system_updates=n.get("system_updates", settings.notifications.system_updates),
401+
security_alerts=n.get("security_alerts", settings.notifications.security_alerts),
402+
channels=channels or settings.notifications.channels,
403+
)
404+
if "editor" in upd:
405+
e = parse_value(upd["editor"])
406+
if isinstance(e, dict):
407+
settings.editor = DomainEditorSettings(
408+
theme=e.get("theme", settings.editor.theme),
409+
font_size=e.get("font_size", settings.editor.font_size),
410+
tab_size=e.get("tab_size", settings.editor.tab_size),
411+
use_tabs=e.get("use_tabs", settings.editor.use_tabs),
412+
word_wrap=e.get("word_wrap", settings.editor.word_wrap),
413+
show_line_numbers=e.get("show_line_numbers", settings.editor.show_line_numbers),
414+
)
415+
if "custom_settings" in upd:
416+
cs = parse_value(upd["custom_settings"])
417+
if isinstance(cs, dict):
418+
settings.custom_settings = cs
378419
settings.version = event.payload.get("version", settings.version)
379420
settings.updated_at = event.timestamp
380421
return settings

backend/tests/integration/test_execution_routes.py renamed to backend/tests/integration/k8s/test_execution_routes.py

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,7 @@
1414
)
1515

1616

17-
def has_k8s_workers() -> bool:
18-
"""Check if K8s workers are available for execution."""
19-
# Check if K8s worker container is running
20-
import subprocess
21-
try:
22-
result = subprocess.run(
23-
["docker", "ps", "--filter", "name=k8s-worker", "--format", "{{.Names}}"],
24-
capture_output=True,
25-
text=True,
26-
timeout=2
27-
)
28-
return "k8s-worker" in result.stdout
29-
except Exception:
30-
return False
31-
32-
33-
@pytest.mark.integration
17+
@pytest.mark.k8s
3418
class TestExecution:
3519
"""Test execution endpoints against real backend."""
3620

@@ -52,7 +36,6 @@ async def test_execute_requires_authentication(self, client: AsyncClient) -> Non
5236
for word in ["not authenticated", "unauthorized", "login"])
5337

5438
@pytest.mark.asyncio
55-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
5639
async def test_execute_simple_python_script(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
5740
"""Test executing a simple Python script."""
5841
# Login first
@@ -96,7 +79,6 @@ async def test_execute_simple_python_script(self, client: AsyncClient, test_user
9679
]
9780

9881
@pytest.mark.asyncio
99-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
10082
async def test_get_execution_result(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
10183
"""Test getting execution result after completion using SSE (event-driven)."""
10284
# Login first
@@ -137,7 +119,6 @@ async def test_get_execution_result(self, client: AsyncClient, test_user: Dict[s
137119
assert "Line 2" in execution_result.stdout
138120

139121
@pytest.mark.asyncio
140-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
141122
async def test_execute_with_error(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
142123
"""Test executing a script that produces an error."""
143124
# Login first
@@ -163,7 +144,6 @@ async def test_execute_with_error(self, client: AsyncClient, test_user: Dict[str
163144
# No waiting - execution was accepted, error will be processed asynchronously
164145

165146
@pytest.mark.asyncio
166-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
167147
async def test_execute_with_resource_tracking(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
168148
"""Test that execution tracks resource usage."""
169149
# Login first
@@ -205,7 +185,6 @@ async def test_execute_with_resource_tracking(self, client: AsyncClient, test_us
205185
assert resource_usage.peak_memory_kb >= 0
206186

207187
@pytest.mark.asyncio
208-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
209188
async def test_execute_with_different_language_versions(self, client: AsyncClient,
210189
test_user: Dict[str, str]) -> None:
211190
"""Test execution with different Python versions."""
@@ -240,7 +219,6 @@ async def test_execute_with_different_language_versions(self, client: AsyncClien
240219
assert "execution_id" in data
241220

242221
@pytest.mark.asyncio
243-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
244222
async def test_execute_with_large_output(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
245223
"""Test execution with large output."""
246224
# Login first
@@ -279,7 +257,6 @@ async def test_execute_with_large_output(self, client: AsyncClient, test_user: D
279257
assert "End of output" in result_data["stdout"] or len(result_data["stdout"]) > 10000
280258

281259
@pytest.mark.asyncio
282-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
283260
async def test_cancel_running_execution(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
284261
"""Test cancelling a running execution."""
285262
# Login first
@@ -326,7 +303,6 @@ async def test_cancel_running_execution(self, client: AsyncClient, test_user: Di
326303
# Cancel response of 200 means cancellation was accepted
327304

328305
@pytest.mark.asyncio
329-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
330306
async def test_execution_with_timeout(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
331307
"""Bounded check: long-running executions don't finish immediately.
332308
@@ -364,7 +340,6 @@ async def test_execution_with_timeout(self, client: AsyncClient, test_user: Dict
364340
# No need to wait or observe states
365341

366342
@pytest.mark.asyncio
367-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
368343
async def test_sandbox_restrictions(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
369344
"""Test that dangerous operations are blocked by sandbox."""
370345
# Login first
@@ -421,7 +396,6 @@ async def test_sandbox_restrictions(self, client: AsyncClient, test_user: Dict[s
421396
assert exec_response.status_code in [400, 422]
422397

423398
@pytest.mark.asyncio
424-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
425399
async def test_concurrent_executions_by_same_user(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
426400
"""Test running multiple executions concurrently."""
427401
# Login first
@@ -489,7 +463,6 @@ async def test_get_k8s_resource_limits(self, client: AsyncClient) -> None:
489463
assert key in limits
490464

491465
@pytest.mark.asyncio
492-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
493466
async def test_get_user_executions_list(self, client: AsyncClient, test_user: Dict[str, str]) -> None:
494467
"""User executions list returns paginated executions for current user."""
495468
# Login first
@@ -504,7 +477,6 @@ async def test_get_user_executions_list(self, client: AsyncClient, test_user: Di
504477
assert set(["executions", "total", "limit", "skip", "has_more"]).issubset(payload.keys())
505478

506479
@pytest.mark.asyncio
507-
@pytest.mark.skipif(not has_k8s_workers(), reason="K8s workers not available")
508480
async def test_execution_idempotency_same_key_returns_same_execution(self, client: AsyncClient,
509481
test_user: Dict[str, str]) -> None:
510482
"""Submitting the same request with the same Idempotency-Key yields the same execution_id."""

0 commit comments

Comments
 (0)