Skip to content

Commit 853ee1e

Browse files
committed
feat: Implement logistics agents and adapters for carrier selection, ETA computation, returns support, and route issue detection
1 parent 2c8cd98 commit 853ee1e

File tree

12 files changed

+552
-8
lines changed

12 files changed

+552
-8
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""Adapters for the logistics carrier selection service."""
2+
from __future__ import annotations
3+
4+
from dataclasses import dataclass
5+
from typing import Any, Optional
6+
7+
from holiday_peak_lib.adapters.logistics_adapter import LogisticsConnector
8+
from holiday_peak_lib.adapters.mock_adapters import MockLogisticsAdapter
9+
from holiday_peak_lib.schemas.logistics import LogisticsContext
10+
11+
12+
@dataclass
13+
class CarrierSelectionAdapters:
14+
"""Container for carrier selection adapters."""
15+
16+
logistics: LogisticsConnector
17+
selector: "CarrierSelector"
18+
19+
20+
class CarrierSelector:
21+
"""Choose a carrier based on shipment attributes."""
22+
23+
async def select(self, context: LogisticsContext) -> dict[str, Any]:
24+
shipment = context.shipment
25+
service = shipment.service_level or "standard"
26+
carrier = shipment.carrier or ("priority-carrier" if service == "express" else "economy-carrier")
27+
return {
28+
"tracking_id": shipment.tracking_id,
29+
"service_level": service,
30+
"recommended_carrier": carrier,
31+
"reason": "matched service level",
32+
}
33+
34+
35+
def build_carrier_selection_adapters(
36+
*, logistics_connector: Optional[LogisticsConnector] = None
37+
) -> CarrierSelectionAdapters:
38+
"""Create adapters for carrier selection workflows.
39+
40+
Uses mock adapters by default to keep local development lightweight.
41+
"""
42+
logistics = logistics_connector or LogisticsConnector(adapter=MockLogisticsAdapter())
43+
selector = CarrierSelector()
44+
return CarrierSelectionAdapters(logistics=logistics, selector=selector)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Logistics carrier selection agent implementation and MCP tool registration."""
2+
from __future__ import annotations
3+
4+
from typing import Any
5+
6+
from holiday_peak_lib.agents import BaseRetailAgent
7+
from holiday_peak_lib.agents.fastapi_mcp import FastAPIMCPServer
8+
9+
from .adapters import CarrierSelectionAdapters, build_carrier_selection_adapters
10+
11+
12+
class CarrierSelectionAgent(BaseRetailAgent):
13+
"""Agent that recommends a carrier for a shipment."""
14+
15+
def __init__(self, config, *args: Any, **kwargs: Any) -> None:
16+
super().__init__(config, *args, **kwargs)
17+
self._adapters = build_carrier_selection_adapters()
18+
19+
@property
20+
def adapters(self) -> CarrierSelectionAdapters:
21+
return self._adapters
22+
23+
async def handle(self, request: dict[str, Any]) -> dict[str, Any]:
24+
tracking_id = request.get("tracking_id")
25+
if not tracking_id:
26+
return {"error": "tracking_id is required"}
27+
28+
context = await self.adapters.logistics.build_logistics_context(str(tracking_id))
29+
if not context:
30+
return {"error": "shipment not found", "tracking_id": tracking_id}
31+
32+
recommendation = await self.adapters.selector.select(context)
33+
34+
if self.slm or self.llm:
35+
messages = [
36+
{"role": "system", "content": _carrier_instructions()},
37+
{
38+
"role": "user",
39+
"content": {
40+
"tracking_id": tracking_id,
41+
"shipment": context.model_dump(),
42+
"recommendation": recommendation,
43+
},
44+
},
45+
]
46+
return await self.invoke_model(request=request, messages=messages)
47+
48+
return {
49+
"service": self.service_name,
50+
"tracking_id": tracking_id,
51+
"shipment": context.model_dump(),
52+
"recommendation": recommendation,
53+
}
54+
55+
56+
def register_mcp_tools(mcp: FastAPIMCPServer, agent: BaseRetailAgent) -> None:
57+
"""Expose MCP tools for carrier selection workflows."""
58+
adapters = getattr(agent, "adapters", build_carrier_selection_adapters())
59+
60+
async def get_logistics_context(payload: dict[str, Any]) -> dict[str, Any]:
61+
tracking_id = payload.get("tracking_id")
62+
if not tracking_id:
63+
return {"error": "tracking_id is required"}
64+
context = await adapters.logistics.build_logistics_context(str(tracking_id))
65+
return {"logistics_context": context.model_dump() if context else None}
66+
67+
async def get_carrier_recommendation(payload: dict[str, Any]) -> dict[str, Any]:
68+
tracking_id = payload.get("tracking_id")
69+
if not tracking_id:
70+
return {"error": "tracking_id is required"}
71+
context = await adapters.logistics.build_logistics_context(str(tracking_id))
72+
if not context:
73+
return {"error": "shipment not found", "tracking_id": tracking_id}
74+
recommendation = await adapters.selector.select(context)
75+
return {"recommendation": recommendation}
76+
77+
mcp.add_tool("/logistics/carrier/context", get_logistics_context)
78+
mcp.add_tool("/logistics/carrier/recommendation", get_carrier_recommendation)
79+
80+
81+
def _carrier_instructions() -> str:
82+
return (
83+
"You are a logistics carrier selection agent. "
84+
"Recommend the best carrier based on service level and constraints. "
85+
"Explain trade-offs and risks."
86+
)

apps/logistics-carrier-selection/src/logistics_carrier_selection/main.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
"""Logistics carrier selection service."""
22
from holiday_peak_lib.agents.memory import ColdMemory, HotMemory, WarmMemory
3-
from holiday_peak_lib.agents.service_agent import ServiceAgent
43
from holiday_peak_lib.app_factory import build_service_app
54
from holiday_peak_lib.config import MemorySettings
65

6+
from logistics_carrier_selection.agents import CarrierSelectionAgent, register_mcp_tools
7+
78
SERVICE_NAME = "logistics-carrier-selection"
89
memory_settings = MemorySettings()
910
app = build_service_app(
1011
SERVICE_NAME,
11-
agent_class=ServiceAgent,
12+
agent_class=CarrierSelectionAgent,
1213
hot_memory=HotMemory(memory_settings.redis_url),
1314
warm_memory=WarmMemory(
1415
memory_settings.cosmos_account_uri,
@@ -19,4 +20,5 @@
1920
memory_settings.blob_account_url,
2021
memory_settings.blob_container,
2122
),
23+
mcp_setup=register_mcp_tools,
2224
)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Adapters for the logistics ETA computation service."""
2+
from __future__ import annotations
3+
4+
from dataclasses import dataclass
5+
from datetime import datetime, timedelta
6+
from typing import Any, Optional
7+
8+
from holiday_peak_lib.adapters.logistics_adapter import LogisticsConnector
9+
from holiday_peak_lib.adapters.mock_adapters import MockLogisticsAdapter
10+
from holiday_peak_lib.schemas.logistics import LogisticsContext
11+
12+
13+
@dataclass
14+
class EtaComputationAdapters:
15+
"""Container for ETA computation adapters."""
16+
17+
logistics: LogisticsConnector
18+
estimator: "EtaEstimator"
19+
20+
21+
class EtaEstimator:
22+
"""Compute ETA based on event timeline."""
23+
24+
async def compute_eta(self, context: LogisticsContext) -> dict[str, Any]:
25+
shipment = context.shipment
26+
base_eta = shipment.eta
27+
if base_eta:
28+
return {
29+
"tracking_id": shipment.tracking_id,
30+
"eta": base_eta.isoformat(),
31+
"source": "carrier",
32+
}
33+
now = datetime.utcnow()
34+
inferred_eta = now + timedelta(days=2)
35+
return {
36+
"tracking_id": shipment.tracking_id,
37+
"eta": inferred_eta.isoformat(),
38+
"source": "estimated",
39+
}
40+
41+
42+
def build_eta_adapters(
43+
*, logistics_connector: Optional[LogisticsConnector] = None
44+
) -> EtaComputationAdapters:
45+
"""Create adapters for ETA computation workflows.
46+
47+
Uses mock adapters by default to keep local development lightweight.
48+
"""
49+
logistics = logistics_connector or LogisticsConnector(adapter=MockLogisticsAdapter())
50+
estimator = EtaEstimator()
51+
return EtaComputationAdapters(logistics=logistics, estimator=estimator)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Logistics ETA computation agent implementation and MCP tool registration."""
2+
from __future__ import annotations
3+
4+
from typing import Any
5+
6+
from holiday_peak_lib.agents import BaseRetailAgent
7+
from holiday_peak_lib.agents.fastapi_mcp import FastAPIMCPServer
8+
9+
from .adapters import EtaComputationAdapters, build_eta_adapters
10+
11+
12+
class EtaComputationAgent(BaseRetailAgent):
13+
"""Agent that computes updated ETA for shipments."""
14+
15+
def __init__(self, config, *args: Any, **kwargs: Any) -> None:
16+
super().__init__(config, *args, **kwargs)
17+
self._adapters = build_eta_adapters()
18+
19+
@property
20+
def adapters(self) -> EtaComputationAdapters:
21+
return self._adapters
22+
23+
async def handle(self, request: dict[str, Any]) -> dict[str, Any]:
24+
tracking_id = request.get("tracking_id")
25+
if not tracking_id:
26+
return {"error": "tracking_id is required"}
27+
28+
context = await self.adapters.logistics.build_logistics_context(str(tracking_id))
29+
if not context:
30+
return {"error": "shipment not found", "tracking_id": tracking_id}
31+
32+
eta = await self.adapters.estimator.compute_eta(context)
33+
34+
if self.slm or self.llm:
35+
messages = [
36+
{"role": "system", "content": _eta_instructions()},
37+
{
38+
"role": "user",
39+
"content": {
40+
"tracking_id": tracking_id,
41+
"logistics_context": context.model_dump(),
42+
"eta": eta,
43+
},
44+
},
45+
]
46+
return await self.invoke_model(request=request, messages=messages)
47+
48+
return {
49+
"service": self.service_name,
50+
"tracking_id": tracking_id,
51+
"logistics_context": context.model_dump(),
52+
"eta": eta,
53+
}
54+
55+
56+
def register_mcp_tools(mcp: FastAPIMCPServer, agent: BaseRetailAgent) -> None:
57+
"""Expose MCP tools for ETA computation workflows."""
58+
adapters = getattr(agent, "adapters", build_eta_adapters())
59+
60+
async def get_logistics_context(payload: dict[str, Any]) -> dict[str, Any]:
61+
tracking_id = payload.get("tracking_id")
62+
if not tracking_id:
63+
return {"error": "tracking_id is required"}
64+
context = await adapters.logistics.build_logistics_context(str(tracking_id))
65+
return {"logistics_context": context.model_dump() if context else None}
66+
67+
async def get_eta(payload: dict[str, Any]) -> dict[str, Any]:
68+
tracking_id = payload.get("tracking_id")
69+
if not tracking_id:
70+
return {"error": "tracking_id is required"}
71+
context = await adapters.logistics.build_logistics_context(str(tracking_id))
72+
if not context:
73+
return {"error": "shipment not found", "tracking_id": tracking_id}
74+
eta = await adapters.estimator.compute_eta(context)
75+
return {"eta": eta}
76+
77+
mcp.add_tool("/logistics/eta/context", get_logistics_context)
78+
mcp.add_tool("/logistics/eta", get_eta)
79+
80+
81+
def _eta_instructions() -> str:
82+
return (
83+
"You are a logistics ETA computation agent. "
84+
"Provide the most accurate ETA and explain confidence level. "
85+
"Highlight delays and risk factors."
86+
)

apps/logistics-eta-computation/src/logistics_eta_computation/main.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
"""Logistics ETA computation service."""
22
from holiday_peak_lib.agents.memory import ColdMemory, HotMemory, WarmMemory
3-
from holiday_peak_lib.agents.service_agent import ServiceAgent
43
from holiday_peak_lib.app_factory import build_service_app
54
from holiday_peak_lib.config import MemorySettings
65

6+
from logistics_eta_computation.agents import EtaComputationAgent, register_mcp_tools
7+
78
SERVICE_NAME = "logistics-eta-computation"
89
memory_settings = MemorySettings()
910
app = build_service_app(
1011
SERVICE_NAME,
11-
agent_class=ServiceAgent,
12+
agent_class=EtaComputationAgent,
1213
hot_memory=HotMemory(memory_settings.redis_url),
1314
warm_memory=WarmMemory(
1415
memory_settings.cosmos_account_uri,
@@ -19,4 +20,5 @@
1920
memory_settings.blob_account_url,
2021
memory_settings.blob_container,
2122
),
23+
mcp_setup=register_mcp_tools,
2224
)
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""Adapters for the logistics returns support service."""
2+
from __future__ import annotations
3+
4+
from dataclasses import dataclass
5+
from typing import Any, Optional
6+
7+
from holiday_peak_lib.adapters.logistics_adapter import LogisticsConnector
8+
from holiday_peak_lib.adapters.mock_adapters import MockLogisticsAdapter
9+
from holiday_peak_lib.schemas.logistics import LogisticsContext
10+
11+
12+
@dataclass
13+
class ReturnsSupportAdapters:
14+
"""Container for returns support adapters."""
15+
16+
logistics: LogisticsConnector
17+
assistant: "ReturnsSupportAssistant"
18+
19+
20+
class ReturnsSupportAssistant:
21+
"""Generate returns support guidance from logistics context."""
22+
23+
async def build_returns_plan(self, context: LogisticsContext) -> dict[str, Any]:
24+
shipment = context.shipment
25+
status = shipment.status
26+
return {
27+
"tracking_id": shipment.tracking_id,
28+
"status": status,
29+
"eligible_for_return": status in {"delivered", "in_transit"},
30+
"next_steps": _return_next_steps(status),
31+
}
32+
33+
34+
def build_returns_support_adapters(
35+
*, logistics_connector: Optional[LogisticsConnector] = None
36+
) -> ReturnsSupportAdapters:
37+
"""Create adapters for returns support workflows.
38+
39+
Uses mock adapters by default to keep local development lightweight.
40+
"""
41+
logistics = logistics_connector or LogisticsConnector(adapter=MockLogisticsAdapter())
42+
assistant = ReturnsSupportAssistant()
43+
return ReturnsSupportAdapters(logistics=logistics, assistant=assistant)
44+
45+
46+
def _return_next_steps(status: str) -> list[str]:
47+
if status == "delivered":
48+
return ["Confirm return reason", "Issue return label", "Schedule pickup"]
49+
return ["Confirm shipment status", "Offer return window", "Notify support"]

0 commit comments

Comments
 (0)