|
| 1 | +"""Script generation service for browser-use.""" |
| 2 | +import logging |
| 3 | +from typing import Dict, List |
| 4 | + |
| 5 | +logger = logging.getLogger(__name__) |
| 6 | + |
| 7 | + |
| 8 | +class ScriptGenerator: |
| 9 | + """ |
| 10 | + Generates scripts from browser-use action logs. |
| 11 | + |
| 12 | + This class converts logged browser actions into either BrowserQL or BaaS V2 scripts. |
| 13 | + Only successful actions are included in the generated scripts. |
| 14 | + """ |
| 15 | + |
| 16 | + def __init__(self, action_log: List[Dict] = None): |
| 17 | + """ |
| 18 | + Initialize the script generator. |
| 19 | + |
| 20 | + Args: |
| 21 | + action_log: List of action log entries to convert |
| 22 | + """ |
| 23 | + self.action_log = action_log or [] |
| 24 | + |
| 25 | + def set_action_log(self, action_log: List[Dict]) -> None: |
| 26 | + """ |
| 27 | + Set the action log to convert. |
| 28 | + |
| 29 | + Args: |
| 30 | + action_log: List of action log entries |
| 31 | + """ |
| 32 | + self.action_log = action_log |
| 33 | + |
| 34 | + def to_browserql(self) -> str: |
| 35 | + """ |
| 36 | + Convert the action log to a BrowserQL script. |
| 37 | + |
| 38 | + Returns: |
| 39 | + str: BrowserQL script as a string |
| 40 | + """ |
| 41 | + if not self.action_log: |
| 42 | + return "# No actions to convert" |
| 43 | + |
| 44 | + script = "mutation AutomateTask {\n" |
| 45 | + |
| 46 | + for action in self.action_log: |
| 47 | + action_type = action.get("action_type") |
| 48 | + params = action.get("params", {}) |
| 49 | + selector_info = action.get("selector") |
| 50 | + |
| 51 | + if action_type == "go_to_url": |
| 52 | + url = params.get("url", "") |
| 53 | + script += f' goto(url: "{url}") {{\n status\n }}\n\n' |
| 54 | + |
| 55 | + elif action_type == "click_element_by_index" and selector_info: |
| 56 | + selector = selector_info.get("xpath", "") |
| 57 | + script += f' click(selector: "{selector}") {{\n selector\n time\n }}\n\n' |
| 58 | + |
| 59 | + elif action_type == "input_text" and selector_info: |
| 60 | + selector = selector_info.get("xpath", "") |
| 61 | + text = params.get("text", "") |
| 62 | + script += f' type(selector: "{selector}", text: "{text}") {{\n selector\n text\n }}\n\n' |
| 63 | + |
| 64 | + elif action_type == "scroll_down": |
| 65 | + script += f' scroll(direction: "down", amount: {params.get("amount", 100)}) {{\n status\n }}\n\n' |
| 66 | + |
| 67 | + elif action_type == "scroll_up": |
| 68 | + script += f' scroll(direction: "up", amount: {params.get("amount", 100)}) {{\n status\n }}\n\n' |
| 69 | + |
| 70 | + elif action_type == "extract_content" and selector_info: |
| 71 | + selector = selector_info.get("xpath", "") |
| 72 | + script += f' querySelector(selector: "{selector}") {{\n innerHTML\n }}\n\n' |
| 73 | + |
| 74 | + elif action_type == "done": |
| 75 | + pass |
| 76 | + |
| 77 | + else: |
| 78 | + script += f' # Unsupported action: {action_type}\n' |
| 79 | + |
| 80 | + script += "}" |
| 81 | + return script |
| 82 | + |
| 83 | + def to_baas_v2(self, language: str = "javascript") -> str: |
| 84 | + """ |
| 85 | + Convert the action log to a BaaS V2 script. |
| 86 | + |
| 87 | + Args: |
| 88 | + language: Programming language for the script (javascript, python) |
| 89 | + |
| 90 | + Returns: |
| 91 | + str: BaaS V2 script as a string |
| 92 | + """ |
| 93 | + if not self.action_log: |
| 94 | + return "# No actions to convert" |
| 95 | + |
| 96 | + if language.lower() == "javascript": |
| 97 | + return self._to_puppeteer_js() |
| 98 | + elif language.lower() == "python": |
| 99 | + return self._to_playwright_python() |
| 100 | + else: |
| 101 | + return f"# Unsupported language: {language}" |
| 102 | + |
| 103 | + def _to_puppeteer_js(self) -> str: |
| 104 | + """ |
| 105 | + Convert the action log to a Puppeteer JavaScript script. |
| 106 | + |
| 107 | + Returns: |
| 108 | + str: Puppeteer script as a string |
| 109 | + """ |
| 110 | + script = [ |
| 111 | + "// Puppeteer script generated from browser-use actions", |
| 112 | + "const puppeteer = require('puppeteer');", |
| 113 | + "", |
| 114 | + "async function run() {", |
| 115 | + " const browser = await puppeteer.connect({", |
| 116 | + " browserWSEndpoint: `wss://production-sfo.browserless.io?token=${TOKEN}`", |
| 117 | + " });", |
| 118 | + " const page = await browser.newPage();", |
| 119 | + "" |
| 120 | + ] |
| 121 | + |
| 122 | + for action in self.action_log: |
| 123 | + action_type = action.get("action_type") |
| 124 | + params = action.get("params", {}) |
| 125 | + selector_info = action.get("selector") |
| 126 | + |
| 127 | + if action_type == "go_to_url": |
| 128 | + url = params.get("url", "") |
| 129 | + script.append(f' await page.goto("{url}");') |
| 130 | + |
| 131 | + elif action_type == "click_element_by_index" and selector_info: |
| 132 | + selector = selector_info.get("xpath", "") |
| 133 | + script.append(f' await page.click("{selector}");') |
| 134 | + |
| 135 | + elif action_type == "input_text" and selector_info: |
| 136 | + selector = selector_info.get("xpath", "") |
| 137 | + text = params.get("text", "") |
| 138 | + script.append(f' await page.type("{selector}", "{text}");') |
| 139 | + |
| 140 | + elif action_type == "scroll_down": |
| 141 | + amount = params.get("amount", 100) |
| 142 | + script.append(f' await page.evaluate(() => window.scrollBy(0, {amount}));') |
| 143 | + |
| 144 | + elif action_type == "scroll_up": |
| 145 | + amount = params.get("amount", 100) |
| 146 | + script.append(f' await page.evaluate(() => window.scrollBy(0, -{amount}));') |
| 147 | + |
| 148 | + elif action_type == "extract_content" and selector_info: |
| 149 | + selector = selector_info.get("xpath", "") |
| 150 | + script.append(f' const content = await page.$eval("{selector}", el => el.textContent);') |
| 151 | + script.append(' console.log(content);') |
| 152 | + |
| 153 | + elif action_type == "done": |
| 154 | + pass |
| 155 | + |
| 156 | + else: |
| 157 | + script.append(f' // Unsupported action: {action_type}') |
| 158 | + |
| 159 | + script.extend([ |
| 160 | + "", |
| 161 | + " await browser.close();", |
| 162 | + "}", |
| 163 | + "", |
| 164 | + "run().catch(console.error);" |
| 165 | + ]) |
| 166 | + |
| 167 | + return "\n".join(script) |
| 168 | + |
| 169 | + def _to_playwright_python(self) -> str: |
| 170 | + """ |
| 171 | + Convert the action log to a Playwright Python script. |
| 172 | + |
| 173 | + Returns: |
| 174 | + str: Playwright Python script as a string |
| 175 | + """ |
| 176 | + script = [ |
| 177 | + "# Playwright Python script generated from browser-use actions", |
| 178 | + "import asyncio", |
| 179 | + "from playwright.async_api import async_playwright", |
| 180 | + "", |
| 181 | + "async def run():", |
| 182 | + " async with async_playwright() as p:", |
| 183 | + " browser = await p.chromium.connect_over_cdp(", |
| 184 | + " endpoint_url='wss://production-sfo.browserless.io?token=YOUR_API_TOKEN_HERE'", |
| 185 | + " )", |
| 186 | + " page = await browser.new_page()", |
| 187 | + "" |
| 188 | + ] |
| 189 | + |
| 190 | + for action in self.action_log: |
| 191 | + action_type = action.get("action_type") |
| 192 | + params = action.get("params", {}) |
| 193 | + selector_info = action.get("selector") |
| 194 | + |
| 195 | + if action_type == "go_to_url": |
| 196 | + url = params.get("url", "") |
| 197 | + script.append(f' await page.goto("{url}")') |
| 198 | + |
| 199 | + elif action_type == "click_element_by_index" and selector_info: |
| 200 | + selector = selector_info.get("xpath", "") |
| 201 | + script.append(f' await page.click("{selector}")') |
| 202 | + |
| 203 | + elif action_type == "input_text" and selector_info: |
| 204 | + selector = selector_info.get("xpath", "") |
| 205 | + text = params.get("text", "") |
| 206 | + script.append(f' await page.fill("{selector}", "{text}")') |
| 207 | + |
| 208 | + elif action_type == "scroll_down": |
| 209 | + amount = params.get("amount", 100) |
| 210 | + script.append(f' await page.evaluate(f"window.scrollBy(0, {amount})")') |
| 211 | + |
| 212 | + elif action_type == "scroll_up": |
| 213 | + amount = params.get("amount", 100) |
| 214 | + script.append(f' await page.evaluate(f"window.scrollBy(0, -{amount})")') |
| 215 | + |
| 216 | + elif action_type == "extract_content" and selector_info: |
| 217 | + selector = selector_info.get("xpath", "") |
| 218 | + script.append(f' content = await page.text_content("{selector}")') |
| 219 | + script.append(' print(content)') |
| 220 | + |
| 221 | + elif action_type == "done": |
| 222 | + pass |
| 223 | + |
| 224 | + else: |
| 225 | + script.append(f' # Unsupported action: {action_type}') |
| 226 | + |
| 227 | + script.extend([ |
| 228 | + "", |
| 229 | + " await browser.close()", |
| 230 | + "", |
| 231 | + "asyncio.run(run())" |
| 232 | + ]) |
| 233 | + |
| 234 | + return "\n".join(script) |
| 235 | + |
| 236 | + def get_script(self, format_type: str, **kwargs) -> str: |
| 237 | + """ |
| 238 | + Get a script in the specified format. |
| 239 | + |
| 240 | + Args: |
| 241 | + format_type: Script format type ('browserql' or 'baas_v2') |
| 242 | + **kwargs: Additional arguments for the converter |
| 243 | + |
| 244 | + Returns: |
| 245 | + str: Generated script |
| 246 | + """ |
| 247 | + if format_type.lower() == "browserql": |
| 248 | + return self.to_browserql() |
| 249 | + elif format_type.lower() == "baas_v2": |
| 250 | + language = kwargs.get("language", "javascript") |
| 251 | + return self.to_baas_v2(language=language) |
| 252 | + else: |
| 253 | + return f"# Unsupported format: {format_type}" |
0 commit comments