diff --git a/examples/dexa_scan_intake/README.md b/examples/dexa_scan_intake/README.md new file mode 100644 index 00000000..c12c2487 --- /dev/null +++ b/examples/dexa_scan_intake/README.md @@ -0,0 +1,99 @@ +# DEXA Scan Intake Agent + +A voice agent that answers questions about DEXA scans using an embedded knowledge base and Exa web search. + +## Overview + +This example creates a DEXA scan customer support agent that: +- Answers common questions about DEXA scans from an embedded knowledge base +- Searches the web via Exa for information beyond its knowledge base +- Uses natural, voice-friendly responses without formatting +- Gracefully ends calls when the user is done + +## Setup + +### Prerequisites + +- [Anthropic API key](https://console.anthropic.com/) +- [Exa API key](https://dashboard.exa.ai/api-keys) + +### Environment Variables + +```bash +export ANTHROPIC_API_KEY=your-anthropic-key +export EXA_API_KEY=your-exa-key +``` + +### Installation + +```bash +cd examples/dexa_scan_intake +uv sync +``` + +## Running + +```bash +python main.py +``` + +Then connect: + +```bash +cartesia chat 8000 +``` + +## How It Works + +The agent has two main components: + +1. **Knowledge Base** - A comprehensive DEXA FAQ embedded in the system prompt covering: + - What DEXA is and how it works + - What measurements DEXA provides + - Accuracy and safety information + - Preparation and what to expect + - How often to scan + - Who should get a DEXA scan + +2. **`search_dexa_info`** - A `@loopback_tool` that searches the web via Exa when questions go beyond the knowledge base + +## Configuration + +### Exa Search Parameters + +```python +client.search_and_contents( + f"DEXA scan {query}", + num_results=5, + type="auto", + text={"max_characters": 800} +) +``` + +### LLM Configuration + +```python +LlmConfig( + system_prompt=SYSTEM_PROMPT, + introduction=INTRODUCTION, + max_tokens=300, + temperature=0.7, +) +``` + +## Example Conversations + +**User**: "What exactly does a DEXA scan measure?" + +**Agent**: "A DEXA scan measures your total body fat percentage and how it's distributed, your lean muscle mass broken down by body region, bone mineral density, and visceral fat around your organs. It also shows symmetry between your left and right sides." + +**User**: "How much radiation is there?" + +**Agent**: "DEXA uses very low radiation, about one tenth of a standard chest X-ray. A single scan is roughly 0.001 millisieverts, which is actually less than the natural background radiation you'd get in a typical day. It's considered very safe." + +## Roadmap + +Future features planned for this agent: +- Appointment scheduling assistance +- Intake form completion +- Provider location lookup diff --git a/examples/dexa_scan_intake/appointment_scheduler.py b/examples/dexa_scan_intake/appointment_scheduler.py new file mode 100644 index 00000000..4bc7214f --- /dev/null +++ b/examples/dexa_scan_intake/appointment_scheduler.py @@ -0,0 +1,372 @@ +"""Appointment scheduling for DEXA scans with mock availability data.""" + +import asyncio +import random +from datetime import datetime, timedelta +from typing import Annotated, Optional +from loguru import logger + +from line.llm_agent import ToolEnv, loopback_tool + +# Mock BodySpec locations in San Francisco +LOCATIONS = { + "financial_district": { + "name": "Financial District", + "address": "123 Market Street, Suite 400, San Francisco, CA 94105", + "hours": "Mon-Fri 8am-6pm, Sat 9am-2pm", + }, + "soma": { + "name": "SoMa", + "address": "456 Howard Street, San Francisco, CA 94103", + "hours": "Mon-Fri 7am-7pm, Sat-Sun 9am-4pm", + }, + "marina": { + "name": "Marina District", + "address": "789 Chestnut Street, San Francisco, CA 94123", + "hours": "Mon-Fri 8am-5pm, Sat 10am-3pm", + }, + "castro": { + "name": "Castro", + "address": "321 Castro Street, San Francisco, CA 94114", + "hours": "Mon-Fri 9am-6pm, Sat 10am-2pm", + }, + "sunset": { + "name": "Sunset District", + "address": "555 Irving Street, San Francisco, CA 94122", + "hours": "Mon-Fri 8am-5pm, Sat 9am-1pm", + }, +} + + +def _generate_mock_availability(location_id: str, days_ahead: int = 7) -> list[dict]: + """Generate mock availability slots for a location.""" + slots = [] + base_date = datetime.now() + + # Generate slots for the next N days + for day_offset in range(1, days_ahead + 1): + date = base_date + timedelta(days=day_offset) + + # Skip Sundays for most locations + if date.weekday() == 6 and location_id not in ["soma"]: + continue + + # Generate 2-4 available slots per day + num_slots = random.randint(2, 4) + + # Available time slots based on location hours + if date.weekday() < 5: # Weekday + possible_times = ["9:00 AM", "10:30 AM", "12:00 PM", "2:00 PM", "3:30 PM", "5:00 PM"] + else: # Weekend + possible_times = ["10:00 AM", "11:30 AM", "1:00 PM"] + + selected_times = random.sample(possible_times, min(num_slots, len(possible_times))) + selected_times.sort() + + for time in selected_times: + slots.append({ + "date": date.strftime("%A, %B %d"), + "date_iso": date.strftime("%Y-%m-%d"), + "time": time, + "location_id": location_id, + "location_name": LOCATIONS[location_id]["name"], + "slot_id": f"{location_id}_{date.strftime('%Y%m%d')}_{time.replace(':', '').replace(' ', '')}", + }) + + return slots + + +class AppointmentScheduler: + """Manages appointment scheduling with mock availability.""" + + def __init__(self): + self._selected_slot: Optional[dict] = None + self._booked_appointment: Optional[dict] = None + self._availability_cache: dict[str, list[dict]] = {} + self._contact_for_link: Optional[dict] = None + logger.info("AppointmentScheduler initialized") + + def get_locations(self) -> list[dict]: + """Get list of all locations.""" + return [ + {"id": loc_id, **loc_data} + for loc_id, loc_data in LOCATIONS.items() + ] + + def get_availability(self, location_id: Optional[str] = None, days_ahead: int = 7) -> dict: + """Get available appointment slots.""" + if location_id and location_id not in LOCATIONS: + return { + "success": False, + "error": f"Unknown location. Available locations: {', '.join(LOCATIONS.keys())}", + } + + all_slots = [] + + if location_id: + # Get availability for specific location + slots = _generate_mock_availability(location_id, days_ahead) + all_slots.extend(slots) + else: + # Get availability for all locations + for loc_id in LOCATIONS: + slots = _generate_mock_availability(loc_id, days_ahead) + all_slots.extend(slots[:3]) # Limit to 3 per location for voice readability + + # Sort by date then time + all_slots.sort(key=lambda x: (x["date_iso"], x["time"])) + + return { + "success": True, + "slots": all_slots[:10], # Limit to 10 for voice + "total_available": len(all_slots), + "showing": min(10, len(all_slots)), + } + + def select_slot(self, slot_description: str) -> dict: + """Select an appointment slot based on user description.""" + # Get fresh availability + all_slots = [] + for loc_id in LOCATIONS: + slots = _generate_mock_availability(loc_id, 14) + all_slots.extend(slots) + + # Try to match the description + slot_description_lower = slot_description.lower() + + for slot in all_slots: + # Check if description matches date, time, or location + if (slot["date"].lower() in slot_description_lower or + slot["time"].lower() in slot_description_lower or + slot["location_name"].lower() in slot_description_lower or + slot["date_iso"] in slot_description_lower): + + # Additional matching for day of week + day_match = any(day in slot_description_lower for day in + ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]) + time_match = any(t in slot_description_lower for t in + ["morning", "afternoon", "9", "10", "11", "12", "1", "2", "3", "4", "5"]) + + if day_match or time_match or slot["location_name"].lower() in slot_description_lower: + self._selected_slot = slot + return { + "success": True, + "selected": slot, + "message": f"Selected {slot['time']} on {slot['date']} at {slot['location_name']}", + } + + return { + "success": False, + "error": "Could not find a matching slot. Please specify date, time, and/or location more clearly.", + } + + async def book_appointment(self, first_name: str, last_name: str, email: str, phone: str) -> dict: + """Book the selected appointment slot.""" + if not self._selected_slot: + return { + "success": False, + "error": "No slot selected. Please select a time slot first.", + } + + # Simulate API call + await asyncio.sleep(0.5) + + confirmation_number = f"BS-{random.randint(100000, 999999)}" + + self._booked_appointment = { + "confirmation_number": confirmation_number, + "slot": self._selected_slot, + "patient": { + "first_name": first_name, + "last_name": last_name, + "email": email, + "phone": phone, + }, + } + + logger.info(f"Booked appointment: {self._booked_appointment}") + + return { + "success": True, + "confirmation_number": confirmation_number, + "appointment": { + "date": self._selected_slot["date"], + "time": self._selected_slot["time"], + "location": self._selected_slot["location_name"], + "address": LOCATIONS[self._selected_slot["location_id"]]["address"], + }, + "message": "Appointment booked successfully! A confirmation email will be sent shortly.", + } + + async def send_availability_link(self, first_name: str, last_name: str, email: str, phone: str) -> dict: + """Send a link to check more availabilities via email.""" + # Simulate API call + await asyncio.sleep(0.3) + + self._contact_for_link = { + "first_name": first_name, + "last_name": last_name, + "email": email, + "phone": phone, + } + + logger.info(f"Sending availability link to {email}") + + return { + "success": True, + "message": f"A link to view all available appointments has been sent to {email}. " + "The link will be valid for 48 hours.", + } + + def reset(self): + """Reset scheduler state.""" + self._selected_slot = None + self._booked_appointment = None + self._availability_cache = {} + self._contact_for_link = None + + +# Global scheduler instance +_scheduler_instance: Optional[AppointmentScheduler] = None + + +def get_scheduler() -> AppointmentScheduler: + """Get or create the scheduler instance.""" + global _scheduler_instance + if _scheduler_instance is None: + _scheduler_instance = AppointmentScheduler() + return _scheduler_instance + + +def reset_scheduler_instance(): + """Reset the global scheduler instance.""" + global _scheduler_instance + _scheduler_instance = None + + +# Tool definitions + +@loopback_tool +async def list_locations(ctx: ToolEnv) -> str: + """List all BodySpec DEXA scan locations in San Francisco.""" + scheduler = get_scheduler() + locations = scheduler.get_locations() + + result = "We have 5 locations in San Francisco:\n" + for loc in locations: + result += f"- {loc['name']}: {loc['address']}. Hours: {loc['hours']}\n" + + return result + + +@loopback_tool +async def check_availability( + ctx: ToolEnv, + location: Annotated[str, "Optional location name or ID. Leave empty for all locations."] = "", +) -> str: + """Check available appointment times. Can filter by location or show all.""" + scheduler = get_scheduler() + + # Map common location names to IDs + location_map = { + "financial": "financial_district", + "financial district": "financial_district", + "downtown": "financial_district", + "soma": "soma", + "south of market": "soma", + "marina": "marina", + "castro": "castro", + "sunset": "sunset", + } + + location_id = None + if location: + location_lower = location.lower() + location_id = location_map.get(location_lower, location_lower) + if location_id not in LOCATIONS: + location_id = None # Fall back to all locations + + result = scheduler.get_availability(location_id) + + if not result["success"]: + return result["error"] + + slots = result["slots"] + if not slots: + return "No available appointments found in the next week. Would you like me to send you a link to check more dates?" + + # Format for voice + output = f"I found {result['total_available']} available slots. Here are some options:\n" + + current_date = None + for slot in slots: + if slot["date"] != current_date: + current_date = slot["date"] + output += f"\n{slot['date']}:\n" + output += f"- {slot['time']} at {slot['location_name']}\n" + + if result["total_available"] > result["showing"]: + output += f"\nThere are {result['total_available'] - result['showing']} more slots available. " + output += "Let me know if you'd like to see a specific location or I can send you a link to view all options." + + return output + + +@loopback_tool +async def select_appointment_slot( + ctx: ToolEnv, + selection: Annotated[str, "The user's selection - can include date, time, and/or location"], +) -> str: + """Select an appointment slot based on user's preference (date, time, location).""" + scheduler = get_scheduler() + result = scheduler.select_slot(selection) + + if not result["success"]: + return result["error"] + + slot = result["selected"] + return ( + f"Got it! I've selected {slot['time']} on {slot['date']} at our {slot['location_name']} location. " + f"To confirm this booking, I'll need your name, email, and phone number." + ) + + +@loopback_tool +async def book_appointment( + ctx: ToolEnv, + first_name: Annotated[str, "Patient's first name"], + last_name: Annotated[str, "Patient's last name"], + email: Annotated[str, "Patient's email address"], + phone: Annotated[str, "Patient's phone number"], +) -> str: + """Book the selected appointment slot with patient information.""" + scheduler = get_scheduler() + result = await scheduler.book_appointment(first_name, last_name, email, phone) + + if not result["success"]: + return result["error"] + + appt = result["appointment"] + return ( + f"Your appointment is confirmed! Confirmation number: {result['confirmation_number']}. " + f"You're scheduled for {appt['time']} on {appt['date']} at our {appt['location']} location, " + f"{appt['address']}. A confirmation email is on its way to {email}." + ) + + +@loopback_tool +async def send_availability_link( + ctx: ToolEnv, + first_name: Annotated[str, "User's first name"], + last_name: Annotated[str, "User's last name"], + email: Annotated[str, "User's email address"], + phone: Annotated[str, "User's phone number"], +) -> str: + """Send a link to view all available appointments when the shown times don't work.""" + scheduler = get_scheduler() + result = await scheduler.send_availability_link(first_name, last_name, email, phone) + + if not result["success"]: + return "There was an issue sending the link. Please try again." + + return result["message"] diff --git a/examples/dexa_scan_intake/cartesia.toml b/examples/dexa_scan_intake/cartesia.toml new file mode 100644 index 00000000..7c3b9c87 --- /dev/null +++ b/examples/dexa_scan_intake/cartesia.toml @@ -0,0 +1,11 @@ +[cartesia] +name = "DEXA Scan Intake" +description = "Voice agent that answers DEXA scan questions and helps with intake" +version = "0.1.0" + +[cartesia.server] +port = 8000 +host = "0.0.0.0" + +[cartesia.environment] +required_vars = ["ANTHROPIC_API_KEY", "EXA_API_KEY"] diff --git a/examples/dexa_scan_intake/intake_form.py b/examples/dexa_scan_intake/intake_form.py new file mode 100644 index 00000000..f4cec909 --- /dev/null +++ b/examples/dexa_scan_intake/intake_form.py @@ -0,0 +1,641 @@ +"""Intake form management for DEXA scan appointments.""" + +import asyncio +import json +from typing import Annotated, Any, Optional +from loguru import logger + +from line.llm_agent import ToolEnv, loopback_tool + +# Form field definitions +FORM_FIELDS = [ + # Personal Information + {"id": "first_name", "text": "What is your first name?", "type": "string", "section": "personal", "required": True, "confirm": True}, + {"id": "last_name", "text": "What is your last name?", "type": "string", "section": "personal", "required": True, "confirm": True}, + {"id": "email", "text": "What is your email address?", "type": "string", "section": "personal", "required": True, "confirm": True}, + {"id": "phone", "text": "What is your phone number?", "type": "string", "section": "personal", "required": True, "confirm": True}, + {"id": "date_of_birth", "text": "What is your date of birth?", "type": "string", "section": "personal", "required": True}, + { + "id": "ethnicity", + "text": "What is your ethnicity?", + "type": "select", + "section": "personal", + "required": True, + "options": [ + {"value": "asian", "text": "Asian"}, + {"value": "black", "text": "Black or African American"}, + {"value": "hispanic", "text": "Hispanic or Latino"}, + {"value": "native", "text": "Native American or Alaska Native"}, + {"value": "pacific", "text": "Native Hawaiian or Pacific Islander"}, + {"value": "white", "text": "White"}, + {"value": "mixed", "text": "Two or more races"}, + {"value": "other", "text": "Other"}, + {"value": "prefer_not", "text": "Prefer not to say"}, + ], + }, + { + "id": "gender", + "text": "For comparative statistics, would you like to be compared to the male or female population?", + "type": "select", + "section": "personal", + "required": True, + "options": [ + {"value": "male", "text": "Male"}, + {"value": "female", "text": "Female"}, + ], + }, + {"id": "height_inches", "text": "What is your height in inches? For example, 5 foot 8 would be 68 inches.", "type": "number", "section": "personal", "required": True, "min": 36, "max": 96}, + {"id": "weight_pounds", "text": "What is your weight in pounds?", "type": "number", "section": "personal", "required": True, "min": 50, "max": 700}, + # Qualifying Questions + {"id": "q_weight_concerns", "text": "Do you currently or have you ever had concerns about your weight?", "type": "boolean", "section": "qualifying", "required": True}, + {"id": "q_reduce_body_fat", "text": "Would you like to reduce your current body fat percentage?", "type": "boolean", "section": "qualifying", "required": True}, + {"id": "q_athlete", "text": "Are you an athlete or fitness enthusiast interested in competing or performing at a higher level?", "type": "boolean", "section": "qualifying", "required": True}, + {"id": "q_family_history", "text": "Do you or any immediate family members have heart disease or diabetes?", "type": "boolean", "section": "qualifying", "required": True}, + {"id": "q_high_blood_pressure", "text": "Do you have high blood pressure?", "type": "boolean", "section": "qualifying", "required": True}, + {"id": "q_injuries", "text": "Are you currently suffering from any joint, muscular, or ligament injuries?", "type": "boolean", "section": "qualifying", "required": True}, + # Disqualifying Questions + {"id": "disq_barium_xray", "text": "Have you had a barium X-ray in the last 2 weeks?", "type": "boolean", "section": "disqualifying", "required": True}, + {"id": "disq_nuclear_scan", "text": "Have you had a nuclear medicine scan or injection of an X-ray dye in the last week?", "type": "boolean", "section": "disqualifying", "required": True}, +] + + +class IntakeForm: + """Manages the DEXA scan intake form state and provides tools for the agent.""" + + def __init__(self): + self._fields = FORM_FIELDS.copy() + self._answers: dict[str, Any] = {} + self._current_index: int = 0 + self._is_started: bool = False + self._is_submitted: bool = False + logger.info(f"IntakeForm initialized with {len(self._fields)} fields") + + def start_form(self) -> dict: + """Start the intake form process.""" + if not self._is_started: + self._is_started = True + logger.info("Intake form started") + return self.get_status() + + def restart_form(self) -> dict: + """Clear all answers and restart the form.""" + self._answers = {} + self._current_index = 0 + self._is_started = True + self._is_submitted = False + logger.info("Intake form restarted") + return { + "success": True, + "message": "Form has been cleared and restarted", + "next_question": self._format_question(self._fields[0]), + } + + def get_status(self) -> dict: + """Get current form status.""" + current = self._get_current_field() + answered_count = len(self._answers) + total_count = len(self._fields) + + return { + "is_started": self._is_started, + "is_complete": current is None, + "is_submitted": self._is_submitted, + "progress": f"{answered_count}/{total_count}", + "answered_fields": list(self._answers.keys()), + "current_question": self._format_question(current) if current else None, + "current_section": current["section"] if current else None, + } + + def _get_field_by_id(self, field_id: str) -> Optional[dict]: + """Get a field definition by its ID.""" + for field in self._fields: + if field["id"] == field_id: + return field + return None + + def _get_field_index(self, field_id: str) -> int: + """Get the index of a field by its ID. Returns -1 if not found.""" + for i, field in enumerate(self._fields): + if field["id"] == field_id: + return i + return -1 + + def get_answered_fields_summary(self) -> dict: + """Get a summary of all answered fields with their values.""" + answered = [] + for field in self._fields: + if field["id"] in self._answers: + value = self._answers[field["id"]] + # Format the value for display + if field["type"] == "boolean": + display_value = "Yes" if value else "No" + elif field["type"] == "select": + # Find the text for the selected option + display_value = value + for opt in field.get("options", []): + if opt["value"] == value: + display_value = opt["text"] + break + else: + display_value = str(value) + + answered.append({ + "field_id": field["id"], + "question": field["text"], + "answer": display_value, + "raw_value": value, + }) + return { + "answered_count": len(answered), + "total_count": len(self._fields), + "fields": answered, + } + + def edit_answer(self, field_id: str, new_answer: str) -> dict: + """Edit a previously answered field without changing the current position.""" + if not self._is_started: + return { + "success": False, + "error": "Form has not been started yet.", + } + + if self._is_submitted: + return { + "success": False, + "error": "Form has already been submitted. Cannot edit answers.", + } + + field = self._get_field_by_id(field_id) + if not field: + available = [f["id"] for f in self._fields] + return { + "success": False, + "error": f"Unknown field '{field_id}'. Available fields: {', '.join(available)}", + } + + if field_id not in self._answers: + return { + "success": False, + "error": f"Field '{field_id}' has not been answered yet. Cannot edit.", + } + + # Validate and process the new answer + success, processed, error = self._process_answer(new_answer, field) + if not success: + return { + "success": False, + "error": error, + "field_id": field_id, + "question": self._format_question(field), + } + + old_value = self._answers[field_id] + self._answers[field_id] = processed + logger.info(f"Edited '{field_id}': {old_value} -> {processed}") + + # Get current question info + current = self._get_current_field() + + return { + "success": True, + "field_id": field_id, + "old_value": old_value, + "new_value": processed, + "message": f"Updated {field_id} from '{old_value}' to '{processed}'.", + "current_question": self._format_question(current) if current else None, + "is_complete": current is None, + } + + def go_back_to_question(self, field_id: str) -> dict: + """Go back to a specific question, clearing all answers from that point forward.""" + if not self._is_started: + return { + "success": False, + "error": "Form has not been started yet.", + } + + if self._is_submitted: + return { + "success": False, + "error": "Form has already been submitted. Use restart_form to start over.", + } + + field_index = self._get_field_index(field_id) + if field_index == -1: + available = [f["id"] for f in self._fields] + return { + "success": False, + "error": f"Unknown field '{field_id}'. Available fields: {', '.join(available)}", + } + + # Clear answers from this field forward + cleared_fields = [] + for i in range(field_index, len(self._fields)): + fid = self._fields[i]["id"] + if fid in self._answers: + del self._answers[fid] + cleared_fields.append(fid) + + self._current_index = field_index + logger.info(f"Went back to '{field_id}', cleared: {cleared_fields}") + + field = self._fields[field_index] + return { + "success": True, + "message": f"Returned to question: {field['text']}", + "cleared_fields": cleared_fields, + "current_question": self._format_question(field), + "progress": f"{len(self._answers)}/{len(self._fields)}", + } + + def _get_current_field(self) -> Optional[dict]: + """Get the current field to ask about.""" + if self._current_index < len(self._fields): + return self._fields[self._current_index] + return None + + def _format_question(self, field: dict) -> str: + """Format a field as a question for the agent.""" + text = field["text"] + ftype = field["type"] + + if ftype == "select" and "options" in field: + opts = ", ".join(o["text"] for o in field["options"]) + text += f" The options are: {opts}." + elif ftype == "boolean": + text += " Yes or no?" + elif ftype == "number": + if "min" in field and "max" in field: + text += f" Please provide a number between {field['min']} and {field['max']}." + + return text + + def _process_answer(self, answer: str, field: dict) -> tuple[bool, Any, str]: + """Process and validate an answer. Returns (success, processed_value, error_message).""" + answer = answer.strip() + ftype = field["type"] + + if ftype == "string": + if not answer: + return False, None, "Please provide a non-empty answer." + return True, answer, "" + + elif ftype == "number": + try: + # Handle common spoken numbers + num = float(answer.replace(",", "")) + if "min" in field and num < field["min"]: + return False, None, f"The number should be at least {field['min']}." + if "max" in field and num > field["max"]: + return False, None, f"The number should be no more than {field['max']}." + return True, int(num) if num.is_integer() else num, "" + except ValueError: + return False, None, "Please provide a valid number." + + elif ftype == "boolean": + lower = answer.lower() + if lower in ["yes", "true", "y", "1", "yeah", "yep", "correct", "right", "affirmative"]: + return True, True, "" + elif lower in ["no", "false", "n", "0", "nope", "nah", "negative"]: + return True, False, "" + return False, None, "Please answer yes or no." + + elif ftype == "select": + lower = answer.lower() + for opt in field.get("options", []): + if lower in (opt["text"].lower(), opt["value"].lower()) or lower in opt["text"].lower(): + return True, opt["value"], "" + opts = ", ".join(o["text"] for o in field["options"]) + return False, None, f"Please choose from: {opts}." + + return True, answer, "" + + def record_answer(self, answer: str) -> dict: + """Record an answer to the current field.""" + if not self._is_started: + return { + "success": False, + "error": "Form has not been started yet. Start the form first.", + "next_question": None, + } + + field = self._get_current_field() + if not field: + return { + "success": False, + "error": "Form is already complete. Submit the form or restart if needed.", + "is_complete": True, + } + + success, processed, error = self._process_answer(answer, field) + if not success: + return { + "success": False, + "error": error, + "current_question": self._format_question(field), + } + + self._answers[field["id"]] = processed + self._current_index += 1 + logger.info(f"Recorded '{field['id']}': {processed}") + + next_field = self._get_current_field() + + # Check section transitions + section_message = "" + if next_field and field["section"] != next_field["section"]: + if next_field["section"] == "qualifying": + section_message = "Now I need to ask a few qualifying questions. " + elif next_field["section"] == "disqualifying": + section_message = "Almost done. Just two more quick questions. " + + return { + "success": True, + "recorded_field": field["id"], + "recorded_value": processed, + "section_message": section_message, + "next_question": self._format_question(next_field) if next_field else None, + "next_field": next_field, + "is_complete": next_field is None, + "progress": f"{len(self._answers)}/{len(self._fields)}", + } + + def get_form_data(self) -> dict: + """Get the complete form data as JSON-serializable dict.""" + return { + "personal_info": { + "first_name": self._answers.get("first_name"), + "last_name": self._answers.get("last_name"), + "email": self._answers.get("email"), + "phone": self._answers.get("phone"), + "date_of_birth": self._answers.get("date_of_birth"), + "ethnicity": self._answers.get("ethnicity"), + "gender": self._answers.get("gender"), + "height_inches": self._answers.get("height_inches"), + "weight_pounds": self._answers.get("weight_pounds"), + }, + "qualifying_questions": { + "weight_concerns": self._answers.get("q_weight_concerns"), + "reduce_body_fat": self._answers.get("q_reduce_body_fat"), + "athlete": self._answers.get("q_athlete"), + "family_history": self._answers.get("q_family_history"), + "high_blood_pressure": self._answers.get("q_high_blood_pressure"), + "injuries": self._answers.get("q_injuries"), + }, + "disqualifying_questions": { + "barium_xray": self._answers.get("disq_barium_xray"), + "nuclear_scan": self._answers.get("disq_nuclear_scan"), + }, + } + + def check_eligibility(self) -> dict: + """Check if the user is eligible for a DEXA scan based on answers.""" + # Must answer YES to at least one qualifying question + qualifying = [ + self._answers.get("q_weight_concerns"), + self._answers.get("q_reduce_body_fat"), + self._answers.get("q_athlete"), + self._answers.get("q_family_history"), + self._answers.get("q_high_blood_pressure"), + self._answers.get("q_injuries"), + ] + has_qualifying = any(q is True for q in qualifying if q is not None) + + # Must answer NO to both disqualifying questions + barium = self._answers.get("disq_barium_xray") + nuclear = self._answers.get("disq_nuclear_scan") + has_disqualifying = barium is True or nuclear is True + + eligible = has_qualifying and not has_disqualifying + + reasons = [] + if not has_qualifying: + reasons.append("You must answer yes to at least one qualifying question.") + if barium is True: + reasons.append("A barium X-ray within 2 weeks disqualifies you. Please reschedule.") + if nuclear is True: + reasons.append("A nuclear medicine scan or X-ray dye injection within 1 week disqualifies you. Please reschedule.") + + return { + "eligible": eligible, + "reasons": reasons, + } + + async def submit_form(self) -> dict: + """Submit the completed form to the API.""" + if self._current_index < len(self._fields): + return { + "success": False, + "error": "Form is not complete. Please answer all questions first.", + } + + if self._is_submitted: + return { + "success": False, + "error": "Form has already been submitted.", + } + + eligibility = self.check_eligibility() + form_data = self.get_form_data() + + # Mock API call + await asyncio.sleep(0.5) + logger.info(f"Submitting form data: {json.dumps(form_data, indent=2)}") + + self._is_submitted = True + + if not eligibility["eligible"]: + return { + "success": True, + "submitted": True, + "eligible": False, + "message": "Form submitted but you are not currently eligible for a scan.", + "reasons": eligibility["reasons"], + "contact": "Please contact support@bodyspec.com for assistance.", + } + + return { + "success": True, + "submitted": True, + "eligible": True, + "message": "Form submitted successfully! You are eligible for a DEXA scan.", + "confirmation_number": f"DEXA-{hash(json.dumps(form_data)) % 100000:05d}", + "next_steps": "You can now book an appointment. We will send a confirmation email shortly.", + } + + +# Global form instance (per session - in production this would be per-call) +_form_instance: Optional[IntakeForm] = None + + +def get_form() -> IntakeForm: + """Get or create the form instance.""" + global _form_instance + if _form_instance is None: + _form_instance = IntakeForm() + return _form_instance + + +def reset_form_instance(): + """Reset the global form instance.""" + global _form_instance + _form_instance = None + + +# Tool definitions + +@loopback_tool(is_background=True) +async def start_intake_form(ctx: ToolEnv) -> str: + """Start the DEXA scan intake form. Use when user wants to book an appointment, get started, or asks how often they should scan.""" + form = get_form() + status = form.start_form() + + first_question = status.get("current_question", "") + return ( + f"Starting intake form. Progress: {status['progress']}. " + f"First question: {first_question}" + ) + + +@loopback_tool(is_background=True) +async def record_intake_answer( + ctx: ToolEnv, + answer: Annotated[str, "The user's confirmed answer to record (NOT 'yes' or confirmations)"], +) -> str: + """Record the user's answer to the current intake form question. + IMPORTANT: Only call this with the actual answer value, never with 'yes' or confirmation words. + For fields with confirm=true: first confirm verbally with user, then call this tool after they confirm.""" + form = get_form() + result = form.record_answer(answer) + + if not result["success"]: + return f"Error: {result.get('error', 'Unknown error')}. Current question: {result.get('current_question', '')}" + + recorded_field = result.get("recorded_field", "") + + if result["is_complete"]: + eligibility = form.check_eligibility() + if eligibility["eligible"]: + return f"Recorded {recorded_field}. Form complete! User is eligible. Ask if they want to submit." + else: + return f"Recorded {recorded_field}. Form complete but eligibility issue. Ask if they want to submit anyway." + + section_msg = result.get("section_message", "") + next_q = result.get("next_question", "") + next_field = result.get("next_field", {}) + requires_confirm = next_field.get("confirm", False) + + confirm_note = " (confirm answer before recording)" if requires_confirm else "" + return f"Recorded {recorded_field}. {section_msg}Next question{confirm_note}: {next_q}" + + +@loopback_tool(is_background=True) +async def get_intake_form_status(ctx: ToolEnv) -> str: + """Check the current status of the intake form including progress and next question.""" + form = get_form() + status = form.get_status() + + if not status["is_started"]: + return "Intake form has not been started yet." + + if status["is_submitted"]: + return "Intake form has already been submitted." + + if status["is_complete"]: + return f"Intake form is complete with {status['progress']} questions answered. Ready to submit." + + return ( + f"Form in progress: {status['progress']} answered. " + f"Current section: {status['current_section']}. " + f"Current question: {status['current_question']}" + ) + + +@loopback_tool(is_background=True) +async def restart_intake_form(ctx: ToolEnv) -> str: + """Clear all answers and restart the intake form from the beginning. Only use if the user explicitly asks to start over.""" + form = get_form() + result = form.restart_form() + return f"Form restarted. {result['next_question']}" + + +@loopback_tool(is_background=True) +async def submit_intake_form(ctx: ToolEnv) -> str: + """Submit the completed intake form. Only use after all questions are answered.""" + form = get_form() + result = await form.submit_form() + + if not result["success"]: + return f"Could not submit: {result['error']}" + + if not result["eligible"]: + reasons = " ".join(result["reasons"]) + return f"Form submitted. Unfortunately, {reasons} {result['contact']}" + + return ( + f"Form submitted successfully! Confirmation number: {result['confirmation_number']}. " + f"{result['next_steps']}" + ) + + +@loopback_tool(is_background=True) +async def edit_intake_answer( + ctx: ToolEnv, + field_id: Annotated[str, "The ID of the field to edit (e.g., 'email', 'first_name', 'phone')"], + new_answer: Annotated[str, "The new answer to set for this field"], +) -> str: + """Edit a previous answer in the intake form without changing the current question. + Use when the user wants to correct a specific answer they gave earlier (e.g., 'actually my email is different'). + The form will continue from where it left off after editing.""" + form = get_form() + result = form.edit_answer(field_id, new_answer) + + if not result["success"]: + return f"Could not edit answer: {result['error']}" + + response = f"Updated {result['field_id']}: '{result['old_value']}' → '{result['new_value']}'. " + + if result["is_complete"]: + return response + "Form is complete and ready to submit." + else: + return response + f"Continuing with: {result['current_question']}" + + +@loopback_tool(is_background=True) +async def go_back_in_intake_form( + ctx: ToolEnv, + field_id: Annotated[str, "The ID of the field to go back to (e.g., 'email', 'first_name', 'date_of_birth')"], +) -> str: + """Go back to a previous question in the intake form to re-answer it and subsequent questions. + Use when the user wants to go back and redo from a certain point (e.g., 'wait, go back to the email question'). + This will clear answers from that question forward.""" + form = get_form() + result = form.go_back_to_question(field_id) + + if not result["success"]: + return f"Could not go back: {result['error']}" + + response = f"Going back. " + if result["cleared_fields"]: + response += f"Cleared {len(result['cleared_fields'])} answer(s). " + response += f"Progress: {result['progress']}. " + response += f"Question: {result['current_question']}" + + return response + + +@loopback_tool(is_background=True) +async def list_intake_answers(ctx: ToolEnv) -> str: + """List all answers the user has provided so far in the intake form. + Use when the user asks to review their answers or wants to know what they've entered.""" + form = get_form() + summary = form.get_answered_fields_summary() + + if summary["answered_count"] == 0: + return "No answers recorded yet. The form may not have been started." + + parts = [f"Answered {summary['answered_count']}/{summary['total_count']} questions:\n"] + for field in summary["fields"]: + parts.append(f"- {field['field_id']}: {field['answer']}\n") + + return "".join(parts) diff --git a/examples/dexa_scan_intake/main.py b/examples/dexa_scan_intake/main.py new file mode 100644 index 00000000..8b1cc18e --- /dev/null +++ b/examples/dexa_scan_intake/main.py @@ -0,0 +1,175 @@ +"""DEXA Scan Intake Agent with knowledge base and Exa web search.""" + +import os +import time +from typing import AsyncIterable, List + +from loguru import logger + +from line.events import AgentSendText +from line.llm_agent import LlmAgent, LlmConfig, end_call +from line.voice_agent_app import AgentEnv, CallRequest, VoiceAgentApp + +from tools import lookup_past_appointments, search_dexa_info, lookup_dexa_knowledge +from intake_form import ( + start_intake_form, + record_intake_answer, + get_intake_form_status, + restart_intake_form, + submit_intake_form, + edit_intake_answer, + go_back_in_intake_form, + list_intake_answers, + reset_form_instance, +) +from appointment_scheduler import ( + list_locations, + check_availability, + select_appointment_slot, + book_appointment, + send_availability_link, + reset_scheduler_instance, +) + + +class TTFCTracker: + """Tracks time to first token (TTFT) and time to first text chunk (TTFC) for LLM responses.""" + + def __init__(self, log_interval: int = 5): + self._ttft_times: List[float] = [] # Time to first token (any output) + self._ttfc_times: List[float] = [] # Time to first text chunk + self._turn_count: int = 0 + self._log_interval = log_interval + + def record_ttft(self, ttft_ms: float): + """Record time to first token (any output from LLM).""" + self._ttft_times.append(ttft_ms) + logger.info(f"TTFT turn {self._turn_count + 1}: {ttft_ms:.1f}ms") + + def record_ttfc(self, ttfc_ms: float): + """Record time to first text chunk.""" + self._ttfc_times.append(ttfc_ms) + self._turn_count += 1 + logger.info(f"TTFC turn {self._turn_count}: {ttfc_ms:.1f}ms") + + if self._turn_count % self._log_interval == 0: + ttft_avg = sum(self._ttft_times) / len(self._ttft_times) if self._ttft_times else 0 + ttfc_avg = sum(self._ttfc_times) / len(self._ttfc_times) if self._ttfc_times else 0 + logger.info(f"Averages over {self._turn_count} turns - TTFT: {ttft_avg:.1f}ms, TTFC: {ttfc_avg:.1f}ms") + + def reset(self): + """Reset tracking for a new call.""" + self._ttft_times = [] + self._ttfc_times = [] + self._turn_count = 0 + + +class TTFCWrappedAgent: + """Wraps an LlmAgent to track time to first token and first text chunk.""" + + def __init__(self, agent: LlmAgent, tracker: TTFCTracker): + self._agent = agent + self._tracker = tracker + + async def process(self, env, event) -> AsyncIterable: + """Process an event and track TTFT and TTFC.""" + start_time = time.perf_counter() + first_token_seen = False + first_text_seen = False + + async for output in self._agent.process(env, event): + if not first_token_seen: + ttft_ms = (time.perf_counter() - start_time) * 1000 + self._tracker.record_ttft(ttft_ms) + first_token_seen = True + + if not first_text_seen and isinstance(output, AgentSendText): + ttfc_ms = (time.perf_counter() - start_time) * 1000 + self._tracker.record_ttfc(ttfc_ms) + first_text_seen = True + + yield output + + +# Global TTFC tracker instance (reset per call) +_ttfc_tracker = TTFCTracker(log_interval=5) + + +SYSTEM_PROMPT = """You are a friendly voice assistant for a DEXA scanning facility. Keep responses SHORT - this is a phone call. + +# Tools Available +- lookup_dexa_knowledge: Answer DEXA questions (topics: what_is_dexa, how_it_works, safety, preparation, etc.) +- search_dexa_info: Web search for current info (say "let me look that up" first) +- lookup_past_appointments: Find patient history (need: first_name, last_name, date_of_birth as YYYY-MM-DD) +- Intake form: start_intake_form, record_intake_answer, get_intake_form_status, edit_intake_answer, submit_intake_form +- Scheduling: list_locations, check_availability, select_appointment_slot, book_appointment + +# Intake Form Rules +- Ask ONE question at a time +- For name/email/phone: FIRST say it back to confirm ("so that's john at gmail dot com?"), THEN call record_intake_answer with the value AFTER user confirms +- For all other fields (yes/no, numbers, etc): call record_intake_answer immediately with the answer +- NEVER call record_intake_answer with "yes" or "correct" - only call it with the actual answer value +- Field IDs for edits: first_name, last_name, email, phone, date_of_birth, ethnicity, gender, height_inches, weight_pounds + +# Locations +5 SF locations: Financial District, SoMa, Marina, Castro, Sunset + +# Voice Style +- 1-2 sentences max. Be brief. +- Use contractions: you'll, it's, we've +- Occasional fillers: "um", "so", "let's see" +- Never use bullet points or special characters +- Never start with "Great question!" - just answer directly +""" + +INTRODUCTION = ( + "Hey! Thanks for calling. So, I can help you with pretty much anything about DEXA scans, " + "What can I help you with?" +) + + +async def get_agent(env: AgentEnv, call_request: CallRequest): + logger.info(f"Starting new DEXA intake call: {call_request}") + + # Reset state for new call + reset_form_instance() + reset_scheduler_instance() + _ttfc_tracker.reset() + + agent = LlmAgent( + model="gemini/gemini-2.5-flash", + api_key=os.getenv("GEMINI_API_KEY"), + tools=[ + lookup_dexa_knowledge, + search_dexa_info, + lookup_past_appointments, + start_intake_form, + record_intake_answer, + get_intake_form_status, + restart_intake_form, + submit_intake_form, + edit_intake_answer, + go_back_in_intake_form, + list_intake_answers, + list_locations, + check_availability, + select_appointment_slot, + book_appointment, + send_availability_link, + end_call, + ], + config=LlmConfig( + system_prompt=SYSTEM_PROMPT, + introduction=INTRODUCTION, + temperature=1, + ), + ) + + # Wrap agent with TTFC tracking + return TTFCWrappedAgent(agent, _ttfc_tracker) + + +app = VoiceAgentApp(get_agent=get_agent) + +if __name__ == "__main__": + app.run() diff --git a/examples/dexa_scan_intake/pyproject.toml b/examples/dexa_scan_intake/pyproject.toml new file mode 100644 index 00000000..07c61ccb --- /dev/null +++ b/examples/dexa_scan_intake/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "dexa-scan-intake" +version = "0.1.0" +description = "Customer support agent that answers questions on DEXA scan, and completes an intake form for scheduling." +requires-python = ">=3.10" +dependencies = [ + "cartesia-line==0.2.2", + "exa-py>=1.0.0", + "loguru>=0.7.0", +] + +[tool.setuptools] +py-modules = ["main", "tools", "intake_form", "appointment_scheduler"] diff --git a/examples/dexa_scan_intake/tools.py b/examples/dexa_scan_intake/tools.py new file mode 100644 index 00000000..980934ca --- /dev/null +++ b/examples/dexa_scan_intake/tools.py @@ -0,0 +1,251 @@ +"""Tools for the DEXA Scan Intake Agent.""" + +import asyncio +import os +from typing import Annotated + +from exa_py import Exa +from loguru import logger + +from line.llm_agent import ToolEnv, loopback_tool + +# Mock database of users and their past appointments +MOCK_USER_DATABASE = { + ("lucy", "liu", "1990-05-15"): { + "user_id": "USR001", + "email": "lucy.liu@example.com", + "appointments": [ + { + "date": "2025-01-15", + "time": "10:30 AM", + "location": "San Francisco - Financial District", + "type": "Full Body DEXA Scan", + "summary": { + "body_fat_percentage": 22.3, + "lean_mass_lbs": 98.5, + "bone_density_tscore": 1.2, + "visceral_fat_lbs": 1.8, + }, + }, + { + "date": "2024-10-08", + "time": "2:00 PM", + "location": "San Francisco - Financial District", + "type": "Full Body DEXA Scan", + "summary": { + "body_fat_percentage": 24.1, + "lean_mass_lbs": 95.2, + "bone_density_tscore": 1.1, + "visceral_fat_lbs": 2.1, + }, + }, + ], + }, + ("john", "smith", "1985-08-22"): { + "user_id": "USR002", + "email": "john.smith@example.com", + "appointments": [ + { + "date": "2025-02-01", + "time": "9:00 AM", + "location": "Los Angeles - Santa Monica", + "type": "Full Body DEXA Scan", + "summary": { + "body_fat_percentage": 18.5, + "lean_mass_lbs": 145.3, + "bone_density_tscore": 0.8, + "visceral_fat_lbs": 1.2, + }, + }, + ], + }, + ("sarah", "johnson", "1992-03-10"): { + "user_id": "USR003", + "email": "sarah.j@example.com", + "appointments": [ + { + "date": "2024-12-20", + "time": "11:15 AM", + "location": "San Diego - Downtown", + "type": "Full Body DEXA Scan", + "summary": { + "body_fat_percentage": 26.8, + "lean_mass_lbs": 88.4, + "bone_density_tscore": 0.5, + "visceral_fat_lbs": 2.4, + }, + }, + { + "date": "2024-09-15", + "time": "3:30 PM", + "location": "San Diego - Downtown", + "type": "Full Body DEXA Scan", + "summary": { + "body_fat_percentage": 28.2, + "lean_mass_lbs": 86.1, + "bone_density_tscore": 0.4, + "visceral_fat_lbs": 2.7, + }, + }, + { + "date": "2024-06-01", + "time": "10:00 AM", + "location": "San Diego - Downtown", + "type": "Full Body DEXA Scan", + "summary": { + "body_fat_percentage": 29.5, + "lean_mass_lbs": 84.8, + "bone_density_tscore": 0.3, + "visceral_fat_lbs": 2.9, + }, + }, + ], + }, +} + + +async def _mock_api_call(first_name: str, last_name: str, date_of_birth: str) -> dict: + """Simulate an async API call to the patient records system.""" + # Simulate network latency + await asyncio.sleep(0.5) + + # Normalize inputs for lookup + key = (first_name.lower().strip(), last_name.lower().strip(), date_of_birth.strip()) + + if key in MOCK_USER_DATABASE: + return {"success": True, "data": MOCK_USER_DATABASE[key]} + else: + return {"success": False, "error": "No matching records found"} + + +# DEXA Knowledge Base - used by lookup_dexa_knowledge tool +DEXA_KNOWLEDGE = { + "what_is_dexa": "DEXA (Dual-Energy X-ray Absorptiometry) uses two X-ray beams to measure body composition and bone density with high precision, distinguishing bone, lean tissue, and fat.", + "how_it_works": "You lie on an open table while a scanning arm passes over you, emitting low-dose X-rays. Different tissues absorb different amounts, allowing precise measurement of bone, muscle, and fat.", + "what_it_measures": "Total body fat percentage, lean muscle mass by region, bone mineral density, visceral fat (around organs), and left/right symmetry.", + "accuracy": "DEXA is the gold standard with 1-2% margin of error for body fat. Much more accurate than scales, calipers, or underwater weighing.", + "safety": "Very safe - uses 1/10th the radiation of a chest X-ray (0.001 mSv), less than daily background radiation.", + "preparation": "Wear comfortable clothes without metal. Avoid calcium supplements 24hrs before. Stay hydrated. No fasting needed but avoid large meals. Remove jewelry.", + "what_to_expect": "Takes 7-10 minutes. Lie still on your back. Painless and non-invasive. The arm passes over but doesn't touch you.", + "frequency": "Every 3-6 months for tracking changes. More frequent scans may not show meaningful differences.", + "visceral_fat": "Fat around internal organs. High visceral fat increases risk of diabetes, heart disease, and metabolic syndrome. DEXA measures this directly.", + "results": "Shows body fat percentage (essential/athletic/fit/average/obese ranges), lean mass, bone density vs peers, and regional breakdown.", + "who_should_get": "Athletes, fitness trackers, bone health concerns, weight management, older adults, anyone wanting baseline health metrics.", +} + + +@loopback_tool +async def lookup_dexa_knowledge( + ctx: ToolEnv, + topic: Annotated[ + str, + "The topic to look up. Options: what_is_dexa, how_it_works, what_it_measures, accuracy, safety, preparation, what_to_expect, frequency, visceral_fat, results, who_should_get", + ], +) -> str: + """Look up information about DEXA scans from the knowledge base. Use this to answer questions about DEXA.""" + topic_key = topic.lower().replace(" ", "_").replace("-", "_") + + # Try exact match first + if topic_key in DEXA_KNOWLEDGE: + return DEXA_KNOWLEDGE[topic_key] + + # Try partial match + for key, value in DEXA_KNOWLEDGE.items(): + if topic_key in key or key in topic_key: + return value + + # Return all topics if no match + return f"Topic '{topic}' not found. Available: {', '.join(DEXA_KNOWLEDGE.keys())}" + + +@loopback_tool(is_background=True) +async def search_dexa_info( + ctx: ToolEnv, + query: Annotated[ + str, + "The search query about DEXA scans, body composition, or related health topics.", + ], +) -> str: + """Search the web for DEXA scan information, providers, or related health topics.""" + logger.info(f"Performing Exa search for DEXA info: '{query}'") + + api_key = os.environ.get("EXA_API_KEY") + if not api_key: + return "Web search is unavailable. I can still answer based on my knowledge base." + + try: + client = Exa(api_key=api_key) + results = await asyncio.to_thread( + client.search_and_contents, + f"DEXA scan {query}", + num_results=5, + type="auto", + text={"max_characters": 800}, + ) + + if not results or not results.results: + return "I couldn't find specific information about that. Let me help with what I know." + + content_parts = [f"Search results for: '{query}'\n"] + for i, result in enumerate(results.results[:5]): + content_parts.append(f"\n--- Source {i + 1}: {result.title} ---\n") + if result.text: + content_parts.append(f"{result.text}\n") + content_parts.append(f"URL: {result.url}\n") + + logger.info(f"Search completed: {len(results.results)} sources found") + return "".join(content_parts) + + except Exception as e: + logger.error(f"Exa search failed: {e}") + return "Search encountered an issue. I can still help with my existing knowledge." + + +@loopback_tool +async def lookup_past_appointments( + ctx: ToolEnv, + first_name: Annotated[str, "The patient's first name."], + last_name: Annotated[str, "The patient's last name."], + date_of_birth: Annotated[str, "The patient's date of birth in YYYY-MM-DD format."], +) -> str: + """Look up a patient's past DEXA scan appointments and results after verifying their identity.""" + logger.info(f"Looking up appointments for {first_name} {last_name}, DOB: {date_of_birth}") + + # Make async API call to patient records system + response = await _mock_api_call(first_name, last_name, date_of_birth) + + if not response["success"]: + return ( + "I wasn't able to find any records matching that information. " + "Please double-check the name spelling and date of birth format, which should be " + "year, month, day, like 1990-05-15. If you're a new patient, you may not have any " + "records in our system yet." + ) + + data = response["data"] + appointments = data["appointments"] + + if not appointments: + return f"I found your account, but you don't have any past appointments on record yet." + + # Format the appointments for voice output + result_parts = [f"I found {len(appointments)} appointment(s) on record for {first_name.title()}.\n\n"] + + for i, appt in enumerate(appointments, 1): + result_parts.append(f"Appointment {i}:\n") + result_parts.append(f"- Date: {appt['date']} at {appt['time']}\n") + result_parts.append(f"- Location: {appt['location']}\n") + result_parts.append(f"- Type: {appt['type']}\n") + + summary = appt["summary"] + result_parts.append(f"- Results summary: Body fat {summary['body_fat_percentage']}%, ") + result_parts.append(f"Lean mass {summary['lean_mass_lbs']} lbs, ") + result_parts.append(f"Bone density T-score {summary['bone_density_tscore']}, ") + result_parts.append(f"Visceral fat {summary['visceral_fat_lbs']} lbs\n\n") + + result_parts.append( + "For full detailed reports with charts and complete regional breakdowns, " + "the patient can visit their dashboard at bodyspec.com and log in with their account." + ) + + return "".join(result_parts)