|
1 | 1 | import asyncio |
| 2 | +import json |
| 3 | +import os |
| 4 | +import shutil |
2 | 5 | from abc import ABC, abstractmethod |
| 6 | +from contextlib import suppress |
3 | 7 | from functools import partial |
4 | 8 | from random import randint |
| 9 | +from tempfile import TemporaryDirectory |
5 | 10 | from typing import Any, Callable, Optional, TypeVar |
6 | 11 |
|
7 | 12 | from pydoll.browser.interfaces import BrowserOptionsManager |
@@ -81,13 +86,20 @@ def __init__( |
81 | 86 | self._browser_process_manager = BrowserProcessManager() |
82 | 87 | self._temp_directory_manager = TempDirectoryManager() |
83 | 88 | self._connection_handler = ConnectionHandler(self._connection_port) |
| 89 | + self._backup_preferences_dir = '' |
84 | 90 |
|
85 | 91 | async def __aenter__(self) -> 'Browser': |
86 | 92 | """Async context manager entry.""" |
87 | 93 | return self |
88 | 94 |
|
89 | 95 | async def __aexit__(self, exc_type, exc_val, exc_tb): |
90 | 96 | """Async context manager exit with cleanup.""" |
| 97 | + if self._backup_preferences_dir: |
| 98 | + user_data_dir = self._get_user_data_dir() |
| 99 | + shutil.copy2( |
| 100 | + self._backup_preferences_dir, |
| 101 | + os.path.join(user_data_dir, 'Default', 'Preferences'), |
| 102 | + ) |
91 | 103 | if await self._is_browser_running(timeout=2): |
92 | 104 | await self.stop() |
93 | 105 |
|
@@ -117,9 +129,7 @@ async def start(self, headless: bool = False) -> Tab: |
117 | 129 | proxy_config = self._proxy_manager.get_proxy_credentials() |
118 | 130 |
|
119 | 131 | self._browser_process_manager.start_browser_process( |
120 | | - binary_location, |
121 | | - self._connection_port, |
122 | | - self.options.arguments, |
| 132 | + binary_location, self._connection_port, self.options.arguments |
123 | 133 | ) |
124 | 134 | await self._verify_browser_running() |
125 | 135 | await self._configure_proxy(proxy_config[0], proxy_config[1]) |
@@ -580,10 +590,60 @@ async def _execute_command(self, command: Command[T], timeout: int = 10) -> T: |
580 | 590 |
|
581 | 591 | def _setup_user_dir(self): |
582 | 592 | """Setup temporary user data directory if not specified in options.""" |
583 | | - if '--user-data-dir' not in [arg.split('=')[0] for arg in self.options.arguments]: |
584 | | - # For all browsers, use a temporary directory |
| 593 | + user_data_dir = self._get_user_data_dir() |
| 594 | + if user_data_dir and self.options.browser_preferences: |
| 595 | + self._set_browser_preferences_in_user_data_dir(user_data_dir) |
| 596 | + elif not user_data_dir: |
585 | 597 | temp_dir = self._temp_directory_manager.create_temp_dir() |
| 598 | + # For all browsers, use a temporary directory |
586 | 599 | self.options.arguments.append(f'--user-data-dir={temp_dir.name}') |
| 600 | + if self.options.browser_preferences: |
| 601 | + self._set_browser_preferences_in_temp_dir(temp_dir) |
| 602 | + |
| 603 | + def _set_browser_preferences_in_temp_dir(self, temp_dir: TemporaryDirectory): |
| 604 | + os.mkdir(os.path.join(temp_dir.name, 'Default')) |
| 605 | + preferences = self.options.browser_preferences |
| 606 | + with open( |
| 607 | + os.path.join(temp_dir.name, 'Default', 'Preferences'), 'w', encoding='utf-8' |
| 608 | + ) as json_file: |
| 609 | + json.dump(preferences, json_file) |
| 610 | + |
| 611 | + def _set_browser_preferences_in_user_data_dir(self, user_data_dir: str): |
| 612 | + """ |
| 613 | + Set browser preferences in the user data directory. |
| 614 | +
|
| 615 | + This function will: |
| 616 | + 1. Create a backup of the existing Preferences file if it exists |
| 617 | + 2. Create Default directory if it doesn't exist |
| 618 | + 3. Write the new preferences to the Preferences file |
| 619 | +
|
| 620 | + Args: |
| 621 | + user_data_dir: Path to the user data directory |
| 622 | + """ |
| 623 | + default_dir = os.path.join(user_data_dir, 'Default') |
| 624 | + os.makedirs(default_dir, exist_ok=True) |
| 625 | + |
| 626 | + preferences_path = os.path.join(default_dir, 'Preferences') |
| 627 | + self._backup_preferences_dir = os.path.join(default_dir, 'Preferences.backup') |
| 628 | + |
| 629 | + if os.path.exists(preferences_path): |
| 630 | + # Backup existing Preferences file |
| 631 | + shutil.copy2(preferences_path, self._backup_preferences_dir) |
| 632 | + |
| 633 | + preferences = {} |
| 634 | + if os.path.exists(preferences_path): |
| 635 | + with suppress(json.JSONDecodeError): |
| 636 | + with open(preferences_path, 'r', encoding='utf-8') as preferences_file: |
| 637 | + preferences = json.load(preferences_file) |
| 638 | + preferences.update(self.options.browser_preferences) |
| 639 | + with open(preferences_path, 'w', encoding='utf-8') as json_file: |
| 640 | + json.dump(preferences, json_file, indent=2) |
| 641 | + |
| 642 | + def _get_user_data_dir(self) -> Optional[str]: |
| 643 | + for arg in self.options.arguments: |
| 644 | + if arg.startswith('--user-data-dir='): |
| 645 | + return arg.split('=', 1)[1] |
| 646 | + return None |
587 | 647 |
|
588 | 648 | @abstractmethod |
589 | 649 | def _get_default_binary_location(self) -> str: |
|
0 commit comments