-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathutils.py
More file actions
49 lines (38 loc) · 1.69 KB
/
utils.py
File metadata and controls
49 lines (38 loc) · 1.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from restate import TerminalError
from app.utils.models import (
BookingResult,
FlightBooking,
HotelBooking,
)
async def request_mcp_approval(mcp_tool_name: str, awakeable_id: str) -> None:
    """Simulate requesting human review of an MCP tool call.

    Nothing is sent anywhere: the function prints a notice naming the tool,
    followed by a ``curl`` command that targets the awakeable's ``resolve``
    endpoint on ``localhost:8080``.

    Args:
        mcp_tool_name: Name of the MCP tool awaiting approval.
        awakeable_id: Identifier interpolated into the resolve URL.
    """
    print(f"🔔 Human review requested: {mcp_tool_name}")
    # Constant text, so a plain literal is used rather than an f-string.
    print(" Submit your mcp tool approval via: \n ")
    print(
        f" curl localhost:8080/restate/awakeables/{awakeable_id}/resolve --json true"
    )
async def reserve_hotel(id: str, booking: HotelBooking) -> BookingResult:
    """Produce a simulated hotel reservation.

    Prints a progress line, then wraps ``id`` and a confirmation message
    built from the booking's ``name``, ``guests`` and ``dates`` in a
    ``BookingResult``.
    """
    hotel_name = booking.name
    guest_count = booking.guests
    print(f"🏨 Reserving hotel in {hotel_name} for {guest_count} guests")

    confirmation_text = (
        f"Hotel {hotel_name} booked for {guest_count} guests on {booking.dates}"
    )
    return BookingResult(id=id, confirmation=confirmation_text)
async def reserve_flight(id: str, booking: FlightBooking) -> BookingResult:
    """Reserve a flight (simulated).

    Args:
        id: Identifier copied into the returned ``BookingResult``.
        booking: Flight details; ``origin``, ``destination``, ``date`` and
            ``passengers`` are read from it.

    Returns:
        A ``BookingResult`` carrying ``id`` and a confirmation message.

    Raises:
        TerminalError: When the destination is ``"San Francisco"`` or
            ``"SFO"``; this failure is deliberately simulated.
    """
    print(f"✈️ Reserving flight from {booking.origin} to {booking.destination}")
    if booking.destination in ("San Francisco", "SFO"):
        # One literal feeds both the console line and the exception so the
        # two messages cannot drift apart.
        failure = "[👻 SIMULATED] Flight booking failed: No flights to SFO available..."
        print(failure)
        raise TerminalError(failure)
    return BookingResult(
        id=id,
        confirmation=(
            f"Flight from {booking.origin} to {booking.destination} "
            f"on {booking.date} for {booking.passengers} passengers"
        ),
    )
async def cancel_hotel(id: str) -> None:
    """Print a cancellation notice for the hotel booking ``id``."""
    notice = f"❌ Cancelling hotel booking {id}"
    print(notice)
async def cancel_flight(id: str) -> None:
    """Print a cancellation notice for the flight booking ``id``."""
    notice = f"❌ Cancelling flight booking {id}"
    print(notice)