diff --git a/appium/webdriver/appium_service.py b/appium/webdriver/appium_service.py index 37e2ddab..c84bfeda 100644 --- a/appium/webdriver/appium_service.py +++ b/appium/webdriver/appium_service.py @@ -17,13 +17,14 @@ import subprocess as sp import sys import time -from typing import Any, List, Optional, Set +from typing import Any, Callable, List, Optional, Set from selenium.webdriver.remote.remote_connection import urllib3 DEFAULT_HOST = '127.0.0.1' DEFAULT_PORT = 4723 STARTUP_TIMEOUT_MS = 60000 +STATE_CHECK_INTERVAL_MS = 500 MAIN_SCRIPT_PATH = 'appium/build/lib/main.js' STATUS_URL = '/status' DEFAULT_BASE_PATH = '/' @@ -37,111 +38,11 @@ class AppiumStartupError(RuntimeError): pass -def find_executable(executable: str) -> Optional[str]: - path = os.environ['PATH'] - paths = path.split(os.pathsep) - _, ext = os.path.splitext(executable) - if sys.platform == 'win32' and not ext: - executable = executable + '.exe' - - if os.path.isfile(executable): - return executable - - for p in paths: - full_path = os.path.join(p, executable) - if os.path.isfile(full_path): - return full_path - - return None - - -def get_node() -> str: - result = find_executable('node') - if result is None: - raise AppiumServiceError( - 'NodeJS main executable cannot be found. Make sure it is installed and present in PATH' - ) - return result - - -def get_npm() -> str: - result = find_executable('npm.cmd' if sys.platform == 'win32' else 'npm') - if result is None: - raise AppiumServiceError( - 'Node Package Manager executable cannot be found. Make sure it is installed and present in PATH' - ) - return result - - -def get_main_script(node: Optional[str], npm: Optional[str]) -> str: - result: Optional[str] = None - npm_path = npm or get_npm() - for args in [['root', '-g'], ['root']]: - try: - modules_root = sp.check_output([npm_path] + args).strip().decode('utf-8') - if os.path.exists(os.path.join(modules_root, MAIN_SCRIPT_PATH)): - result = os.path.join(modules_root, MAIN_SCRIPT_PATH) - break - except sp.CalledProcessError: - continue - if result is None: - node_path = node or get_node() - try: - result = ( - sp.check_output([node_path, '-e', f'console.log(require.resolve("{MAIN_SCRIPT_PATH}"))']) - .decode('utf-8') - .strip() - ) - except sp.CalledProcessError as e: - raise AppiumServiceError(e.output) from e - return result - - -def parse_arg_value(args: List[str], arg_names: Set[str], default: str) -> str: - for idx, arg in enumerate(args): - if arg in arg_names and idx < len(args) - 1: - return args[idx + 1] - return default - - -def parse_port(args: List[str]) -> int: - return int(parse_arg_value(args, {'--port', '-p'}, str(DEFAULT_PORT))) - - -def parse_base_path(args: List[str]) -> str: - return parse_arg_value(args, {'--base-path', '-pa'}, DEFAULT_BASE_PATH) - - -def parse_host(args: List[str]) -> str: - return parse_arg_value(args, {'--address', '-a'}, DEFAULT_HOST) - - -def make_status_url(args: List[str]) -> str: - base_path = parse_base_path(args) - return STATUS_URL if base_path == DEFAULT_BASE_PATH else f'{re.sub(r"/+$", "", base_path)}{STATUS_URL}' - - class AppiumService: def __init__(self) -> None: self._process: Optional[sp.Popen] = None self._cmd: Optional[List[str]] = None - def _poll_status(self, host: str, port: int, path: str, timeout_ms: int) -> bool: - time_started_sec = time.time() - conn = urllib3.PoolManager(timeout=1.0) - while time.time() < time_started_sec + timeout_ms / 1000.0: - if not self.is_running: - raise AppiumStartupError() - # noinspection PyUnresolvedReferences - try: - resp = conn.request('HEAD', f'http://{host}:{port}{path}') - if resp.status < 400: - return True - except urllib3.exceptions.HTTPError: - pass - time.sleep(1.0) - return False - def start(self, **kwargs: Any) -> sp.Popen: """Starts Appium service with given arguments. @@ -173,8 +74,7 @@ def start(self, **kwargs: Any) -> sp.Popen: https://appium.io/docs/en/writing-running-appium/server-args/ for more details about possible arguments and their values. - Returns: - You can use Popen.communicate interface or stderr/stdout properties + :return: You can use Popen.communicate interface or stderr/stdout properties of the instance (stdout/stderr must not be set to None in such case) in order to retrieve the actual process output. """ @@ -201,11 +101,15 @@ def start(self, **kwargs: Any) -> sp.Popen: f'method arguments.' ) if timeout_ms > 0: - status_url_path = make_status_url(args) + server_url = _make_server_url(args) try: - if not self._poll_status(parse_host(args), parse_port(args), status_url_path, timeout_ms): + if not is_service_listening( + server_url, + timeout=timeout_ms / 1000, + custom_validator=self._assert_is_running, + ): error_msg = ( - f'Appium server has started but is not listening on {status_url_path} ' + f'Appium server has started but is not listening on {server_url} ' f'within {timeout_ms}ms timeout. Make sure proper values have been provided ' f'to --base-path, --address and --port process arguments.' ) @@ -215,6 +119,7 @@ def start(self, **kwargs: Any) -> sp.Popen: error_msg = startup_failure_msg if error_msg is not None: if stderr == sp.PIPE and self._process.stderr is not None: + # noinspection PyUnresolvedReferences err_output = self._process.stderr.read() if err_output: error_msg += f'\nOriginal error: {str(err_output)}' @@ -222,31 +127,37 @@ def start(self, **kwargs: Any) -> sp.Popen: raise AppiumServiceError(error_msg) return self._process - def stop(self) -> bool: + def stop(self, timeout: float = 5.5) -> bool: """Stops Appium service if it is running. The call will be ignored if the service is not running or has been already stopped. - Returns: - `True` if the service was running before being stopped + :param timeout: The maximum time in float seconds to wait + for the server process to terminate + :return: `True` if the service was running before being stopped """ - is_terminated = False + was_running = False if self.is_running: assert self._process + was_running = True self._process.terminate() - self._process.communicate(timeout=5) - is_terminated = True + try: + self._process.communicate(timeout=timeout) + except sp.SubprocessError: + if sys.platform == 'win32': + sp.call(['taskkill', '/f', '/pid', str(self._process.pid)]) + else: + self._process.kill() self._process = None self._cmd = None - return is_terminated + return was_running @property def is_running(self) -> bool: """Check if the service is running. - Returns: - bool: `True` if the service is running + :return: `True` if the service is running """ return self._process is not None and self._cmd is not None and self._process.poll() is None @@ -258,18 +169,147 @@ def is_listening(self) -> bool: The default host/port/base path values can be customized by providing --address/--port/--base-path command line arguments while starting the service. - Returns: - bool: `True` if the service is running and listening on the given/default host/port + :return: `True` if the service is running and listening on the given/default host/port """ if not self.is_running: return False assert self._cmd try: - return self._poll_status(parse_host(self._cmd), parse_port(self._cmd), make_status_url(self._cmd), 1000) + return is_service_listening( + _make_server_url(self._cmd), + timeout=STATE_CHECK_INTERVAL_MS, + custom_validator=self._assert_is_running, + ) except AppiumStartupError: return False + def _assert_is_running(self) -> None: + if not self.is_running: + raise AppiumStartupError() + + +def is_service_listening(url: str, timeout: float = 5, custom_validator: Optional[Callable[[], None]] = None) -> bool: + """ + Check if the service is running + + :param url: Full server url + :param timeout: Timeout in float seconds + :param custom_validator: Custom callable method to be executed upon each validation loop before the timeout happens + :return: True if Appium server is running before the timeout + """ + time_started_sec = time.perf_counter() + conn = urllib3.PoolManager(timeout=1.0) + while time.perf_counter() < time_started_sec + timeout: + if custom_validator is not None: + custom_validator() + # noinspection PyUnresolvedReferences + try: + resp = conn.request('HEAD', url) + if resp.status < 400: + return True + except urllib3.exceptions.HTTPError: + pass + time.sleep(STATE_CHECK_INTERVAL_MS / 1000.0) + return False + + +def find_executable(executable: str) -> Optional[str]: + path = os.environ['PATH'] + paths = path.split(os.pathsep) + _, ext = os.path.splitext(executable) + if sys.platform == 'win32' and not ext: + executable = executable + '.exe' + + if os.path.isfile(executable): + return executable + + for p in paths: + full_path = os.path.join(p, executable) + if os.path.isfile(full_path): + return full_path + + return None + + +def get_node() -> str: + result = find_executable('node') + if result is None: + raise AppiumServiceError( + 'NodeJS main executable cannot be found. Make sure it is installed and present in PATH' + ) + return result + + +def get_npm() -> str: + result = find_executable('npm.cmd' if sys.platform == 'win32' else 'npm') + if result is None: + raise AppiumServiceError( + 'Node Package Manager executable cannot be found. Make sure it is installed and present in PATH' + ) + return result + + +def get_main_script(node: Optional[str], npm: Optional[str]) -> str: + result: Optional[str] = None + npm_path = npm or get_npm() + for args in [['root', '-g'], ['root']]: + try: + modules_root = sp.check_output([npm_path] + args).strip().decode('utf-8') + full_path = os.path.join(modules_root, *MAIN_SCRIPT_PATH.split('/')) + if os.path.exists(full_path): + result = full_path + break + except sp.CalledProcessError: + continue + if result is None: + node_path = node or get_node() + try: + result = ( + sp.check_output([node_path, '-e', f'console.log(require.resolve("{MAIN_SCRIPT_PATH}"))']) + .decode('utf-8') + .strip() + ) + except sp.CalledProcessError as e: + raise AppiumServiceError(e.output) from e + return result + + +def _parse_arg_value(args: List[str], arg_names: Set[str], default: str) -> str: + for idx, arg in enumerate(args): + if arg in arg_names and idx < len(args) - 1: + return args[idx + 1] + return default + + +def _parse_port(args: List[str]) -> int: + return int(_parse_arg_value(args, {'--port', '-p'}, str(DEFAULT_PORT))) + + +def _parse_base_path(args: List[str]) -> str: + return _parse_arg_value(args, {'--base-path', '-pa'}, DEFAULT_BASE_PATH) + + +def _parse_host(args: List[str]) -> str: + return _parse_arg_value(args, {'--address', '-a'}, DEFAULT_HOST) + + +def _parse_protocol(args: List[str]) -> str: + return ( + 'https' + if _parse_arg_value(args, {'--ssl-cert-path'}, '') and _parse_arg_value(args, {'--ssl-key-path'}, '') + else 'http' + ) + + +def _make_status_path(args: List[str]) -> str: + base_path = _parse_base_path(args) + return STATUS_URL if base_path == DEFAULT_BASE_PATH else f'{re.sub(r"/+$", "", base_path)}{STATUS_URL}' + + +def _make_server_url(args: List[str]) -> str: + return f'{_parse_protocol(args)}://{_parse_host(args)}:{_parse_port(args)}{_make_status_path(args)}' + if __name__ == '__main__': assert find_executable('node') is not None