|
1 | 1 | import asyncio |
| 2 | +import json |
| 3 | +import os |
| 4 | +import shutil |
2 | 5 | from abc import ABC, abstractmethod |
| 6 | +from contextlib import suppress |
3 | 7 | from functools import partial |
4 | 8 | from random import randint |
| 9 | +from tempfile import TemporaryDirectory |
5 | 10 | from typing import Any, Callable, Optional |
6 | 11 |
|
7 | 12 | from pydoll.browser.interfaces import BrowserOptionsManager |
@@ -77,13 +82,20 @@ def __init__( |
77 | 82 | self._browser_process_manager = BrowserProcessManager() |
78 | 83 | self._temp_directory_manager = TempDirectoryManager() |
79 | 84 | self._connection_handler = ConnectionHandler(self._connection_port) |
| 85 | + self._backup_preferences_dir = '' |
80 | 86 |
|
81 | 87 | async def __aenter__(self) -> 'Browser': |
82 | 88 | """Async context manager entry.""" |
83 | 89 | return self |
84 | 90 |
|
85 | 91 | async def __aexit__(self, exc_type, exc_val, exc_tb): |
86 | 92 | """Async context manager exit with cleanup.""" |
| 93 | + if self._backup_preferences_dir: |
| 94 | + user_data_dir = self._get_user_data_dir() |
| 95 | + shutil.copy2( |
| 96 | + self._backup_preferences_dir, |
| 97 | + os.path.join(user_data_dir, 'Default', 'Preferences'), |
| 98 | + ) |
87 | 99 | if await self._is_browser_running(timeout=2): |
88 | 100 | await self.stop() |
89 | 101 |
|
@@ -113,9 +125,7 @@ async def start(self, headless: bool = False) -> Tab: |
113 | 125 | proxy_config = self._proxy_manager.get_proxy_credentials() |
114 | 126 |
|
115 | 127 | self._browser_process_manager.start_browser_process( |
116 | | - binary_location, |
117 | | - self._connection_port, |
118 | | - self.options.arguments, |
| 128 | + binary_location, self._connection_port, self.options.arguments |
119 | 129 | ) |
120 | 130 | await self._verify_browser_running() |
121 | 131 | await self._configure_proxy(proxy_config[0], proxy_config[1]) |
@@ -199,12 +209,13 @@ async def new_tab(self, url: str = '', browser_context_id: Optional[str] = None) |
199 | 209 | """ |
200 | 210 | response: CreateTargetResponse = await self._execute_command( |
201 | 211 | TargetCommands.create_target( |
202 | | - url=url, |
203 | 212 | browser_context_id=browser_context_id, |
204 | 213 | ) |
205 | 214 | ) |
206 | 215 | target_id = response['result']['targetId'] |
207 | | - return Tab(self, self._connection_port, target_id, browser_context_id) |
| 216 | + tab = Tab(self, self._connection_port, target_id, browser_context_id) |
| 217 | + if url: await tab.go_to(url) |
| 218 | + return tab |
208 | 219 |
|
209 | 220 | async def get_targets(self) -> list[TargetInfo]: |
210 | 221 | """ |
@@ -584,10 +595,60 @@ async def _execute_command( |
584 | 595 |
|
585 | 596 | def _setup_user_dir(self): |
586 | 597 | """Setup temporary user data directory if not specified in options.""" |
587 | | - if '--user-data-dir' not in [arg.split('=')[0] for arg in self.options.arguments]: |
588 | | - # For all browsers, use a temporary directory |
| 598 | + user_data_dir = self._get_user_data_dir() |
| 599 | + if user_data_dir and self.options.browser_preferences: |
| 600 | + self._set_browser_preferences_in_user_data_dir(user_data_dir) |
| 601 | + elif not user_data_dir: |
589 | 602 | temp_dir = self._temp_directory_manager.create_temp_dir() |
| 603 | + # For all browsers, use a temporary directory |
590 | 604 | self.options.arguments.append(f'--user-data-dir={temp_dir.name}') |
| 605 | + if self.options.browser_preferences: |
| 606 | + self._set_browser_preferences_in_temp_dir(temp_dir) |
| 607 | + |
| 608 | + def _set_browser_preferences_in_temp_dir(self, temp_dir: TemporaryDirectory): |
| 609 | + os.mkdir(os.path.join(temp_dir.name, 'Default')) |
| 610 | + preferences = self.options.browser_preferences |
| 611 | + with open( |
| 612 | + os.path.join(temp_dir.name, 'Default', 'Preferences'), 'w', encoding='utf-8' |
| 613 | + ) as json_file: |
| 614 | + json.dump(preferences, json_file) |
| 615 | + |
| 616 | + def _set_browser_preferences_in_user_data_dir(self, user_data_dir: str): |
| 617 | + """ |
| 618 | + Set browser preferences in the user data directory. |
| 619 | +
|
| 620 | + This function will: |
| 621 | + 1. Create a backup of the existing Preferences file if it exists |
| 622 | + 2. Create Default directory if it doesn't exist |
| 623 | + 3. Write the new preferences to the Preferences file |
| 624 | +
|
| 625 | + Args: |
| 626 | + user_data_dir: Path to the user data directory |
| 627 | + """ |
| 628 | + default_dir = os.path.join(user_data_dir, 'Default') |
| 629 | + os.makedirs(default_dir, exist_ok=True) |
| 630 | + |
| 631 | + preferences_path = os.path.join(default_dir, 'Preferences') |
| 632 | + self._backup_preferences_dir = os.path.join(default_dir, 'Preferences.backup') |
| 633 | + |
| 634 | + if os.path.exists(preferences_path): |
| 635 | + # Backup existing Preferences file |
| 636 | + shutil.copy2(preferences_path, self._backup_preferences_dir) |
| 637 | + |
| 638 | + preferences = {} |
| 639 | + if os.path.exists(preferences_path): |
| 640 | + with suppress(json.JSONDecodeError): |
| 641 | + with open(preferences_path, 'r', encoding='utf-8') as preferences_file: |
| 642 | + preferences = json.load(preferences_file) |
| 643 | + preferences.update(self.options.browser_preferences) |
| 644 | + with open(preferences_path, 'w', encoding='utf-8') as json_file: |
| 645 | + json.dump(preferences, json_file, indent=2) |
| 646 | + |
| 647 | + def _get_user_data_dir(self) -> Optional[str]: |
| 648 | + for arg in self.options.arguments: |
| 649 | + if arg.startswith('--user-data-dir='): |
| 650 | + return arg.split('=', 1)[1] |
| 651 | + return None |
591 | 652 |
|
592 | 653 | @abstractmethod |
593 | 654 | def _get_default_binary_location(self) -> str: |
|
0 commit comments