Skip to content

Commit 2c8cd98

Browse files
committed
feat: Implement product agents and adapters for ACP transformation, assortment optimization, consistency validation, and normalization/classification
1 parent 0006dcf commit 2c8cd98

File tree

12 files changed

+607
-8
lines changed

12 files changed

+607
-8
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""Adapters for the product ACP transformation service."""
2+
from __future__ import annotations
3+
4+
from dataclasses import dataclass
5+
from typing import Optional
6+
7+
from holiday_peak_lib.adapters.mock_adapters import MockProductAdapter
8+
from holiday_peak_lib.adapters.product_adapter import ProductConnector
9+
from holiday_peak_lib.schemas.product import CatalogProduct
10+
11+
12+
@dataclass
13+
class AcpTransformationAdapters:
14+
"""Container for ACP transformation adapters."""
15+
16+
products: ProductConnector
17+
mapper: "AcpCatalogMapper"
18+
19+
20+
class AcpCatalogMapper:
21+
"""Map catalog products to ACP Product Feed-like fields."""
22+
23+
def to_acp_product(
24+
self,
25+
product: CatalogProduct,
26+
*,
27+
availability: str,
28+
currency: str = "usd",
29+
) -> dict[str, object]:
30+
sku = product.sku
31+
price = product.price if product.price is not None else 0.0
32+
image_url = product.image_url or "https://example.com/images/placeholder.png"
33+
product_url = f"https://example.com/products/{sku}"
34+
return {
35+
"item_id": sku,
36+
"title": product.name,
37+
"description": product.description or "",
38+
"url": product_url,
39+
"image_url": image_url,
40+
"brand": product.brand or "",
41+
"price": f"{price:.2f} {currency}",
42+
"availability": availability,
43+
"is_eligible_search": True,
44+
"is_eligible_checkout": True,
45+
"store_name": "Example Store",
46+
"seller_url": "https://example.com/store",
47+
"seller_privacy_policy": "https://example.com/privacy",
48+
"seller_tos": "https://example.com/terms",
49+
"return_policy": "https://example.com/returns",
50+
"return_window": 30,
51+
"target_countries": ["US"],
52+
"store_country": "US",
53+
}
54+
55+
56+
def build_acp_transformation_adapters(
57+
*, product_connector: Optional[ProductConnector] = None
58+
) -> AcpTransformationAdapters:
59+
"""Create adapters for ACP transformation workflows."""
60+
products = product_connector or ProductConnector(adapter=MockProductAdapter())
61+
mapper = AcpCatalogMapper()
62+
return AcpTransformationAdapters(products=products, mapper=mapper)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""Product ACP transformation agent implementation and MCP tool registration."""
2+
from __future__ import annotations
3+
4+
from typing import Any
5+
6+
from holiday_peak_lib.agents import BaseRetailAgent
7+
from holiday_peak_lib.agents.fastapi_mcp import FastAPIMCPServer
8+
9+
from .adapters import AcpTransformationAdapters, build_acp_transformation_adapters
10+
11+
12+
class ProductAcpTransformationAgent(BaseRetailAgent):
13+
"""Agent that transforms catalog products into ACP payloads."""
14+
15+
def __init__(self, config, *args: Any, **kwargs: Any) -> None:
16+
super().__init__(config, *args, **kwargs)
17+
self._adapters = build_acp_transformation_adapters()
18+
19+
@property
20+
def adapters(self) -> AcpTransformationAdapters:
21+
return self._adapters
22+
23+
async def handle(self, request: dict[str, Any]) -> dict[str, Any]:
24+
sku = request.get("sku")
25+
if not sku:
26+
return {"error": "sku is required"}
27+
availability = request.get("availability", "in_stock")
28+
currency = request.get("currency", "usd")
29+
30+
product = await self.adapters.products.get_product(str(sku))
31+
if not product:
32+
return {"error": "sku not found", "sku": sku}
33+
34+
acp = self.adapters.mapper.to_acp_product(
35+
product, availability=str(availability), currency=str(currency)
36+
)
37+
38+
if self.slm or self.llm:
39+
messages = [
40+
{"role": "system", "content": _acp_instructions()},
41+
{
42+
"role": "user",
43+
"content": {
44+
"sku": sku,
45+
"product": product.model_dump(),
46+
"acp_product": acp,
47+
},
48+
},
49+
]
50+
return await self.invoke_model(request=request, messages=messages)
51+
52+
return {
53+
"service": self.service_name,
54+
"sku": sku,
55+
"product": product.model_dump(),
56+
"acp_product": acp,
57+
}
58+
59+
60+
def register_mcp_tools(mcp: FastAPIMCPServer, agent: BaseRetailAgent) -> None:
61+
"""Expose MCP tools for ACP transformation workflows."""
62+
adapters = getattr(agent, "adapters", build_acp_transformation_adapters())
63+
64+
async def transform_product(payload: dict[str, Any]) -> dict[str, Any]:
65+
sku = payload.get("sku")
66+
if not sku:
67+
return {"error": "sku is required"}
68+
product = await adapters.products.get_product(str(sku))
69+
if not product:
70+
return {"error": "sku not found", "sku": sku}
71+
acp = adapters.mapper.to_acp_product(
72+
product,
73+
availability=str(payload.get("availability", "in_stock")),
74+
currency=str(payload.get("currency", "usd")),
75+
)
76+
return {"acp_product": acp}
77+
78+
async def get_product(payload: dict[str, Any]) -> dict[str, Any]:
79+
sku = payload.get("sku")
80+
if not sku:
81+
return {"error": "sku is required"}
82+
product = await adapters.products.get_product(str(sku))
83+
return {"product": product.model_dump() if product else None}
84+
85+
mcp.add_tool("/product/acp/transform", transform_product)
86+
mcp.add_tool("/product/acp/product", get_product)
87+
88+
89+
def _acp_instructions() -> str:
90+
return (
91+
"You are a product ACP transformation agent. "
92+
"Ensure ACP fields are populated, highlight missing inputs, "
93+
"and call out any normalization assumptions."
94+
)

apps/product-management-acp-transformation/src/product_management_acp_transformation/main.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
"""ACP transformation service."""
22
from holiday_peak_lib.agents.memory import ColdMemory, HotMemory, WarmMemory
3-
from holiday_peak_lib.agents.service_agent import ServiceAgent
43
from holiday_peak_lib.app_factory import build_service_app
54
from holiday_peak_lib.config import MemorySettings
65

6+
from product_management_acp_transformation.agents import (
7+
ProductAcpTransformationAgent,
8+
register_mcp_tools,
9+
)
10+
711
SERVICE_NAME = "product-management-acp-transformation"
812
memory_settings = MemorySettings()
913
app = build_service_app(
1014
SERVICE_NAME,
11-
agent_class=ServiceAgent,
15+
agent_class=ProductAcpTransformationAgent,
1216
hot_memory=HotMemory(memory_settings.redis_url),
1317
warm_memory=WarmMemory(
1418
memory_settings.cosmos_account_uri,
@@ -19,4 +23,5 @@
1923
memory_settings.blob_account_url,
2024
memory_settings.blob_container,
2125
),
26+
mcp_setup=register_mcp_tools,
2227
)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""Adapters for the assortment optimization service."""
2+
from __future__ import annotations
3+
4+
from dataclasses import dataclass
5+
from typing import Any, Optional
6+
7+
from holiday_peak_lib.adapters.mock_adapters import MockProductAdapter
8+
from holiday_peak_lib.adapters.product_adapter import ProductConnector
9+
from holiday_peak_lib.schemas.product import CatalogProduct
10+
11+
12+
@dataclass
13+
class AssortmentAdapters:
14+
"""Container for assortment optimization adapters."""
15+
16+
products: ProductConnector
17+
optimizer: "AssortmentOptimizer"
18+
19+
20+
class AssortmentOptimizer:
21+
"""Score products for assortment decisions."""
22+
23+
async def score_products(self, products: list[CatalogProduct]) -> list[dict[str, Any]]:
24+
scored = []
25+
for product in products:
26+
rating = product.rating if product.rating is not None else 3.5
27+
price = product.price if product.price is not None else 0.0
28+
score = (rating * 2) - (price / 100)
29+
scored.append(
30+
{
31+
"sku": product.sku,
32+
"name": product.name,
33+
"rating": rating,
34+
"price": price,
35+
"score": round(score, 3),
36+
}
37+
)
38+
return sorted(scored, key=lambda item: item["score"], reverse=True)
39+
40+
async def recommend_assortment(
41+
self, products: list[CatalogProduct], *, target_size: int
42+
) -> dict[str, Any]:
43+
scored = await self.score_products(products)
44+
keep = scored[:target_size]
45+
drop = scored[target_size:]
46+
return {"keep": keep, "drop": drop, "target_size": target_size}
47+
48+
49+
def build_assortment_adapters(
50+
*, product_connector: Optional[ProductConnector] = None
51+
) -> AssortmentAdapters:
52+
"""Create adapters for assortment optimization workflows."""
53+
products = product_connector or ProductConnector(adapter=MockProductAdapter())
54+
optimizer = AssortmentOptimizer()
55+
return AssortmentAdapters(products=products, optimizer=optimizer)
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
"""Assortment optimization agent implementation and MCP tool registration."""
2+
from __future__ import annotations
3+
4+
from typing import Any
5+
6+
from holiday_peak_lib.agents import BaseRetailAgent
7+
from holiday_peak_lib.agents.fastapi_mcp import FastAPIMCPServer
8+
9+
from .adapters import AssortmentAdapters, build_assortment_adapters
10+
11+
12+
class AssortmentOptimizationAgent(BaseRetailAgent):
13+
"""Agent that ranks products for assortment decisions."""
14+
15+
def __init__(self, config, *args: Any, **kwargs: Any) -> None:
16+
super().__init__(config, *args, **kwargs)
17+
self._adapters = build_assortment_adapters()
18+
19+
@property
20+
def adapters(self) -> AssortmentAdapters:
21+
return self._adapters
22+
23+
async def handle(self, request: dict[str, Any]) -> dict[str, Any]:
24+
skus = [str(sku) for sku in request.get("skus", [])]
25+
target_size = int(request.get("target_size", 5))
26+
if not skus:
27+
return {"error": "skus is required"}
28+
29+
products = []
30+
for sku in skus:
31+
product = await self.adapters.products.get_product(sku)
32+
if product:
33+
products.append(product)
34+
35+
if not products:
36+
return {"error": "no products found", "skus": skus}
37+
38+
recommendations = await self.adapters.optimizer.recommend_assortment(
39+
products, target_size=target_size
40+
)
41+
42+
if self.slm or self.llm:
43+
messages = [
44+
{"role": "system", "content": _assortment_instructions()},
45+
{
46+
"role": "user",
47+
"content": {
48+
"skus": skus,
49+
"products": [p.model_dump() for p in products],
50+
"assortment": recommendations,
51+
},
52+
},
53+
]
54+
return await self.invoke_model(request=request, messages=messages)
55+
56+
return {
57+
"service": self.service_name,
58+
"skus": skus,
59+
"assortment": recommendations,
60+
}
61+
62+
63+
def register_mcp_tools(mcp: FastAPIMCPServer, agent: BaseRetailAgent) -> None:
64+
"""Expose MCP tools for assortment optimization workflows."""
65+
adapters = getattr(agent, "adapters", build_assortment_adapters())
66+
67+
async def score_products(payload: dict[str, Any]) -> dict[str, Any]:
68+
skus = [str(sku) for sku in payload.get("skus", [])]
69+
if not skus:
70+
return {"error": "skus is required"}
71+
products = [p for p in [await adapters.products.get_product(sku) for sku in skus] if p]
72+
scored = await adapters.optimizer.score_products(products)
73+
return {"scores": scored}
74+
75+
async def recommend_assortment(payload: dict[str, Any]) -> dict[str, Any]:
76+
skus = [str(sku) for sku in payload.get("skus", [])]
77+
if not skus:
78+
return {"error": "skus is required"}
79+
products = [p for p in [await adapters.products.get_product(sku) for sku in skus] if p]
80+
target_size = int(payload.get("target_size", 5))
81+
recommendations = await adapters.optimizer.recommend_assortment(
82+
products, target_size=target_size
83+
)
84+
return {"assortment": recommendations}
85+
86+
mcp.add_tool("/assortment/score", score_products)
87+
mcp.add_tool("/assortment/recommendations", recommend_assortment)
88+
89+
90+
def _assortment_instructions() -> str:
91+
return (
92+
"You are an assortment optimization agent. "
93+
"Rank products by performance indicators and recommend the ideal set. "
94+
"Explain trade-offs and highlight missing signals."
95+
)

apps/product-management-assortment-optimization/src/product_management_assortment_optimization/main.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
"""Assortment optimization service."""
22
from holiday_peak_lib.agents.memory import ColdMemory, HotMemory, WarmMemory
3-
from holiday_peak_lib.agents.service_agent import ServiceAgent
43
from holiday_peak_lib.app_factory import build_service_app
54
from holiday_peak_lib.config import MemorySettings
65

6+
from product_management_assortment_optimization.agents import (
7+
AssortmentOptimizationAgent,
8+
register_mcp_tools,
9+
)
10+
711
SERVICE_NAME = "product-management-assortment-optimization"
812
memory_settings = MemorySettings()
913
app = build_service_app(
1014
SERVICE_NAME,
11-
agent_class=ServiceAgent,
15+
agent_class=AssortmentOptimizationAgent,
1216
hot_memory=HotMemory(memory_settings.redis_url),
1317
warm_memory=WarmMemory(
1418
memory_settings.cosmos_account_uri,
@@ -19,4 +23,5 @@
1923
memory_settings.blob_account_url,
2024
memory_settings.blob_container,
2125
),
26+
mcp_setup=register_mcp_tools,
2227
)

0 commit comments

Comments
 (0)