Skip to content

Commit 04718b6

Browse files
Copilot and Cataldir authored
feat: Truth HITL service — human-in-the-loop review queue for AI-proposed attributes (#126)
* Initial plan * feat(#102): implement Truth HITL service Co-authored-by: Cataldir <29005497+Cataldir@users.noreply.github.com> * chore: merge main into branch, resolve conflicts Co-authored-by: Cataldir <29005497+Cataldir@users.noreply.github.com> * fix: align truth-hitl imports with lint policy --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Cataldir <29005497+Cataldir@users.noreply.github.com> Co-authored-by: Ricardo Cataldi <rcataldi@microsoft.com>
1 parent 88b673f commit 04718b6

File tree

15 files changed

+865
-40
lines changed

15 files changed

+865
-40
lines changed

apps/truth-hitl/src/Dockerfile

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Generated for truth-hitl
# Multi-stage build: `base` carries the shared OS/Python setup; `dev` and
# `prod` are the two targets built on top of it.
ARG PYTHON_VERSION=3.13-slim

FROM python:${PYTHON_VERSION} AS base
# Do not write .pyc files, keep stdout/stderr unbuffered, disable pip's cache.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    APP_NAME=truth-hitl
WORKDIR /app/src
# Compiler toolchain, libpq headers, git and curl; the apt package lists are
# removed in the same layer so they do not persist in the image.
# NOTE(review): git is presumably needed for the git+https dependency in
# pyproject.toml -- confirm before trimming packages.
RUN apt-get update && apt-get install -y --no-install-recommends build-essential libpq-dev git curl \
    && rm -rf /var/lib/apt/lists/*

# Development target: editable install with the dev/test/lint extras and
# uvicorn auto-reload.
FROM base AS dev
ENV APP_ENV=dev
# NOTE(review): this COPY is immediately followed by `COPY . ./`, which copies
# the whole build context anyway -- confirm whether it is still needed.
COPY pyproject.toml ./pyproject.toml
COPY . ./
ENV PYTHONPATH=/app/src
RUN python -m pip install --upgrade pip && \
    python -m pip install --no-cache-dir -e .[dev,test,lint]
CMD ["uvicorn", "truth_hitl.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

# Production target: regular (non-editable) install without extras, no reload.
FROM base AS prod
ENV APP_ENV=prod
COPY pyproject.toml ./pyproject.toml
COPY . ./
ENV PYTHONPATH=/app/src
# NOTE(review): no USER instruction is set, so the container runs with the
# base image's default user -- confirm this is intended for production.
RUN python -m pip install --upgrade pip && \
    python -m pip install --no-cache-dir .
CMD ["uvicorn", "truth_hitl.main:app", "--host", "0.0.0.0", "--port", "8000"]

apps/truth-hitl/src/pyproject.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Packaging metadata for the truth-hitl service.
[project]
name = "truth-hitl"
version = "0.1.0"
description = "Truth Human-in-the-Loop review service"
authors = [{name = "Ricardo Cataldi", email = "rcataldi@microsoft.com"}]
requires-python = ">=3.13"
# Runtime dependencies. holiday-peak-lib is installed straight from the
# holiday-peak-hub repository at tag v1.0.0 (subdirectory lib/src).
# NOTE(review): most entries carry no version bound -- confirm that is intended.
dependencies = ["fastapi", "fastapi-mcp", "uvicorn", "pydantic>=2", "agent-framework", "azure-ai-projects>=2.0.0b4", "azure-identity", "azure-search-documents", "azure-cosmos", "azure-storage-blob", "azure-eventhub", "redis", "asyncpg", "holiday-peak-lib @ git+https://github.com/Azure-Samples/holiday-peak-hub.git@v1.0.0#subdirectory=lib/src"]

# Extras; the dev image installs all three via `.[dev,test,lint]`.
[project.optional-dependencies]
dev = ["faker", "pre-commit", "python-dotenv", "debugpy"]
test = ["pytest", "pytest-cov", "pytest-asyncio", "httpx", "requests"]
lint = ["pylint", "isort", "black[jupyter]>=25.1.0"]

# pytest defaults: short summary, quiet, no output capture, and a coverage
# report for the truth_hitl package listing missing lines.
[tool.pytest.ini_options]
addopts = "-ra -q -s --cov=truth_hitl --cov-report=term-missing"
testpaths = ["tests"]

[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Truth HITL service package."""
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Adapters for the Truth HITL service."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass, field
6+
7+
from truth_hitl.review_manager import ReviewManager
8+
9+
10+
@dataclass
class HITLAdapters:
    """Container for Truth HITL service adapters.

    Constructing the container without arguments builds a new
    ``ReviewManager()`` for that instance.
    """

    # Review manager used by the service; defaults to a new ReviewManager().
    review_manager: ReviewManager = field(default_factory=ReviewManager)
15+
16+
17+
def build_hitl_adapters(
    *,
    review_manager: ReviewManager | None = None,
) -> HITLAdapters:
    """Create the adapter container for the HITL review workflow.

    Args:
        review_manager: Manager to wrap. When ``None``, a new
            ``ReviewManager()`` is constructed.

    Returns:
        A ``HITLAdapters`` holding the selected review manager.
    """
    # Test identity against None instead of truthiness: a manager that defines
    # ``__len__`` or ``__bool__`` (for example an empty queue) would otherwise
    # be discarded and silently replaced by a brand-new instance.
    if review_manager is None:
        review_manager = ReviewManager()
    # NOTE(review): every call without an argument yields an independent
    # ReviewManager. If ReviewManager keeps its queue in process memory,
    # callers that need to see the same items must pass one shared instance.
    return HITLAdapters(review_manager=review_manager)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""Truth HITL agent implementation and MCP tool registration."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any
6+
7+
from holiday_peak_lib.agents import BaseRetailAgent
8+
from holiday_peak_lib.agents.fastapi_mcp import FastAPIMCPServer
9+
10+
from .adapters import HITLAdapters, build_hitl_adapters
11+
12+
13+
class TruthHITLAgent(BaseRetailAgent):
    """Retail agent fronting the human review queue for AI-proposed attributes."""

    def __init__(self, config: Any, *args: Any, **kwargs: Any) -> None:
        """Initialise the base agent, then build this agent's HITL adapters."""
        super().__init__(config, *args, **kwargs)
        self._adapters = build_hitl_adapters()

    @property
    def adapters(self) -> HITLAdapters:
        """Adapters container created for this agent instance."""
        return self._adapters

    async def handle(self, request: dict[str, Any]) -> dict[str, Any]:
        """Answer a request according to its ``action`` key.

        ``"stats"`` (the default when the key is absent) returns the review
        manager's stats. ``"list"`` with a truthy ``entity_id`` returns that
        entity's items, each serialised with ``model_dump()``. Anything else
        returns an error dictionary that echoes the action.
        """
        manager = self._adapters.review_manager
        action = request.get("action", "stats")

        if action == "stats":
            return {"stats": manager.stats()}

        entity_id = request.get("entity_id")
        # Guard clause: only a "list" action with an entity id goes further.
        if action != "list" or not entity_id:
            return {"error": "unsupported action or missing entity_id", "action": action}

        serialized = [entry.model_dump() for entry in manager.get_by_entity(entity_id)]
        return {"entity_id": entity_id, "items": serialized}
36+
37+
38+
def _non_negative_int(value: Any, default: int) -> int:
    """Coerce a pagination value to an int >= 0, using *default* if unusable."""
    if value is None:
        return default
    try:
        number = int(value)
    except (TypeError, ValueError):
        return default
    return max(number, 0)


def register_mcp_tools(mcp: FastAPIMCPServer, agent: BaseRetailAgent) -> None:
    """Expose MCP tools for the HITL review workflow.

    Registers three tools on *mcp*: ``/hitl/queue`` (pending items, with
    optional ``entity_id``/``skip``/``limit``), ``/hitl/stats`` and
    ``/hitl/audit`` (audit events, optionally filtered by ``entity_id``).

    Args:
        mcp: Server on which ``add_tool`` is called for each tool.
        agent: Agent whose ``adapters`` attribute supplies the review manager.
            If the attribute is absent, a new adapters container is built.
    """
    # Build the fallback lazily. Passing build_hitl_adapters() as the getattr
    # default would construct a throwaway ReviewManager on every call, even
    # when the agent already exposes its own adapters.
    adapters = getattr(agent, "adapters", None)
    if adapters is None:
        adapters = build_hitl_adapters()

    async def get_review_queue(payload: dict[str, Any]) -> dict[str, Any]:
        entity_id = payload.get("entity_id")
        # A null, non-numeric or negative value must not crash the tool:
        # unusable values fall back to the defaults, negatives clamp to 0.
        skip = _non_negative_int(payload.get("skip"), 0)
        limit = _non_negative_int(payload.get("limit"), 50)
        items = adapters.review_manager.list_pending(entity_id=entity_id, skip=skip, limit=limit)
        return {"items": [i.model_dump() for i in items], "count": len(items)}

    async def get_review_stats(payload: dict[str, Any]) -> dict[str, Any]:  # noqa: ARG001
        return {"stats": adapters.review_manager.stats()}

    async def get_audit_log(payload: dict[str, Any]) -> dict[str, Any]:
        entity_id = payload.get("entity_id")
        events = adapters.review_manager.audit_log(entity_id=entity_id)
        return {"events": [e.model_dump() for e in events], "count": len(events)}

    mcp.add_tool("/hitl/queue", get_review_queue)
    mcp.add_tool("/hitl/stats", get_review_stats)
    mcp.add_tool("/hitl/audit", get_audit_log)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""Event handlers for the Truth HITL service."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
from datetime import datetime, timezone
7+
8+
from holiday_peak_lib.utils.event_hub import EventHandler
9+
from holiday_peak_lib.utils.logging import configure_logging
10+
11+
from .adapters import build_hitl_adapters
12+
from .review_manager import ReviewItem
13+
14+
15+
def _parse_proposed_at(raw: object) -> datetime:
    """Return *raw* as an aware datetime, or the current UTC time if unusable."""
    if isinstance(raw, str) and raw:
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError:
            parsed = None
        if parsed is not None:
            if parsed.tzinfo is None:
                # NOTE(review): naive timestamps are assumed to be UTC so they
                # stay comparable with the aware fallback -- confirm with the
                # event producers.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed
    return datetime.now(timezone.utc)


def _parse_confidence(raw: object) -> float:
    """Return *raw* as a float, or 0.0 when it is missing or not numeric."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return 0.0


def build_event_handlers() -> dict[str, EventHandler]:
    """Build event handlers for hitl-jobs Event Hub subscription.

    Returns:
        A mapping with a single ``"hitl-jobs"`` entry whose handler turns an
        incoming event into a ``ReviewItem`` and enqueues it on the review
        manager built here.
    """
    logger = configure_logging(app_name="truth-hitl-events")
    adapters = build_hitl_adapters()

    async def handle_hitl_job(_partition_context, event) -> None:  # noqa: ANN001
        try:
            payload = json.loads(event.body_as_str())
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses; an undecodable event is skipped instead of raising
            # out of the handler.
            logger.warning("hitl_event_skipped reason=invalid_json")
            return

        # The body may be valid JSON without being an object, and "data" may
        # be null or a non-object; neither case may reach ``.get``.
        is_mapping = isinstance(payload, dict)
        event_type = payload.get("event_type") if is_mapping else None
        data = payload.get("data") if is_mapping else None
        if not isinstance(data, dict):
            data = {}

        entity_id = data.get("entity_id")
        attr_id = data.get("attr_id")

        if not entity_id or not attr_id:
            logger.info(
                "hitl_event_skipped event_type=%s reason=missing_entity_id_or_attr_id",
                event_type,
            )
            return

        item = ReviewItem(
            entity_id=entity_id,
            attr_id=attr_id,
            field_name=data.get("field_name", ""),
            proposed_value=data.get("proposed_value"),
            confidence=_parse_confidence(data.get("confidence", 0.0)),
            current_value=data.get("current_value"),
            source=data.get("source", "ai"),
            proposed_at=_parse_proposed_at(data.get("proposed_at")),
            product_title=data.get("product_title", ""),
            category_label=data.get("category_label", ""),
            status="pending_review",
        )

        adapters.review_manager.enqueue(item)
        logger.info(
            "hitl_event_enqueued entity_id=%s attr_id=%s field_name=%s confidence=%.2f",
            entity_id,
            attr_id,
            item.field_name,
            item.confidence,
        )

    return {"hitl-jobs": handle_hitl_job}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Truth HITL service entrypoint."""
2+
3+
import os
4+
5+
from holiday_peak_lib.agents import FoundryAgentConfig
6+
from holiday_peak_lib.agents.memory import ColdMemory, HotMemory, WarmMemory
7+
from holiday_peak_lib.app_factory import build_service_app
8+
from holiday_peak_lib.config import MemorySettings
9+
from holiday_peak_lib.utils import EventHubSubscription, create_eventhub_lifespan
10+
from truth_hitl.adapters import build_hitl_adapters
11+
from truth_hitl.agents import TruthHITLAgent, register_mcp_tools
12+
from truth_hitl.event_handlers import build_event_handlers
13+
from truth_hitl.routes import build_review_router
14+
15+
SERVICE_NAME = "truth-hitl"
memory_settings = MemorySettings()

# Foundry connection settings; each PROJECT_* variable takes precedence over
# its FOUNDRY_* counterpart.
endpoint = os.getenv("PROJECT_ENDPOINT") or os.getenv("FOUNDRY_ENDPOINT")
project_name = os.getenv("PROJECT_NAME") or os.getenv("FOUNDRY_PROJECT_NAME")
stream = (os.getenv("FOUNDRY_STREAM") or "").lower() in {"1", "true", "yes"}
slm_agent_id = os.getenv("FOUNDRY_AGENT_ID_FAST")
llm_agent_id = os.getenv("FOUNDRY_AGENT_ID_RICH")
slm_deployment = os.getenv("MODEL_DEPLOYMENT_NAME_FAST")
llm_deployment = os.getenv("MODEL_DEPLOYMENT_NAME_RICH")


def _foundry_config(agent_id, deployment_name) -> "FoundryAgentConfig | None":
    """Return a FoundryAgentConfig, or None unless endpoint and agent id are set."""
    if not (endpoint and agent_id):
        return None
    return FoundryAgentConfig(
        endpoint=endpoint,
        agent_id=agent_id,
        deployment_name=deployment_name,
        project_name=project_name,
        stream=stream,
    )


slm_config = _foundry_config(slm_agent_id, slm_deployment)
llm_config = _foundry_config(llm_agent_id, llm_deployment)

# Each memory tier is created only when its connection setting is present.
_hot_memory = None
if memory_settings.redis_url:
    _hot_memory = HotMemory(memory_settings.redis_url)

_warm_memory = None
if memory_settings.cosmos_account_uri:
    _warm_memory = WarmMemory(
        memory_settings.cosmos_account_uri,
        memory_settings.cosmos_database,
        memory_settings.cosmos_container,
    )

_cold_memory = None
if memory_settings.blob_account_url:
    _cold_memory = ColdMemory(
        memory_settings.blob_account_url,
        memory_settings.blob_container,
    )

app = build_service_app(
    SERVICE_NAME,
    agent_class=TruthHITLAgent,
    hot_memory=_hot_memory,
    warm_memory=_warm_memory,
    cold_memory=_cold_memory,
    slm_config=slm_config,
    llm_config=llm_config,
    mcp_setup=register_mcp_tools,
    lifespan=create_eventhub_lifespan(
        service_name=SERVICE_NAME,
        subscriptions=[
            EventHubSubscription("hitl-jobs", "hitl-service"),
        ],
        handlers=build_event_handlers(),
    ),
)

# Attach the review REST routes to the service app.
# NOTE(review): this builds a new adapters container, separate from the ones
# created inside TruthHITLAgent and build_event_handlers(). If ReviewManager
# keeps its queue in process memory, the REST routes will not see items
# enqueued by the event handler -- confirm and share one instance if so.
_adapters = build_hitl_adapters()
app.include_router(build_review_router(_adapters))

0 commit comments

Comments
 (0)