|
| 1 | +from logging import warning |
| 2 | +from typing import Optional, Tuple |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +from PIL import Image, ImageDraw |
| 6 | +from playwright.sync_api import Page |
| 7 | + |
| 8 | +""" |
| 9 | +This module contains utility functions for handling observations and actions in the context of agent interactions. |
| 10 | +""" |
| 11 | + |
| 12 | + |
def tag_screenshot_with_action(screenshot: Image.Image, action: str) -> Image.Image:
    """
    If action is a coordinate action, try to render it on the screenshot.

    e.g. mouse_click(120, 130) -> draw a dot at (120, 130) on the screenshot
         mouse_drag_and_drop(10, 20, 30, 40) -> draw an arrow from (10, 20) to (30, 40)

    The screenshot is drawn on in place. An action that cannot be parsed is
    reported through a logging warning and the screenshot is returned
    untouched; parsing problems are never raised to the caller.

    Args:
        screenshot: The screenshot to tag.
        action: The action to tag the screenshot with.

    Returns:
        The same screenshot object, tagged when the action could be parsed.
    """
    if action.startswith("mouse_click"):
        try:
            # Text between the first "(" and the first ")" is the argument list.
            raw_args = action[action.index("(") + 1 : action.index(")")]
            coords = [c.strip() for c in raw_args.split(",")]
            # Two coordinates, optionally followed by one extra argument.
            if len(coords) not in (2, 3):
                raise ValueError(f"Invalid coordinate format: {coords}")
            # Accept the keyword spelling mouse_click(x=..., y=...).
            if coords[0].startswith("x="):
                coords[0] = coords[0][2:]
            if coords[1].startswith("y="):
                coords[1] = coords[1][2:]
            x, y = float(coords[0].strip()), float(coords[1].strip())
            draw = ImageDraw.Draw(screenshot)
            radius = 5
            draw.ellipse(
                (x - radius, y - radius, x + radius, y + radius), fill="blue", outline="blue"
            )
        except (ValueError, IndexError) as e:
            warning(f"Failed to parse action '{action}': {e}")

    elif action.startswith("mouse_drag_and_drop"):
        try:
            func_name, parsed_args = parse_func_call_string(action)
            if parsed_args is None:
                raise ValueError("not a parsable function call")
            if func_name == "mouse_drag_and_drop":
                args, kwargs = parsed_args
                names = ("from_x", "from_y", "to_x", "to_y")
                # Keyword values first, then positional values (x1, y1, x2, y2)
                # fill their slots in order, so mixed calls resolve as well.
                values = {name: kwargs[name] for name in names if name in kwargs}
                values.update(zip(names, args))
                missing = [name for name in names if name not in values]
                if missing:
                    # Never invent a coordinate: a default of 0 would draw a
                    # misleading arrow anchored at the image origin.
                    raise ValueError(f"missing coordinates: {missing}")
                x1, y1, x2, y2 = (float(values[name]) for name in names)

                draw = ImageDraw.Draw(screenshot)
                # Draw the main line
                draw.line((x1, y1, x2, y2), fill="red", width=2)
                # Draw arrowhead at the end point using the helper function
                draw_arrowhead(draw, (x1, y1), (x2, y2))
        except (ValueError, TypeError, IndexError) as e:
            # TypeError covers non-numeric literals such as None or a list.
            warning(f"Failed to parse action '{action}': {e}")
    return screenshot
| 74 | + |
| 75 | + |
def add_mouse_pointer_from_action(screenshot: Image.Image, action: str) -> Image.Image:
    """
    Overlay a mouse pointer on the screenshot when the action is a mouse click.

    e.g. mouse_click(120, 130) or mouse_click(x=120, y=130) -> pointer at (120, 130)

    Fractional coordinates are rounded to the nearest pixel. An action that
    cannot be parsed is reported through a logging warning, and any action
    that is not a mouse_click is ignored.

    Args:
        screenshot: The screenshot to draw the pointer on.
        action: The action string to read the click coordinates from.

    Returns:
        The image produced by draw_mouse_pointer when a pointer was drawn,
        otherwise the screenshot that was passed in.
    """
    if action.startswith("mouse_click"):
        try:
            # Text between the first "(" and the first ")" is the argument list.
            raw_args = action[action.index("(") + 1 : action.index(")")]
            coords = [c.strip() for c in raw_args.split(",")]
            # Two coordinates, optionally followed by one extra argument.
            if len(coords) not in (2, 3):
                raise ValueError(f"Invalid coordinate format: {coords}")
            # Accept the keyword spelling mouse_click(x=..., y=...).
            if coords[0].startswith("x="):
                coords[0] = coords[0][2:]
            if coords[1].startswith("y="):
                coords[1] = coords[1][2:]
            # Go through float so "120.5" is accepted, matching how
            # tag_screenshot_with_action reads the same action strings.
            x, y = round(float(coords[0])), round(float(coords[1]))
            screenshot = draw_mouse_pointer(screenshot, x, y)
        except (ValueError, IndexError, OverflowError) as e:
            # OverflowError: round() of an infinite coordinate.
            warning(f"Failed to parse action '{action}': {e}")
    return screenshot
| 93 | + |
| 94 | + |
def draw_mouse_pointer(image: Image.Image, x: int, y: int) -> Image.Image:
    """
    Render a half-transparent black arrow pointer whose tip is at (x, y).

    The input image is left unmodified; the pointer is painted on an RGBA
    copy which is then composited over an RGBA version of the input.

    Args:
        image: The image to draw the mouse pointer on.
        x: The x coordinate of the pointer tip.
        y: The y coordinate of the pointer tip.

    Returns:
        A new RGBA image with the mouse pointer drawn.
    """
    size = 20  # Overall extent of the pointer
    half = size // 2

    base = image.convert("RGBA")
    layer = base.copy()

    # Simple arrow outline: tip, right barb, inner notch, tail.
    arrow = [
        (x, y),
        (x + size, y + half),
        (x + half, y + half),
        (x + half, y + size),
    ]
    ImageDraw.Draw(layer).polygon(arrow, fill=(0, 0, 0, 128))  # alpha 128 of 255

    return Image.alpha_composite(base, layer)
| 123 | + |
| 124 | + |
def draw_arrowhead(draw, start, end, arrow_length=15, arrow_angle=30, fill="red", width=4):
    """
    Draws a two-stroke arrowhead at `end`, pointing along the direction start -> end.

    Args:
        draw: Drawing context exposing line(xy, fill=..., width=...),
            e.g. the object returned by ImageDraw.Draw.
        start: (x, y) of the arrow tail; only used to determine the direction.
        end: (x, y) of the arrow tip, where both strokes meet.
        arrow_length: Length of each stroke, in the same units as the coordinates.
        arrow_angle: Angle in degrees between the arrow shaft and each stroke.
        fill: Colour passed to draw.line for both strokes (default "red").
        width: Line width passed to draw.line for both strokes (default 4).
    """
    from math import atan2, cos, radians, sin

    # Direction of the shaft; atan2(0, 0) is 0, so a zero-length arrow
    # still gets a head (pointing along +x).
    angle = atan2(end[1] - start[1], end[0] - start[0])
    spread = radians(arrow_angle)

    # Each stroke runs backwards from the tip, rotated +/- spread off the shaft.
    left = (
        end[0] - arrow_length * cos(angle - spread),
        end[1] - arrow_length * sin(angle - spread),
    )
    right = (
        end[0] - arrow_length * cos(angle + spread),
        end[1] - arrow_length * sin(angle + spread),
    )
    draw.line([end, left], fill=fill, width=width)
    draw.line([end, right], fill=fill, width=width)
| 139 | + |
| 140 | + |
def draw_click_indicator(image: Image.Image, x: int, y: int) -> Image.Image:
    """
    Render a crosshair-style click marker centred on (x, y).

    The marker is a + shape made of four separate arms that stop short of the
    centre point. The input image is left unmodified; the arms are painted on
    an RGBA copy which is then composited over an RGBA version of the input.

    Args:
        image: The image to draw the click indicator on.
        x: The x coordinate of the click.
        y: The y coordinate of the click.

    Returns:
        A new RGBA image with the click indicator drawn.
    """
    arm = 10  # Length of each arm
    gap = 4  # Distance from the centre point to the start of each arm
    stroke = 2  # Thickness of the dark core line

    halo_colour = (255, 255, 255, 200)
    core_colour = (0, 0, 0, 255)

    base = image.convert("RGBA")
    layer = base.copy()
    pen = ImageDraw.Draw(layer)

    arms = (
        [(x, y - gap - arm), (x, y - gap)],  # top
        [(x, y + gap), (x, y + gap + arm)],  # bottom
        [(x - gap - arm, y), (x - gap, y)],  # left
        [(x + gap, y), (x + gap + arm, y)],  # right
    )

    # A wider white halo under a black core keeps the marker visible on
    # both light and dark backgrounds.
    for segment in arms:
        pen.line(segment, fill=halo_colour, width=stroke + 2)
        pen.line(segment, fill=core_colour, width=stroke)

    return Image.alpha_composite(base, layer)
| 197 | + |
| 198 | + |
def zoom_webpage(page: Page, zoom_factor: float = 1.5):
    """
    Apply a CSS zoom to the page's root element.

    The factor is converted to a percentage and assigned to
    document.documentElement.style.zoom through page.evaluate.

    NOTE: Click actions with bid don't work properly when zoomed in.

    Args:
        page: The Playwright Page object.
        zoom_factor: The zoom factor to apply (default is 1.5, i.e. 150%).

    Returns:
        Page: The same Playwright Page object that was passed in.

    Raises:
        ValueError: If zoom_factor is less than or equal to 0.
    """
    if zoom_factor <= 0:
        raise ValueError("Zoom factor must be greater than 0.")

    percent = zoom_factor * 100
    script = f"document.documentElement.style.zoom='{percent}%'"
    page.evaluate(script)
    return page
| 221 | + |
| 222 | + |
def parse_func_call_string(call_str: str) -> Tuple[Optional[str], Optional[Tuple[list, dict]]]:
    """
    Parse a function call string and extract the function name and arguments.

    Only calls to a plain name with literal arguments are accepted. The string
    is parsed with the ast module and argument values are read with
    ast.literal_eval, so nothing in the string is ever executed.

    A `**{...}` keyword unpack is expanded into the keyword dictionary when it
    is a literal dict with string keys; any other `**` unpack is a parse failure.

    Args:
        call_str (str): A string like "mouse_click(100, 200)" or "mouse_drag_and_drop(x=10, y=20)"

    Returns:
        Tuple (func_name, (args, kwargs)), or (None, None) if parsing fails
    """
    import ast

    try:
        call_node = ast.parse(call_str.strip(), mode="eval").body

        # Must be a call whose callee is a bare name (rejects obj.method(...)).
        if not isinstance(call_node, ast.Call) or not isinstance(call_node.func, ast.Name):
            return None, None

        # Positional arguments; a non-literal raises and is handled below.
        args = [ast.literal_eval(arg) for arg in call_node.args]

        # Keyword arguments
        kwargs = {}
        for kw in call_node.keywords:
            value = ast.literal_eval(kw.value)
            if kw.arg is not None:
                kwargs[kw.arg] = value
            elif isinstance(value, dict) and all(isinstance(key, str) for key in value):
                # kw.arg is None for a `**mapping` unpack: merge its entries
                # instead of storing them under a None key.
                kwargs.update(value)
            else:
                return None, None

        return call_node.func.id, (args, kwargs)

    except (SyntaxError, ValueError, TypeError, RecursionError, MemoryError):
        # RecursionError / MemoryError: pathologically nested input.
        return None, None
0 commit comments