|
| 1 | +# demo_synthetic.py |
| 2 | + |
| 3 | +import os |
| 4 | +import time |
| 5 | +from typing import List, Optional # Import Any for plan typing |
| 6 | + |
| 7 | +# Import necessary components from the project |
| 8 | +from omnimcp.synthetic_ui import ( |
| 9 | + generate_login_screen, |
| 10 | + simulate_action, |
| 11 | + draw_highlight, # Use the original draw_highlight from synthetic_ui |
| 12 | +) |
| 13 | +from omnimcp.core import plan_action_for_ui, LLMActionPlan # Import the Pydantic model |
| 14 | +from omnimcp.utils import logger # Assuming logger is configured elsewhere |
| 15 | +from omnimcp.types import UIElement # Import UIElement |
| 16 | + |
# --- Configuration ---
# Directory that receives every image written by this synthetic demo.
OUTPUT_DIR: str = "demo_output_multistep"
# Gates the per-step state, highlight and final-state image saves.
SAVE_IMAGES: bool = True
# Upper bound on the number of plan/simulate iterations in one run.
MAX_STEPS: int = 6
| 21 | + |
| 22 | + |
def _describe_action(plan: "LLMActionPlan", target_element: "Optional[UIElement]") -> str:
    """Build the one-line action-history entry for a planned action."""
    parts = [f"Action: {plan.action}"]
    if plan.text_to_type:
        parts.append(f" '{plan.text_to_type}'")
    if plan.key_info:
        parts.append(f" Key='{plan.key_info}'")
    if target_element:
        parts.append(
            f" on Element ID {target_element.id} ('{target_element.content}')"
        )
    return "".join(parts)


def _state_changed(old_image, old_elements, new_image, new_elements) -> bool:
    """Return True if the simulated state differs detectably from the old one.

    A change is detected when the image object is a different object, the
    element count differs, or any pair of elements serialises differently.
    """
    if new_image is not old_image:
        return True
    if len(old_elements) != len(new_elements):
        return True
    return any(
        before.to_dict() != after.to_dict()
        for before, after in zip(old_elements, new_elements)
    )


def run_multi_step_demo():
    """Run the multi-step OmniMCP demo using synthetic UI and LLM planning.

    Generates a synthetic login screen, then loops for at most ``MAX_STEPS``
    iterations. Each iteration asks ``plan_action_for_ui`` for the next
    action, optionally saves a highlighted image of the target element,
    records the action in the history passed to later planning calls, and
    applies the action with ``simulate_action``.

    The loop stops early when the plan reports ``is_goal_complete`` (after
    that step has been simulated), when a ``click`` is planned without a
    resolved target element, or when any exception is raised during a step.
    Exceptions are logged with their traceback and are not propagated.

    Returns:
        None. Results are reported through the logger and, when
        ``SAVE_IMAGES`` is true, as image files under ``OUTPUT_DIR``.
    """
    logger.info("--- Starting OmniMCP Multi-Step Synthetic Demo ---")
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # 1. Initial State & Goal
    logger.info("Generating initial login screen...")
    try:
        # save_path makes the generator write the initial image itself.
        image, elements = generate_login_screen(
            save_path=os.path.join(OUTPUT_DIR, "step_0_state_initial.png")
        )
    except Exception as e:
        # logger.exception attaches the active traceback. The previous
        # `exc_info=True` keyword is a stdlib-logging convention that a
        # loguru-style logger (this file uses logger.success) does not honour.
        logger.exception(f"Failed to generate initial screen: {e}")
        return

    user_goal = "Log in using username 'testuser' and password 'password123'"
    logger.info(f"User Goal: '{user_goal}'")

    action_history: List[str] = []
    goal_achieved_flag = False  # Set by the plan; acted on only after simulation
    last_step_completed = -1  # Index of the last step that ran to completion

    # --- Main Loop ---
    for step in range(MAX_STEPS):
        logger.info(f"\n--- Step {step + 1}/{MAX_STEPS} ---")
        step_img_prefix = f"step_{step + 1}"  # 1-based index for filenames

        # Save the current state *before* planning/highlighting.
        if SAVE_IMAGES:
            current_state_img_path = os.path.join(
                OUTPUT_DIR, f"{step_img_prefix}_state.png"
            )
            try:
                image.save(current_state_img_path)
                logger.info(f"Saved current state to {current_state_img_path}")
            except Exception as save_e:
                # Best effort: a failed snapshot must not abort the demo.
                logger.warning(f"Could not save step state image: {save_e}")

        # 2. Plan Next Action
        logger.info("Planning action with LLM...")
        try:
            llm_plan, target_element = plan_action_for_ui(
                elements=elements,
                user_goal=user_goal,
                action_history=action_history,
                step=step,
            )

            logger.info(f"LLM Reasoning: {llm_plan.reasoning}")
            logger.info(
                f"LLM Proposed Action: {llm_plan.action} on Element ID: {llm_plan.element_id}"
            )
            if llm_plan.text_to_type:
                logger.info(f"Text to Type: '{llm_plan.text_to_type}'")
            if llm_plan.key_info:
                logger.info(f"Key Info: '{llm_plan.key_info}'")
            logger.info(f"LLM Goal Complete Assessment: {llm_plan.is_goal_complete}")

            # 3. Note goal completion, but still simulate this step first.
            if llm_plan.is_goal_complete:
                logger.info(
                    "LLM flag indicates goal should be complete after this action."
                )
                goal_achieved_flag = True

            # Validate the target only while the goal is not yet complete.
            # A click needs an element resolved in the current state; other
            # actions are passed to simulate_action without this check.
            if (
                not goal_achieved_flag
                and llm_plan.action == "click"
                and not target_element
            ):
                logger.error(
                    f"LLM planned 'click' on invalid element ID ({llm_plan.element_id}). Stopping."
                )
                break

            # 4. Visualize Planned Action (uses synthetic_ui.draw_highlight)
            if target_element:
                try:
                    highlighted_image = draw_highlight(
                        image,
                        target_element,
                        plan=llm_plan,
                        color="lime",
                        width=4,
                    )
                    if SAVE_IMAGES:
                        highlight_img_path = os.path.join(
                            OUTPUT_DIR, f"{step_img_prefix}_highlight.png"
                        )
                        highlighted_image.save(highlight_img_path)
                        logger.info(
                            f"Saved highlighted action with text to {highlight_img_path}"
                        )
                except Exception as draw_e:
                    # Visualisation is optional; keep going without it.
                    logger.warning(f"Could not save highlight image: {draw_e}")
            else:
                logger.info("No target element to highlight for this step.")

            # Record the action *before* simulation replaces the state.
            action_desc = _describe_action(llm_plan, target_element)
            action_history.append(action_desc)
            logger.debug(f"Added to history: {action_desc}")

            # 5. Simulate Action -> Get New State (always run for the planned step)
            logger.info("Simulating action...")
            # Read the username from the pre-action elements (element 0, a
            # text field) in case this step performs the login.
            username = next(
                (
                    el.content
                    for el in elements
                    if el.id == 0 and el.type == "text_field"
                ),
                "User",
            )

            new_image, new_elements = simulate_action(
                image, elements, llm_plan, username_for_login=username
            )

            state_changed = _state_changed(image, elements, new_image, new_elements)
            image, elements = new_image, new_elements  # State for the next step

            if state_changed:
                logger.info(
                    f"State updated for next step. New element count: {len(elements)}"
                )
            else:
                logger.warning(
                    "Simulation did not result in a detectable state change."
                )

            # The step counts as completed once simulation has returned.
            last_step_completed = step

            # 6. Only now act on the completion flag.
            if goal_achieved_flag:
                logger.success(
                    "Goal completion flag was set, ending loop after simulation."
                )
                break

            # Pause briefly between steps
            time.sleep(1)

        except Exception as e:
            # Top-level boundary for one step: log with traceback and stop.
            logger.exception(f"Error during step {step + 1}: {e}")
            break

    # --- End of Loop ---
    logger.info("\n--- Multi-Step Synthetic Demo Finished ---")
    if goal_achieved_flag:
        logger.success("Overall goal marked as achieved by LLM during execution.")
    elif last_step_completed == MAX_STEPS - 1:
        # Every step ran, but the plan never reported completion.
        logger.warning(
            f"Reached maximum steps ({MAX_STEPS}) without goal completion flag being set."
        )
    else:
        # The loop broke early because of an error or an invalid click.
        logger.error(
            f"Execution stopped prematurely after Step {last_step_completed + 1} (check logs)."
        )

    # Save final state
    if SAVE_IMAGES:
        final_state_img_path = os.path.join(OUTPUT_DIR, "final_state.png")
        try:
            image.save(final_state_img_path)
            logger.info(f"Saved final state to {final_state_img_path}")
        except Exception as save_e:
            logger.warning(f"Could not save final state image: {save_e}")
| 226 | + |
if __name__ == "__main__":
    # Direct execution starts the demo immediately; no API-key check is made.
    # If core planning turns out to require a key, a pre-flight check could
    # be added here, for example:
    #   from omnimcp.config import config
    #   if not config.ANTHROPIC_API_KEY:
    #       print("Warning: ANTHROPIC_API_KEY not found. LLM planning might fail.")
    run_multi_step_demo()
0 commit comments