|
9 | 9 | import pdb
|
10 | 10 | import traceback
|
11 | 11 | from typing import Optional, Type
|
| 12 | +from PIL import Image, ImageDraw, ImageFont |
| 13 | +import os |
| 14 | +import base64 |
| 15 | +import io |
12 | 16 |
|
13 | 17 | from browser_use.agent.prompts import SystemPrompt
|
14 | 18 | from browser_use.agent.service import Agent
|
@@ -227,6 +231,119 @@ async def step(self, step_info: Optional[CustomAgentStepInfo] = None) -> None:
|
227 | 231 | )
|
228 | 232 | if state:
|
229 | 233 | self._make_history_item(model_output, state, result)
|
def create_history_gif(
    self,
    output_path: str = 'agent_history.gif',
    duration: int = 3000,
    show_goals: bool = True,
    show_task: bool = True,
    show_logo: bool = False,
    font_size: int = 40,
    title_font_size: int = 56,
    goal_font_size: int = 44,
    margin: int = 40,
    line_spacing: float = 1.5,
) -> None:
    """Create a GIF from the agent's history with overlaid task and goal text.

    One frame is produced per history item that has a screenshot, optionally
    preceded by a task frame built by ``self._create_task_frame``.

    Args:
        output_path: File path the GIF is written to.
        duration: Passed straight to Pillow's GIF writer as the frame
            ``duration`` (Pillow documents this in milliseconds).
        show_goals: When true, frames whose history item has a
            ``model_output`` are passed through ``self._add_overlay_to_image``
            with the item's ``current_state.thought`` as the goal text.
        show_task: When true (and ``self.task`` is truthy), a task frame is
            prepended.
        show_logo: When true, try to load ``./static/browser-use.png`` and
            hand it to the frame helpers; failure is logged, not raised.
        font_size: Point size of the regular font.
        title_font_size: Point size of the title font.
        goal_font_size: Accepted for interface compatibility. No goal font
            is passed to either frame helper, so this value is currently
            unused.
        margin: Forwarded to ``self._add_overlay_to_image``.
        line_spacing: Forwarded to ``self._create_task_frame``.

    Returns:
        None. The GIF is written to ``output_path``; if there is no history,
        no first screenshot, or no usable frame, a warning is logged instead.
    """
    import platform

    history_items = self.history.history
    if not history_items:
        logger.warning('No history to create GIF from')
        return

    # The task frame is built from the first screenshot, so it must exist.
    first_screenshot = history_items[0].state.screenshot
    if not first_screenshot:
        logger.warning('No history or first screenshot to create GIF from')
        return

    # Try nicer fonts in order of preference; the platform lookup is
    # loop-invariant, so resolve it once up front.
    on_windows = platform.system() == "Windows"
    win_font_dir = os.getenv("WIN_FONT_DIR", "C:\\Windows\\Fonts")
    for font_name in ('Helvetica', 'Arial', 'DejaVuSans', 'Verdana'):
        if on_windows:
            # Need to specify the abs font path on Windows
            font_source = os.path.join(win_font_dir, font_name + ".ttf")
        else:
            font_source = font_name
        try:
            regular_font = ImageFont.truetype(font_source, font_size)
            title_font = ImageFont.truetype(font_source, title_font_size)
        except OSError:
            continue
        break
    else:
        # None of the preferred fonts could be loaded.
        regular_font = ImageFont.load_default()
        title_font = ImageFont.load_default()

    # Load logo if requested (best effort: a missing logo only logs a warning).
    logo = None
    if show_logo:
        try:
            with Image.open('./static/browser-use.png') as raw_logo:
                # Scale the logo to a fixed 150 px height, keeping its aspect ratio.
                logo_height = 150
                aspect_ratio = raw_logo.width / raw_logo.height
                logo_width = int(logo_height * aspect_ratio)
                logo = raw_logo.resize((logo_width, logo_height), Image.Resampling.LANCZOS)
        except Exception as e:
            logger.warning(f'Could not load logo: {e}')

    images = []

    # Create task frame if requested
    if show_task and self.task:
        task_frame = self._create_task_frame(
            self.task,
            first_screenshot,
            title_font,
            regular_font,
            logo,
            line_spacing,
        )
        images.append(task_frame)

    # Process each history item. Step numbers count every history item, so
    # items skipped for lacking a screenshot still consume a number.
    for step_number, item in enumerate(history_items, 1):
        if not item.state.screenshot:
            continue

        # Convert base64 screenshot to PIL Image
        img_data = base64.b64decode(item.state.screenshot)
        image = Image.open(io.BytesIO(img_data))

        if show_goals and item.model_output:
            image = self._add_overlay_to_image(
                image=image,
                step_number=step_number,
                goal_text=item.model_output.current_state.thought,
                regular_font=regular_font,
                title_font=title_font,
                margin=margin,
                logo=logo,
            )

        images.append(image)

    if not images:
        logger.warning('No images found in history to create GIF')
        return

    # Save the GIF: first frame is the base image, the rest are appended.
    images[0].save(
        output_path,
        save_all=True,
        append_images=images[1:],
        duration=duration,
        loop=0,
        optimize=False,
    )
    logger.info(f'Created GIF at {output_path}')
230 | 347 |
|
231 | 348 | async def run(self, max_steps: int = 100) -> AgentHistoryList:
|
232 | 349 | """Execute the task with maximum number of steps"""
|
@@ -283,3 +400,6 @@ async def run(self, max_steps: int = 100) -> AgentHistoryList:
|
283 | 400 |
|
284 | 401 | if not self.injected_browser and self.browser:
|
285 | 402 | await self.browser.close()
|
| 403 | + |
| 404 | + if self.generate_gif: |
| 405 | + self.create_history_gif() |
0 commit comments