Skip to content

AI 서비스 기초 코드 & 에이전트 워크플로 구축#21

Merged
VitoJeong merged 24 commits intoon-seoul-agentfrom
AGENT-3-retrieval
Apr 25, 2026
Merged

AI 서비스 기초 코드 & 에이전트 워크플로 구축#21
VitoJeong merged 24 commits intoon-seoul-agentfrom
AGENT-3-retrieval

Conversation

@VitoJeong
Copy link
Copy Markdown
Collaborator

개요

AI 서비스의 핵심인 멀티에이전트 오케스트레이션 파이프라인과 semantic 검색 체계를 구축했습니다.

구현 항목

작업내용 상세 문서는 on-seoul-agent/docs/ai-service-implementation.md를 참고해주세요.

다중 DB 연동 및 임베딩 인프라 (Phase 4, 7)

  • DB 계정 분리: on_ai_app(CRUD) / on_data_reader(SELECT 전용) 이중 엔진 구성 및 FastAPI DI 주입
  • 권한 격리 검증: on_data_reader의 INSERT/UPDATE 시도 시 권한 오류 발생 통합 테스트
  • 임베딩 배치 적재: embed_metadata.py--all / --limit / --incremental 모드 지원, 신규 service_id만 재임베딩하는 증분 업데이트

LangChain Agent 구현 (Phase 5)

  • RouterAgent: LCEL 체인으로 사용자 의도를 SQL_SEARCH / VECTOR_SEARCH / MAP / FALLBACK 4종으로 분류
  • SqlAgent: LLM이 추출한 파라미터(카테고리·지역·키워드·상태)를 바인드 파라미터 SQL로 변환 — LLM 생성 SQL 직접 실행 없음
  • VectorAgent: 질의 정제 → 임베딩 → tools/vector_search.py 위임
  • AnswerAgent: 검색 결과 통합 → 자연어 답변 생성, 첫 메시지 시 대화 제목 생성(title_needed 플래그)

워크플로우 및 Trace (Phase 6)

  • AgentWorkflow: Router → Branch(SQL/Vector/MAP/FALLBACK) → Answer 순서 실행, 세션 주입 방식으로 테스트 용이성 확보
  • DB 세션 라우팅: SqlAgent → on_data(읽기 전용), VectorAgent → on_ai(service_embeddings)
  • chat_agent_traces 적재: 실행 완료 후 intent·node_path·elapsed_ms를 JSONB로 저장 (best-effort, 실패 시 응답 영향 없음)
  • 오류 복원: Agent 예외 발생 시 fallback 답변 반환, state.error 기록

검색 전략 및 파라미터 튜닝 (Phase 8, 9)

  • tools/vector_search.py: pre-filter(카테고리·지역·상태를 metadata JSONB WHERE 절 적용) + pgvector 코사인 유사도 검색 독립 함수
  • HNSW 인덱스: m=16, ef_construction=64, ef_search=40 DDL 활성화 (1000건 기준)
  • 전략 결정: 하이브리드 FTS·재순위화 미채택 — 소규모 데이터에서 순수 벡터 검색 품질 충분

검색 품질 평가 인프라 (Phase 10)

  • eval_search.py: 카테고리·지역·키워드·의미별 샘플 쿼리 20건, --query / --top-k 옵션 지원

changha added 23 commits April 20, 2026 21:59
- RouterAgent: 사용자 의도를 SQL_SEARCH/VECTOR_SEARCH/MAP/FALLBACK으로 분류
- SqlAgent: _SqlParams 추출 → 동적 WHERE 절 SQL 생성 (카테고리/지역/키워드/상태 필터)
- VectorAgent: 질의 정제 → 임베딩 → pgvector cosine 유사도 검색 (threshold 0.4)
- AnswerAgent: 검색 결과 통합 → 자연어 답변 생성 + title_needed 시 대화 제목 생성
- 공통: AgentState spread 패턴({**state, key: value}) 으로 불변 상태 업데이트
- RouterAgent: 4종 intent 분류, state 보존, message 전달 검증 (6건)
- SqlAgent: sql_results 적재, 동적 필터 빌드, state 보존 (7건)
- VectorAgent: vector_results 적재, 정제 질의, 임베딩 호출, query_vector 전달 (6건)
- AnswerAgent: 답변/제목 생성, fallback URL, 결과 병합, state 보존 (8건)
- 전체 테스트 58개 통과
- AgentWorkflow: Router → Branch → Answer 순서로 실행
- 의도별 분기: SQL/Vector/MAP(stub)/FALLBACK
- chat_agent_traces 적재: intent, node_path, elapsed_ms, error (best-effort)
- core/database.py: ai_session_ctx / data_session_ctx context manager 추가
- 워크플로우 통합 테스트 13건 추가 (전체 67개 통과)
[Critical]
- workflow: VectorAgent를 data_session 대신 ai_session으로 라우팅
  (service_embeddings는 on_ai DB에 존재)
- workflow._dispatch 시그니처에 ai_session 파라미터 추가

[Important]
- workflow: 오류 발생 시 fallback 답변 채움 (answer=None 반환 방지)
- vector_agent: _SCORE_THRESHOLD(거리) → _MIN_SIMILARITY(유사도=0.6)로 통일,
  내부 변환(1-x) 제거로 가독성 개선
- sql_agent: LIMIT {_TOP_K} f-string → :top_k bind 파라미터로 분리

[Docs/Nit]
- workflow docstring: 세션 용도 오기 수정
- workflow logger.exception: 중복 exc 인자 제거
- test_workflow: VectorAgent 세션 라우팅 검증 테스트 추가

[QA 추가 테스트]
- 미사용 import ruff auto-fix (8건)
- SqlAgent: keyword=None ILIKE 미추가, SQL Injection 방어 검증
- VectorAgent: 빈 결과 시 [] 반환, threshold/top_k 파라미터 검증
- AnswerAgent: 결과 모두 None 시 빈 목록 처리, metadata 추출 경로
- Workflow: trace commit 호출 검증

전체 78 passed
- pre-filter 전략 채택: max_class_name/area_name/service_status를 WHERE 절 적용
  (metadata JSONB 경로, 1000건 미만에서 카테고리 내 유사도 비교로 정확도 향상)
- 하이브리드 검색(tsvector) 미채택: 소규모 데이터 충분, 5000건 이상 시 도입
- 재순위화/MMR 미채택: min_similarity=0.6 기준 중복 발생 빈도 낮음
- VectorAgent._similarity_search → tools.vector_search.vector_search 위임
- CAST(:query_vector AS vector) asyncpg 호환 유지
- 테스트 11건 추가 (pre-filter bind 검증, 파라미터 오버라이드 등)
[embed_metadata.py]
- --incremental 플래그: service_embeddings에 없는 service_id만 임베딩 (신규 데이터 전용)
  two-phase: on_ai에서 기존 ID set 조회 → Python 필터 (크로스 DB JOIN 불가)
- 로그: 기존 N건 제외, M건 신규 임베딩

[ddl_chat_entities.sql]
- HNSW 인덱스 활성화: m=16, ef_construction=64 (1000건 기본값)
- ef_search=40 주석 안내
- 10000건 이상 m=32, ef_construction=128 재검토 권고 주석 추가
- 내장 샘플 쿼리셋 20건 (카테고리·지역·키워드·의미 다양하게 구성)
- --query: 단일 질의 실행, --top-k: 결과 수 조정 (기본 5)
- tools.vector_search.vector_search 사용, tabulate 의존성 없음
- 사용: uv run python scripts/eval_search.py [--query '...'] [--top-k N]
[MUST-FIX]
- embed_metadata: LIMIT f-string 삽입 → :limit bind 파라미터로 교체
- eval_search: _run_query 타입 힌트 추가 (embeddings: Embeddings, session: AsyncSession)

[SHOULD-FIX]
- vector_search: _ALLOWED_PREFILTER_CLAUSES 화이트리스트 상수 선언
  pre-filter 조건 문자열을 상수에서만 조립, 값은 bind 파라미터 전달 명시
- eval_search: 환경변수 미설정 시 친절한 오류 메시지 출력 (try/except + sys.exit)
- test_vector_search: 중복 pytestmark = pytest.mark.asyncio 제거

[NIT]
- vector_agent: pre-filter 미전달에 대한 TODO(Phase 15) 주석 추가
- ddl_chat_entities: ef_search 세션/영속 적용 방법 주석 보완
asyncio.gather로 배치 요청을 동시 발사하면 aiolimiter 토큰 버킷이
순식간에 소진되어 Gemini Embedding API 429가 발생한다.
aembed_query를 순차 호출로 변경해 rate limiter가 정확히 동작하도록 수정.
문제 1 (버스트): AsyncLimiter(max_rate=70) 는 버킷이 70토큰으로 가득 찬 채 시작해
순차 처리 이후에도 API 응답이 빠르면 수십 개 요청이 수초 내 발사됨 → RPM/TPM 초과.
max_rate=1, time_period=60/rpm 으로 버킷 크기를 1로 고정해 요청 간격을 강제.

문제 2 (재시도 없음): RPM 외 TPM 초과도 429를 유발하므로 limiter가 정상이어도
일시 스파이크에서 429가 발생할 수 있음.
aembed_query에 지수 백오프 재시도(최대 5회, 10s·20s·40s·80s·160s) 추가.

설계: limiter를 _GeminiEmbeddings 생성자로 주입하도록 변경해
프로덕션은 모듈 수준 limiter, 테스트는 _FAST_LIMITER 사용.
gemini_embed_rpm 기본값을 70 → 60(무료 티어 안전값)으로 조정.
@VitoJeong VitoJeong requested a review from f-lab-ted April 23, 2026 05:40
Copy link
Copy Markdown

@f-lab-ted f-lab-ted left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ai-service-implementation.md Phase 4~10에 명시된 멀티에이전트 오케스트레이션 파이프라인과 semantic 검색 체계를 리뷰했습니다. Router → Branch(SQL/Vector/Map/Fallback) → Answer 워크플로우 구조, 이중 DB 세션 라우팅(on_ai CRUD / on_data SELECT 전용), 임베딩 배치 적재 스크립트, pgvector 검색 도구, 그리고 13건의 워크플로우 통합 테스트를 중점적으로 검토했습니다. 특히 PR 가이드라인에 따라 SOLID 설계 원칙 준수 여부, 테스트 적절성, LLM 호출 안정성에 초점을 맞추어 리뷰했습니다.

잘한 점

SqlAgent에서 LLM이 SQL을 직접 생성하지 않고 파라미터만 추출하여 바인드 파라미터로 주입하는 설계는 SQL Injection 방지 측면에서 훌륭합니다. AgentState TypedDict 기반의 불변 스프레드 패턴({**state, key: value})과 생성자 주입 방식으로 테스트 용이성을 확보한 점, 그리고 vector_search.py의 화이트리스트 기반 pre-filter 조건 조립도 보안과 확장성 모두 잘 고려되어 있습니다.

보완할 점

가장 우려되는 부분은 LLM Chat 호출의 안정성입니다. 임베딩 API에는 rate limit + 429 백오프가 잘 구현되어 있지만, 핵심 Chat LLM 호출(Router/SQL/Vector/Answer 4개 에이전트)에는 타임아웃이나 재시도가 전혀 없어 프로덕션에서 LLM 응답 지연 시 요청이 무한 대기할 수 있습니다. _save_trace의 세션 롤백 누락과 429 판별의 문자열 매칭 방식도 안정성 관점에서 개선이 필요합니다.

결론

전체적으로 설계 문서대로 잘 구현되어 있고 테스트도 충실합니다. 다만 LLM Chat 호출에 타임아웃/재시도 메커니즘을 추가하는 것은 프로덕션 안정성을 위해 반드시 수정 바랍니다. 나머지 코멘트는 검토 후 반영 여부 결정해주세요.


try:
# ── 1. Router: 의도 분류 ──────────────────────────────────────
state = await self._router.classify(state)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical — LLM Chat 호출 타임아웃/재시도/Rate Limit 대책 부재]

임베딩 API에는 AsyncLimiter 기반 RPM 제한 + 429 지수 백오프(최대 5회, 10s→160s)가 잘 구현되어 있지만, 핵심 워크플로우의 4개 에이전트(RouterAgent.classify, SqlAgent.search, VectorAgent.search, AnswerAgent.answer)가 내부에서 호출하는 self._chain.ainvoke()에는 타임아웃, 재시도, Rate Limit 제어가 모두 없습니다.

  1. 타임아웃 부재: LLM 프로바이더가 일시적으로 느려지거나 응답하지 않으면 워크플로우 전체가 무한 대기합니다. FastAPI 워커 스레드를 점유하게 되어 서비스 전체에 영향을 줄 수 있습니다.
  2. Rate Limit 미처리: llm/client.py에서 ChatOpenAI / ChatGoogleGenerativeAI 생성 시 max_retries나 rate limit 관련 설정이 없습니다. 임베딩 쪽에는 AsyncLimiter와 429 재시도가 있는 것과 대조적으로, Chat LLM 호출은 프로바이더의 rate limit에 무방비 상태입니다. 동시 요청이 몰리면 429 응답을 받고 그대로 예외가 전파됩니다.

제안:

# 방법 1: asyncio.wait_for로 타임아웃 + LangChain with_retry()로 재시도
import asyncio

self._chain = (
    prompt
    | llm.with_structured_output(_IntentOutput)
    .with_retry(stop_after_attempt=3, wait_exponential_jitter=True)
)

async def classify(self, state: AgentState) -> AgentState:
    result = await asyncio.wait_for(
        self._chain.ainvoke({"message": state["message"]}),
        timeout=30.0,  # 30초
    )
    return {**state, "intent": result.intent}

# 방법 2: LLM 클라이언트 레벨에서 max_retries 설정
ChatOpenAI(model=..., max_retries=3, request_timeout=30)

# 방법 3: 임베딩과 동일하게 AsyncLimiter 적용
from aiolimiter import AsyncLimiter
_chat_limiter = AsyncLimiter(max_rate=10, time_period=60)

async def classify(self, state: AgentState) -> AgentState:
    async with _chat_limiter:
        result = await asyncio.wait_for(
            self._chain.ainvoke({"message": state["message"]}),
            timeout=30.0,
        )
    return {**state, "intent": result.intent}

임베딩 호출에 적용된 것과 동일한 수준의 방어(AsyncLimiter + 지수 백오프 재시도)를 Chat LLM 호출에도 적용하는 것을 권장합니다. 최소한 asyncio.wait_for 타임아웃과 with_retry()만이라도 적용하면 무한 대기 및 429 폭주를 방지할 수 있습니다.


저장 실패 시 로그만 남기고 워크플로우 결과에 영향을 주지 않는다.
"""
try:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Major — _save_trace 세션 롤백 누락]

_save_tracefinally 블록에서 호출되므로, 메인 워크플로우에서 ai_session을 사용하던 중(VectorAgent) 예외가 발생하면 세션의 트랜잭션이 failed 상태일 수 있습니다. 이 상태에서 rollback 없이 바로 INSERT를 시도하면 InFailedSqlTransaction 오류가 발생하고, except에서 잡히긴 하지만 trace가 유실됩니다.

또한 execute 성공 후 commit 실패 시에도 rollback이 없어 세션이 dirty 상태로 남습니다.

async def _save_trace(session, message_id, trace):
    try:
        await session.rollback()  # 이전 트랜잭션 상태 정리
        trace_json = json.dumps(trace, ensure_ascii=False, default=str)
        await session.execute(
            text("INSERT INTO chat_agent_traces ..."),
            {"message_id": message_id, "trace": trace_json},
        )
        await session.commit()
    except Exception as exc:
        logger.warning("trace 저장 실패 (message_id=%s): %s", message_id, exc)
        try:
            await session.rollback()
        except Exception:
            pass

best-effort 의도에 맞게 rollback을 선행하면 trace 저장 성공률이 높아집니다.

try:
return await self._aembed_once(text)
except Exception as exc:
is_rate_limit = "429" in str(exc) or "RESOURCE_EXHAUSTED" in str(exc)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Major — 429 판별 문자열 매칭의 취약성]

is_rate_limit = "429" in str(exc) or "RESOURCE_EXHAUSTED" in str(exc)

이 방식은 다음 문제가 있습니다:

  1. 예외 메시지 포맷이 라이브러리 버전에 따라 변경될 수 있음
  2. 우연히 "429"가 포함된 비관련 에러도 재시도 대상이 됨
  3. google.api_core.exceptions.ResourceExhausted 등 구체적인 예외 타입이 있는데 활용하지 않음

제안:

from google.api_core.exceptions import ResourceExhausted
from openai import RateLimitError

is_rate_limit = isinstance(exc, (ResourceExhausted, RateLimitError))
# 혹은 HTTP status code 기반 판별
if hasattr(exc, 'code'):
    is_rate_limit = exc.code == 429

예외 타입 기반 판별이 더 안정적이고, 향후 OpenAI 임베딩 지원 시에도 확장이 용이합니다.

self._base = base
self._limiter = limiter if limiter is not None else _gemini_embed_limiter

def embed_documents(self, texts: list[str]) -> list[list[float]]:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor — embed_documents / embed_query 동기 메서드에 rate limit 미적용]

_GeminiEmbeddings의 비동기 메서드(aembed_query, aembed_documents)에는 rate limit + 429 재시도가 적용되어 있지만, 동기 메서드(embed_documents, embed_query)는 self._base에 그대로 위임하여 rate limit이 우회됩니다.

def embed_documents(self, texts: list[str]) -> list[list[float]]:
    return self._base.embed_documents(texts)  # rate limit 없음

def embed_query(self, text: str) -> list[float]:
    return self._base.embed_query(text)  # rate limit 없음

현재 코드에서 동기 메서드를 직접 호출하는 곳이 없다면 당장은 문제가 없지만, 의도치 않은 호출 시 rate limit 없이 API를 때릴 수 있습니다. 동기 메서드에서 raise NotImplementedError("Use async methods")를 던지거나, Embeddings 인터페이스 구현 의무상 필요하다면 docstring에 rate limit 미적용을 명시하는 것이 안전합니다.

)

logger.info("완료: %d건 적재", len(rows))
await on_data_engine.dispose()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor — 예외 시 엔진 dispose 누락]

run() 함수에서 임베딩 처리 중 예외가 발생하면 on_data_engine.dispose()on_ai_engine.dispose()가 호출되지 않아 커넥션 풀이 정리되지 않습니다. 배치 스크립트라서 프로세스 종료 시 OS가 정리하긴 하지만, try/finally로 감싸는 것이 좋습니다.

async def run(limit, incremental=False):
    on_data_engine = create_async_engine(...)
    on_ai_engine = create_async_engine(...)
    try:
        # ... 기존 로직 ...
    finally:
        await on_data_engine.dispose()
        await on_ai_engine.dispose()


class LLMException(OnSeoulAgentException):
"""LLM 벤더(Gemini, OpenAI 등) 호출 관련 예외"""
"""LLM 프로파이더(Gemini, OpenAI 등) 호출 관련 예외"""
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor — "프로파이더" 오타]

"프로파이더"는 "프로바이더(provider)"의 오타입니다. 이 PR에서 "벤더 → 프로바이더" 용어 통일 작업이 있었는데, exceptions.py, ai-service-implementation.md 등 여러 곳에서 "프로파이더"로 잘못 기재되어 있습니다.

# before
"""LLM 프로파이더(Gemini, OpenAI 등) 호출 관련 예외"""

# after
"""LLM 프로바이더(Gemini, OpenAI 등) 호출 관련 예외"""

@VitoJeong
Copy link
Copy Markdown
Collaborator Author

@f-lab-ted 피드백 주신 내용 반영하여 merge 하겠습니다~!

@VitoJeong VitoJeong merged commit ac20e0e into on-seoul-agent Apr 25, 2026
1 check passed
@VitoJeong VitoJeong deleted the AGENT-3-retrieval branch April 25, 2026 07:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants