|
1 | 1 | from fastapi import Request |
2 | 2 | from consts.model import ExportAndImportAgentInfo, ExportAndImportDataFormat, MCPInfo, AgentRequest |
3 | 3 | import sys |
| 4 | +import asyncio |
| 5 | +import json |
4 | 6 | from unittest.mock import patch, MagicMock, mock_open, call, Mock, AsyncMock |
5 | 7 |
|
6 | 8 | import pytest |
@@ -2355,6 +2357,306 @@ def fake_unregister(conv_id): |
2355 | 2357 |
|
2356 | 2358 |
|
2357 | 2359 | @pytest.mark.asyncio |
| 2360 | +async def test__stream_agent_chunks_emits_error_chunk_on_run_failure(monkeypatch): |
| 2361 | + """When agent_run raises, an error SSE chunk should be emitted and run unregistered.""" |
| 2362 | + agent_request = AgentRequest( |
| 2363 | + agent_id=1, |
| 2364 | + conversation_id=1001, |
| 2365 | + query="trigger error", |
| 2366 | + history=[], |
| 2367 | + minio_files=[], |
| 2368 | + is_debug=True, # avoid persisting messages to focus on error path |
| 2369 | + ) |
| 2370 | + |
| 2371 | + async def failing_agent_run(*_, **__): |
| 2372 | + raise Exception("oops") |
| 2373 | + |
| 2374 | + monkeypatch.setattr( |
| 2375 | + "backend.services.agent_service.agent_run", failing_agent_run, raising=False |
| 2376 | + ) |
| 2377 | + |
| 2378 | + called = {"unregistered": None} |
| 2379 | + |
| 2380 | + def fake_unregister(conv_id): |
| 2381 | + called["unregistered"] = conv_id |
| 2382 | + |
| 2383 | + monkeypatch.setattr( |
| 2384 | + "backend.services.agent_service.agent_run_manager.unregister_agent_run", |
| 2385 | + fake_unregister, |
| 2386 | + raising=False, |
| 2387 | + ) |
| 2388 | + |
| 2389 | + # Collect streamed chunks |
| 2390 | + collected = [] |
| 2391 | + async for out in agent_service._stream_agent_chunks( |
| 2392 | + agent_request, "u", "t", MagicMock(), MagicMock() |
| 2393 | + ): |
| 2394 | + collected.append(out) |
| 2395 | + |
| 2396 | + # Expect a single error payload chunk and unregister called |
| 2397 | + assert collected and collected[0].startswith( |
| 2398 | + "data: {") and "\"type\": \"error\"" in collected[0] |
| 2399 | + assert called["unregistered"] == 1001 |
| 2400 | + |
| 2401 | + |
| 2402 | +@pytest.mark.asyncio |
| 2403 | +async def test__stream_agent_chunks_captures_final_answer_and_adds_memory(monkeypatch): |
| 2404 | + """Final answer should be captured and appended to memory via add_memory_in_levels.""" |
| 2405 | + agent_request = AgentRequest( |
| 2406 | + agent_id=3, |
| 2407 | + conversation_id=3003, |
| 2408 | + query="hello", |
| 2409 | + history=[], |
| 2410 | + minio_files=[], |
| 2411 | + is_debug=False, |
| 2412 | + ) |
| 2413 | + |
| 2414 | + async def yield_final_answer(*_, **__): |
| 2415 | + yield json.dumps({"type": "token", "content": "hi"}, ensure_ascii=False) |
| 2416 | + yield json.dumps({"type": "final_answer", "content": "bye"}, ensure_ascii=False) |
| 2417 | + |
| 2418 | + monkeypatch.setattr( |
| 2419 | + "backend.services.agent_service.agent_run", yield_final_answer, raising=False |
| 2420 | + ) |
| 2421 | + |
| 2422 | + add_calls = {"args": None, "called": False} |
| 2423 | + |
| 2424 | + async def fake_add_memory_in_levels(**kwargs): |
| 2425 | + add_calls["args"] = kwargs |
| 2426 | + add_calls["called"] = True |
| 2427 | + return {"results": [{"ok": True}]} |
| 2428 | + |
| 2429 | + monkeypatch.setattr( |
| 2430 | + "backend.services.agent_service.add_memory_in_levels", |
| 2431 | + fake_add_memory_in_levels, |
| 2432 | + raising=False, |
| 2433 | + ) |
| 2434 | + |
| 2435 | + # Memory context with switch ON |
| 2436 | + memory_ctx = MagicMock() |
| 2437 | + memory_ctx.user_config = MagicMock( |
| 2438 | + memory_switch=True, |
| 2439 | + agent_share_option="always", |
| 2440 | + disable_agent_ids=[], |
| 2441 | + disable_user_agent_ids=[], |
| 2442 | + ) |
| 2443 | + memory_ctx.memory_config = {"cfg": 1} |
| 2444 | + memory_ctx.tenant_id = "t" |
| 2445 | + memory_ctx.user_id = "u" |
| 2446 | + memory_ctx.agent_id = 3 |
| 2447 | + |
| 2448 | + # Capture and await scheduled background task |
| 2449 | + task_holder = {"task": None} |
| 2450 | + orig_create_task = asyncio.create_task |
| 2451 | + |
| 2452 | + def capture_task(coro): |
| 2453 | + t = orig_create_task(coro) |
| 2454 | + task_holder["task"] = t |
| 2455 | + return t |
| 2456 | + |
| 2457 | + monkeypatch.setattr(asyncio, "create_task", capture_task) |
| 2458 | + |
| 2459 | + # Run stream |
| 2460 | + collected = [] |
| 2461 | + async for out in agent_service._stream_agent_chunks( |
| 2462 | + agent_request, "u", "t", MagicMock(query="hello"), memory_ctx |
| 2463 | + ): |
| 2464 | + collected.append(out) |
| 2465 | + |
| 2466 | + # Ensure background task completed |
| 2467 | + if task_holder["task"] is not None: |
| 2468 | + await task_holder["task"] |
| 2469 | + |
| 2470 | + assert add_calls["called"] is True |
| 2471 | + assert add_calls["args"]["messages"] == [ |
| 2472 | + {"role": "user", "content": "hello"}, |
| 2473 | + {"role": "assistant", "content": "bye"}, |
| 2474 | + ] |
| 2475 | + assert set(add_calls["args"]["memory_levels"]) == {"agent", "user_agent"} |
| 2476 | + assert add_calls["args"]["memory_config"] == {"cfg": 1} |
| 2477 | + assert add_calls["args"]["tenant_id"] == "t" |
| 2478 | + assert add_calls["args"]["user_id"] == "u" |
| 2479 | + assert add_calls["args"]["agent_id"] == 3 |
| 2480 | + |
| 2481 | + |
| 2482 | +@pytest.mark.asyncio |
| 2483 | +async def test__stream_agent_chunks_skips_memory_when_switch_off(monkeypatch): |
| 2484 | + """When memory switch is off, background memory addition exits early.""" |
| 2485 | + agent_request = AgentRequest( |
| 2486 | + agent_id=4, |
| 2487 | + conversation_id=4004, |
| 2488 | + query="q", |
| 2489 | + history=[], |
| 2490 | + minio_files=[], |
| 2491 | + is_debug=False, |
| 2492 | + ) |
| 2493 | + |
| 2494 | + async def yield_one(*_, **__): |
| 2495 | + yield json.dumps({"type": "final_answer", "content": "ans"}, ensure_ascii=False) |
| 2496 | + |
| 2497 | + monkeypatch.setattr( |
| 2498 | + "backend.services.agent_service.agent_run", yield_one, raising=False |
| 2499 | + ) |
| 2500 | + |
| 2501 | + called = {"count": 0} |
| 2502 | + |
| 2503 | + async def track_add(**kwargs): |
| 2504 | + called["count"] += 1 |
| 2505 | + return {"results": []} |
| 2506 | + |
| 2507 | + monkeypatch.setattr( |
| 2508 | + "backend.services.agent_service.add_memory_in_levels", track_add, raising=False |
| 2509 | + ) |
| 2510 | + |
| 2511 | + memory_ctx = MagicMock() |
| 2512 | + memory_ctx.user_config = MagicMock(memory_switch=False) |
| 2513 | + |
| 2514 | + async for _ in agent_service._stream_agent_chunks( |
| 2515 | + agent_request, "u", "t", MagicMock(query="q"), memory_ctx |
| 2516 | + ): |
| 2517 | + pass |
| 2518 | + |
| 2519 | + await asyncio.sleep(0) |
| 2520 | + assert called["count"] == 0 |
| 2521 | + |
| 2522 | + |
| 2523 | +@pytest.mark.asyncio |
| 2524 | +async def test__stream_agent_chunks_background_add_exception(monkeypatch): |
| 2525 | + """Exceptions in background memory addition should be caught and not crash the stream.""" |
| 2526 | + agent_request = AgentRequest( |
| 2527 | + agent_id=5, |
| 2528 | + conversation_id=5005, |
| 2529 | + query="q", |
| 2530 | + history=[], |
| 2531 | + minio_files=[], |
| 2532 | + is_debug=False, |
| 2533 | + ) |
| 2534 | + |
| 2535 | + async def yield_final(*_, **__): |
| 2536 | + yield json.dumps({"type": "final_answer", "content": "A"}, ensure_ascii=False) |
| 2537 | + |
| 2538 | + monkeypatch.setattr( |
| 2539 | + "backend.services.agent_service.agent_run", yield_final, raising=False |
| 2540 | + ) |
| 2541 | + |
| 2542 | + async def raise_in_add(**kwargs): |
| 2543 | + raise RuntimeError("mem add fail") |
| 2544 | + |
| 2545 | + monkeypatch.setattr( |
| 2546 | + "backend.services.agent_service.add_memory_in_levels", raise_in_add, raising=False |
| 2547 | + ) |
| 2548 | + |
| 2549 | + memory_ctx = MagicMock() |
| 2550 | + memory_ctx.user_config = MagicMock( |
| 2551 | + memory_switch=True, |
| 2552 | + agent_share_option="always", |
| 2553 | + disable_agent_ids=[], |
| 2554 | + disable_user_agent_ids=[], |
| 2555 | + ) |
| 2556 | + |
| 2557 | + # Capture and await scheduled background task |
| 2558 | + task_holder = {"task": None} |
| 2559 | + orig_create_task = asyncio.create_task |
| 2560 | + |
| 2561 | + def capture_task(coro): |
| 2562 | + t = orig_create_task(coro) |
| 2563 | + task_holder["task"] = t |
| 2564 | + return t |
| 2565 | + |
| 2566 | + monkeypatch.setattr(asyncio, "create_task", capture_task) |
| 2567 | + |
| 2568 | + async for _ in agent_service._stream_agent_chunks( |
| 2569 | + agent_request, "u", "t", MagicMock(query="q"), memory_ctx |
| 2570 | + ): |
| 2571 | + pass |
| 2572 | + |
| 2573 | + # Let background exception be handled by awaiting the task |
| 2574 | + if task_holder["task"] is not None: |
| 2575 | + await task_holder["task"] |
| 2576 | + |
| 2577 | + |
| 2578 | +@pytest.mark.asyncio |
| 2579 | +async def test__stream_agent_chunks_schedule_task_failure(monkeypatch): |
| 2580 | + """Scheduling background task failure should be caught and logged.""" |
| 2581 | + agent_request = AgentRequest( |
| 2582 | + agent_id=6, |
| 2583 | + conversation_id=6006, |
| 2584 | + query="q", |
| 2585 | + history=[], |
| 2586 | + minio_files=[], |
| 2587 | + is_debug=False, |
| 2588 | + ) |
| 2589 | + |
| 2590 | + async def yield_final(*_, **__): |
| 2591 | + yield json.dumps({"type": "final_answer", "content": "A"}, ensure_ascii=False) |
| 2592 | + |
| 2593 | + monkeypatch.setattr( |
| 2594 | + "backend.services.agent_service.agent_run", yield_final, raising=False |
| 2595 | + ) |
| 2596 | + |
| 2597 | + # Force asyncio.create_task to fail |
| 2598 | + def fail_create_task(*_, **__): |
| 2599 | + raise RuntimeError("schedule fail") |
| 2600 | + |
| 2601 | + monkeypatch.setattr("asyncio.create_task", fail_create_task) |
| 2602 | + |
| 2603 | + memory_ctx = MagicMock() |
| 2604 | + memory_ctx.user_config = MagicMock( |
| 2605 | + memory_switch=True, |
| 2606 | + agent_share_option="always", |
| 2607 | + disable_agent_ids=[], |
| 2608 | + disable_user_agent_ids=[], |
| 2609 | + ) |
| 2610 | + |
| 2611 | + collected = [] |
| 2612 | + async for out in agent_service._stream_agent_chunks( |
| 2613 | + agent_request, "u", "t", MagicMock(query="q"), memory_ctx |
| 2614 | + ): |
| 2615 | + collected.append(out) |
| 2616 | + |
| 2617 | + assert collected # Stream still produced data without crashing |
| 2618 | + |
| 2619 | + |
| 2620 | +def test_insert_related_agent_impl_failure_returns_400(): |
| 2621 | + """When insertion fails, should return 400 JSONResponse.""" |
| 2622 | + with patch( |
| 2623 | + "backend.services.agent_service.query_sub_agents_id_list", return_value=[] |
| 2624 | + ) as _, patch( |
| 2625 | + "backend.services.agent_service.insert_related_agent", return_value=False |
| 2626 | + ) as __: |
| 2627 | + resp = insert_related_agent_impl( |
| 2628 | + parent_agent_id=1, child_agent_id=2, tenant_id="t") |
| 2629 | + assert resp.status_code == 400 |
| 2630 | + |
| 2631 | + |
| 2632 | +@pytest.mark.asyncio |
| 2633 | +async def test_generate_stream_with_memory_unexpected_exception_emits_error(monkeypatch): |
| 2634 | + """Generic exceptions should emit an error SSE chunk and stop.""" |
| 2635 | + agent_request = AgentRequest( |
| 2636 | + agent_id=9, |
| 2637 | + conversation_id=9009, |
| 2638 | + query="q", |
| 2639 | + history=[], |
| 2640 | + minio_files=[], |
| 2641 | + is_debug=False, |
| 2642 | + ) |
| 2643 | + |
| 2644 | + # Cause an unexpected error inside the try block |
| 2645 | + monkeypatch.setattr( |
| 2646 | + "backend.services.agent_service.build_memory_context", |
| 2647 | + MagicMock(side_effect=Exception("unexpected")), |
| 2648 | + raising=False, |
| 2649 | + ) |
| 2650 | + |
| 2651 | + out = [] |
| 2652 | + async for d in agent_service.generate_stream_with_memory( |
| 2653 | + agent_request, user_id="u", tenant_id="t" |
| 2654 | + ): |
| 2655 | + out.append(d) |
| 2656 | + |
| 2657 | + assert out and out[0].startswith( |
| 2658 | + "data: {") and "\"type\": \"error\"" in out[0] |
| 2659 | + |
2358 | 2660 | async def test_generate_stream_no_memory_registers_and_streams(monkeypatch): |
2359 | 2661 | """generate_stream_no_memory should prepare run info, register it and stream data without memory tokens.""" |
2360 | 2662 | # Prepare AgentRequest & Request |
|
0 commit comments