Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 158 additions & 118 deletions appium/webdriver/appium_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
import subprocess as sp
import sys
import time
from typing import Any, List, Optional, Set
from typing import Any, Callable, List, Optional, Set

from selenium.webdriver.remote.remote_connection import urllib3

DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 4723
STARTUP_TIMEOUT_MS = 60000
STATE_CHECK_INTERVAL_MS = 500
MAIN_SCRIPT_PATH = 'appium/build/lib/main.js'
STATUS_URL = '/status'
DEFAULT_BASE_PATH = '/'
Expand All @@ -37,111 +38,11 @@ class AppiumStartupError(RuntimeError):
pass


def find_executable(executable: str) -> Optional[str]:
path = os.environ['PATH']
paths = path.split(os.pathsep)
_, ext = os.path.splitext(executable)
if sys.platform == 'win32' and not ext:
executable = executable + '.exe'

if os.path.isfile(executable):
return executable

for p in paths:
full_path = os.path.join(p, executable)
if os.path.isfile(full_path):
return full_path

return None


def get_node() -> str:
result = find_executable('node')
if result is None:
raise AppiumServiceError(
'NodeJS main executable cannot be found. Make sure it is installed and present in PATH'
)
return result


def get_npm() -> str:
result = find_executable('npm.cmd' if sys.platform == 'win32' else 'npm')
if result is None:
raise AppiumServiceError(
'Node Package Manager executable cannot be found. Make sure it is installed and present in PATH'
)
return result


def get_main_script(node: Optional[str], npm: Optional[str]) -> str:
result: Optional[str] = None
npm_path = npm or get_npm()
for args in [['root', '-g'], ['root']]:
try:
modules_root = sp.check_output([npm_path] + args).strip().decode('utf-8')
if os.path.exists(os.path.join(modules_root, MAIN_SCRIPT_PATH)):
result = os.path.join(modules_root, MAIN_SCRIPT_PATH)
break
except sp.CalledProcessError:
continue
if result is None:
node_path = node or get_node()
try:
result = (
sp.check_output([node_path, '-e', f'console.log(require.resolve("{MAIN_SCRIPT_PATH}"))'])
.decode('utf-8')
.strip()
)
except sp.CalledProcessError as e:
raise AppiumServiceError(e.output) from e
return result


def parse_arg_value(args: List[str], arg_names: Set[str], default: str) -> str:
for idx, arg in enumerate(args):
if arg in arg_names and idx < len(args) - 1:
return args[idx + 1]
return default


def parse_port(args: List[str]) -> int:
return int(parse_arg_value(args, {'--port', '-p'}, str(DEFAULT_PORT)))


def parse_base_path(args: List[str]) -> str:
return parse_arg_value(args, {'--base-path', '-pa'}, DEFAULT_BASE_PATH)


def parse_host(args: List[str]) -> str:
return parse_arg_value(args, {'--address', '-a'}, DEFAULT_HOST)


def make_status_url(args: List[str]) -> str:
base_path = parse_base_path(args)
return STATUS_URL if base_path == DEFAULT_BASE_PATH else f'{re.sub(r"/+$", "", base_path)}{STATUS_URL}'


class AppiumService:
def __init__(self) -> None:
self._process: Optional[sp.Popen] = None
self._cmd: Optional[List[str]] = None

def _poll_status(self, host: str, port: int, path: str, timeout_ms: int) -> bool:
time_started_sec = time.time()
conn = urllib3.PoolManager(timeout=1.0)
while time.time() < time_started_sec + timeout_ms / 1000.0:
if not self.is_running:
raise AppiumStartupError()
# noinspection PyUnresolvedReferences
try:
resp = conn.request('HEAD', f'http://{host}:{port}{path}')
if resp.status < 400:
return True
except urllib3.exceptions.HTTPError:
pass
time.sleep(1.0)
return False

def start(self, **kwargs: Any) -> sp.Popen:
"""Starts Appium service with given arguments.

Expand Down Expand Up @@ -173,8 +74,7 @@ def start(self, **kwargs: Any) -> sp.Popen:
https://appium.io/docs/en/writing-running-appium/server-args/ for more details
about possible arguments and their values.

Returns:
You can use Popen.communicate interface or stderr/stdout properties
:return: You can use Popen.communicate interface or stderr/stdout properties
of the instance (stdout/stderr must not be set to None in such case) in order to retrieve the actual process
output.
"""
Expand All @@ -201,11 +101,15 @@ def start(self, **kwargs: Any) -> sp.Popen:
f'method arguments.'
)
if timeout_ms > 0:
status_url_path = make_status_url(args)
server_url = _make_server_url(args)
try:
if not self._poll_status(parse_host(args), parse_port(args), status_url_path, timeout_ms):
if not is_service_listening(
server_url,
timeout=timeout_ms / 1000,
custom_validator=self._assert_is_running,
):
error_msg = (
f'Appium server has started but is not listening on {status_url_path} '
f'Appium server has started but is not listening on {server_url} '
f'within {timeout_ms}ms timeout. Make sure proper values have been provided '
f'to --base-path, --address and --port process arguments.'
)
Expand All @@ -215,38 +119,45 @@ def start(self, **kwargs: Any) -> sp.Popen:
error_msg = startup_failure_msg
if error_msg is not None:
if stderr == sp.PIPE and self._process.stderr is not None:
# noinspection PyUnresolvedReferences
err_output = self._process.stderr.read()
if err_output:
error_msg += f'\nOriginal error: {str(err_output)}'
self.stop()
raise AppiumServiceError(error_msg)
return self._process

def stop(self) -> bool:
def stop(self, timeout: float = 5.5) -> bool:
"""Stops Appium service if it is running.

The call will be ignored if the service is not running
or has been already stopped.

Returns:
`True` if the service was running before being stopped
:param timeout: The maximum time in float seconds to wait
for the server process to terminate
:return: `True` if the service was running before being stopped
"""
is_terminated = False
was_running = False
if self.is_running:
assert self._process
was_running = True
self._process.terminate()
self._process.communicate(timeout=5)
is_terminated = True
try:
self._process.communicate(timeout=timeout)
except sp.SubprocessError:
if sys.platform == 'win32':
sp.call(['taskkill', '/f', '/pid', str(self._process.pid)])
else:
self._process.kill()
self._process = None
self._cmd = None
return is_terminated
return was_running

@property
def is_running(self) -> bool:
"""Check if the service is running.

Returns:
bool: `True` if the service is running
:return: `True` if the service is running
"""
return self._process is not None and self._cmd is not None and self._process.poll() is None

Expand All @@ -258,18 +169,147 @@ def is_listening(self) -> bool:
The default host/port/base path values can be customized by providing
--address/--port/--base-path command line arguments while starting the service.

Returns:
bool: `True` if the service is running and listening on the given/default host/port
:return: `True` if the service is running and listening on the given/default host/port
"""
if not self.is_running:
return False

assert self._cmd
try:
return self._poll_status(parse_host(self._cmd), parse_port(self._cmd), make_status_url(self._cmd), 1000)
return is_service_listening(
_make_server_url(self._cmd),
timeout=STATE_CHECK_INTERVAL_MS,
custom_validator=self._assert_is_running,
)
except AppiumStartupError:
return False

def _assert_is_running(self) -> None:
if not self.is_running:
raise AppiumStartupError()


def is_service_listening(url: str, timeout: float = 5, custom_validator: Optional[Callable[[], None]] = None) -> bool:
"""
Check if the service is running

:param url: Full server url
:param timeout: Timeout in float seconds
:param custom_validator: Custom callable method to be executed upon each validation loop before the timeout happens
:return: True if Appium server is running before the timeout
"""
time_started_sec = time.perf_counter()
conn = urllib3.PoolManager(timeout=1.0)
while time.perf_counter() < time_started_sec + timeout:
if custom_validator is not None:
custom_validator()
# noinspection PyUnresolvedReferences
try:
resp = conn.request('HEAD', url)
if resp.status < 400:
return True
except urllib3.exceptions.HTTPError:
pass
time.sleep(STATE_CHECK_INTERVAL_MS / 1000.0)
return False


def find_executable(executable: str) -> Optional[str]:
path = os.environ['PATH']
paths = path.split(os.pathsep)
_, ext = os.path.splitext(executable)
if sys.platform == 'win32' and not ext:
executable = executable + '.exe'

if os.path.isfile(executable):
return executable

for p in paths:
full_path = os.path.join(p, executable)
if os.path.isfile(full_path):
return full_path

return None


def get_node() -> str:
result = find_executable('node')
if result is None:
raise AppiumServiceError(
'NodeJS main executable cannot be found. Make sure it is installed and present in PATH'
)
return result


def get_npm() -> str:
result = find_executable('npm.cmd' if sys.platform == 'win32' else 'npm')
if result is None:
raise AppiumServiceError(
'Node Package Manager executable cannot be found. Make sure it is installed and present in PATH'
)
return result


def get_main_script(node: Optional[str], npm: Optional[str]) -> str:
result: Optional[str] = None
npm_path = npm or get_npm()
for args in [['root', '-g'], ['root']]:
try:
modules_root = sp.check_output([npm_path] + args).strip().decode('utf-8')
full_path = os.path.join(modules_root, *MAIN_SCRIPT_PATH.split('/'))
if os.path.exists(full_path):
result = full_path
break
except sp.CalledProcessError:
continue
if result is None:
node_path = node or get_node()
try:
result = (
sp.check_output([node_path, '-e', f'console.log(require.resolve("{MAIN_SCRIPT_PATH}"))'])
.decode('utf-8')
.strip()
)
except sp.CalledProcessError as e:
raise AppiumServiceError(e.output) from e
return result


def _parse_arg_value(args: List[str], arg_names: Set[str], default: str) -> str:
for idx, arg in enumerate(args):
if arg in arg_names and idx < len(args) - 1:
return args[idx + 1]
return default


def _parse_port(args: List[str]) -> int:
return int(_parse_arg_value(args, {'--port', '-p'}, str(DEFAULT_PORT)))


def _parse_base_path(args: List[str]) -> str:
return _parse_arg_value(args, {'--base-path', '-pa'}, DEFAULT_BASE_PATH)


def _parse_host(args: List[str]) -> str:
return _parse_arg_value(args, {'--address', '-a'}, DEFAULT_HOST)


def _parse_protocol(args: List[str]) -> str:
return (
'https'
if _parse_arg_value(args, {'--ssl-cert-path'}, '') and _parse_arg_value(args, {'--ssl-key-path'}, '')
else 'http'
)


def _make_status_path(args: List[str]) -> str:
base_path = _parse_base_path(args)
return STATUS_URL if base_path == DEFAULT_BASE_PATH else f'{re.sub(r"/+$", "", base_path)}{STATUS_URL}'


def _make_server_url(args: List[str]) -> str:
return f'{_parse_protocol(args)}://{_parse_host(args)}:{_parse_port(args)}{_make_status_path(args)}'


if __name__ == '__main__':
assert find_executable('node') is not None
Expand Down
Loading