diff --git a/main.py b/main.py index 39942fba..876fb714 100644 --- a/main.py +++ b/main.py @@ -29,7 +29,7 @@ from phone_agent.model import ModelConfig -def check_system_requirements() -> bool: +def check_system_requirements(backend: str) -> bool: """ Check system requirements before running the agent. @@ -41,6 +41,10 @@ def check_system_requirements() -> bool: Returns: True if all checks pass, False otherwise. """ + if backend == "harmony": + print("🔍 Skipping ADB checks for Harmony backend...") + return True + print("🔍 Checking system requirements...") print("-" * 50) @@ -321,6 +325,21 @@ def parse_args() -> argparse.Namespace: help="ADB device ID", ) + parser.add_argument( + "--backend", + type=str, + choices=["adb", "harmony"], + default=os.getenv("PHONE_AGENT_BACKEND", "adb"), + help="Device control backend: 'adb' for Android, 'harmony' for HarmonyOS (default: adb)", + ) + + parser.add_argument( + "--hdc-path", + type=str, + default=os.getenv("PHONE_AGENT_HDC_PATH"), + help="Path to hdc executable when using HarmonyOS backend", + ) + parser.add_argument( "--connect", "-c", @@ -464,7 +483,7 @@ def main(): return # Run system requirements check before proceeding - if not check_system_requirements(): + if not check_system_requirements(args.backend): sys.exit(1) # Check model API connectivity and model availability @@ -482,6 +501,8 @@ def main(): agent_config = AgentConfig( max_steps=args.max_steps, device_id=args.device_id, + backend=args.backend, + hdc_path=args.hdc_path, verbose=not args.quiet, lang=args.lang, ) diff --git a/phone_agent/actions/handler.py b/phone_agent/actions/handler.py index b8293f68..2d5d6329 100644 --- a/phone_agent/actions/handler.py +++ b/phone_agent/actions/handler.py @@ -6,20 +6,7 @@ from dataclasses import dataclass from typing import Any, Callable -from phone_agent.adb import ( - back, - clear_text, - detect_and_set_adb_keyboard, - double_tap, - home, - launch_app, - long_press, - restore_keyboard, - swipe, - tap, - type_text, -) -from 
phone_agent.config.timing import TIMING_CONFIG +from phone_agent.device.base import DeviceController @dataclass @@ -45,11 +32,11 @@ class ActionHandler: def __init__( self, - device_id: str | None = None, + controller: DeviceController, confirmation_callback: Callable[[str], bool] | None = None, takeover_callback: Callable[[str], None] | None = None, ): - self.device_id = device_id + self.controller = controller self.confirmation_callback = confirmation_callback or self._default_confirmation self.takeover_callback = takeover_callback or self._default_takeover @@ -132,7 +119,7 @@ def _handle_launch(self, action: dict, width: int, height: int) -> ActionResult: if not app_name: return ActionResult(False, False, "No app name specified") - success = launch_app(app_name, self.device_id) + success = self.controller.launch_app(app_name) if success: return ActionResult(True, False) return ActionResult(False, False, f"App not found: {app_name}") @@ -145,7 +132,6 @@ def _handle_tap(self, action: dict, width: int, height: int) -> ActionResult: x, y = self._convert_relative_to_absolute(element, width, height) - # Check for sensitive operation if "message" in action: if not self.confirmation_callback(action["message"]): return ActionResult( @@ -154,28 +140,13 @@ def _handle_tap(self, action: dict, width: int, height: int) -> ActionResult: message="User cancelled sensitive operation", ) - tap(x, y, self.device_id) + self.controller.tap(x, y) return ActionResult(True, False) def _handle_type(self, action: dict, width: int, height: int) -> ActionResult: """Handle text input action.""" text = action.get("text", "") - - # Switch to ADB keyboard - original_ime = detect_and_set_adb_keyboard(self.device_id) - time.sleep(TIMING_CONFIG.action.keyboard_switch_delay) - - # Clear existing text and type new text - clear_text(self.device_id) - time.sleep(TIMING_CONFIG.action.text_clear_delay) - - type_text(text, self.device_id) - time.sleep(TIMING_CONFIG.action.text_input_delay) - - # Restore 
original keyboard - restore_keyboard(original_ime, self.device_id) - time.sleep(TIMING_CONFIG.action.keyboard_restore_delay) - + self.controller.type_text(text) return ActionResult(True, False) def _handle_swipe(self, action: dict, width: int, height: int) -> ActionResult: @@ -189,17 +160,17 @@ def _handle_swipe(self, action: dict, width: int, height: int) -> ActionResult: start_x, start_y = self._convert_relative_to_absolute(start, width, height) end_x, end_y = self._convert_relative_to_absolute(end, width, height) - swipe(start_x, start_y, end_x, end_y, device_id=self.device_id) + self.controller.swipe(start_x, start_y, end_x, end_y) return ActionResult(True, False) def _handle_back(self, action: dict, width: int, height: int) -> ActionResult: """Handle back button action.""" - back(self.device_id) + self.controller.back() return ActionResult(True, False) def _handle_home(self, action: dict, width: int, height: int) -> ActionResult: """Handle home button action.""" - home(self.device_id) + self.controller.home() return ActionResult(True, False) def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult: @@ -209,7 +180,9 @@ def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionRes return ActionResult(False, False, "No element coordinates") x, y = self._convert_relative_to_absolute(element, width, height) - double_tap(x, y, self.device_id) + self.controller.tap(x, y) + time.sleep(0.1) + self.controller.tap(x, y) return ActionResult(True, False) def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult: @@ -219,7 +192,7 @@ def _handle_long_press(self, action: dict, width: int, height: int) -> ActionRes return ActionResult(False, False, "No element coordinates") x, y = self._convert_relative_to_absolute(element, width, height) - long_press(x, y, device_id=self.device_id) + self.controller.swipe(x, y, x, y) return ActionResult(True, False) def _handle_wait(self, action: dict, width: int, height: 
int) -> ActionResult: diff --git a/phone_agent/agent.py b/phone_agent/agent.py index b1703161..6b2be2bf 100644 --- a/phone_agent/agent.py +++ b/phone_agent/agent.py @@ -7,8 +7,8 @@ from phone_agent.actions import ActionHandler from phone_agent.actions.handler import do, finish, parse_action -from phone_agent.adb import get_current_app, get_screenshot from phone_agent.config import get_messages, get_system_prompt +from phone_agent.device import AdbDeviceController, HarmonyDeviceController from phone_agent.model import ModelClient, ModelConfig from phone_agent.model.client import MessageBuilder @@ -22,6 +22,8 @@ class AgentConfig: lang: str = "cn" system_prompt: str | None = None verbose: bool = True + backend: str = "adb" + hdc_path: str | None = None def __post_init__(self): if self.system_prompt is None: @@ -72,8 +74,20 @@ def __init__( self.agent_config = agent_config or AgentConfig() self.model_client = ModelClient(self.model_config) + + if self.agent_config.backend == "adb": + self.controller = AdbDeviceController( + device_id=self.agent_config.device_id + ) + elif self.agent_config.backend == "harmony": + if not self.agent_config.hdc_path: + raise ValueError("hdc_path must be provided for harmony backend") + self.controller = HarmonyDeviceController(self.agent_config.hdc_path) + else: + raise ValueError(f"Unknown backend: {self.agent_config.backend}") + self.action_handler = ActionHandler( - device_id=self.agent_config.device_id, + controller=self.controller, confirmation_callback=confirmation_callback, takeover_callback=takeover_callback, ) @@ -139,9 +153,8 @@ def _execute_step( """Execute a single step of the agent loop.""" self._step_count += 1 - # Capture current screen state - screenshot = get_screenshot(self.agent_config.device_id) - current_app = get_current_app(self.agent_config.device_id) + screenshot = self.controller.get_screenshot() + current_app = self.controller.get_current_app() # Build messages if is_first: diff --git 
a/phone_agent/device/__init__.py b/phone_agent/device/__init__.py new file mode 100644 index 00000000..e0df458c --- /dev/null +++ b/phone_agent/device/__init__.py @@ -0,0 +1,11 @@ +from phone_agent.device.base import DeviceController +from phone_agent.device.adb_backend import AdbDeviceController +from phone_agent.device.harmony_backend import HarmonyDeviceController + +__all__ = [ + "DeviceController", + "AdbDeviceController", + "HarmonyDeviceController", +] + + diff --git a/phone_agent/device/adb_backend.py b/phone_agent/device/adb_backend.py new file mode 100644 index 00000000..9099079f --- /dev/null +++ b/phone_agent/device/adb_backend.py @@ -0,0 +1,57 @@ +import time + +from phone_agent.adb import ( + back as adb_back, + get_current_app as adb_get_current_app, + home as adb_home, + launch_app as adb_launch_app, + swipe as adb_swipe, + tap as adb_tap, +) +from phone_agent.adb.input import ( + clear_text, + detect_and_set_adb_keyboard, + restore_keyboard, + type_text as adb_type_text, +) +from phone_agent.adb.screenshot import Screenshot, get_screenshot as adb_get_screenshot +from phone_agent.config.timing import TIMING_CONFIG +from phone_agent.device.base import DeviceController + + +class AdbDeviceController(DeviceController): + def __init__(self, device_id: str | None = None): + self.device_id = device_id + + def get_screenshot(self) -> Screenshot: + return adb_get_screenshot(self.device_id) + + def tap(self, x: int, y: int) -> None: + adb_tap(x, y, self.device_id) + + def swipe(self, x1: int, y1: int, x2: int, y2: int) -> None: + adb_swipe(x1, y1, x2, y2, device_id=self.device_id) + + def back(self) -> None: + adb_back(self.device_id) + + def home(self) -> None: + adb_home(self.device_id) + + def type_text(self, text: str) -> None: + original_ime = detect_and_set_adb_keyboard(self.device_id) + time.sleep(TIMING_CONFIG.action.keyboard_switch_delay) + clear_text(self.device_id) + time.sleep(TIMING_CONFIG.action.text_clear_delay) + adb_type_text(text, 
self.device_id) + time.sleep(TIMING_CONFIG.action.text_input_delay) + restore_keyboard(original_ime, self.device_id) + time.sleep(TIMING_CONFIG.action.keyboard_restore_delay) + + def launch_app(self, app_name: str) -> bool: + return adb_launch_app(app_name, self.device_id) + + def get_current_app(self) -> str: + return adb_get_current_app(self.device_id) + + diff --git a/phone_agent/device/base.py b/phone_agent/device/base.py new file mode 100644 index 00000000..c953cbfe --- /dev/null +++ b/phone_agent/device/base.py @@ -0,0 +1,31 @@ +from typing import Protocol + +from phone_agent.adb.screenshot import Screenshot + + +class DeviceController(Protocol): + def get_screenshot(self) -> Screenshot: + ... + + def tap(self, x: int, y: int) -> None: + ... + + def swipe(self, x1: int, y1: int, x2: int, y2: int) -> None: + ... + + def back(self) -> None: + ... + + def home(self) -> None: + ... + + def type_text(self, text: str) -> None: + ... + + def launch_app(self, app_name: str) -> bool: + ... + + def get_current_app(self) -> str: + ... 
# phone_agent/device/harmony_backend.py
"""HarmonyOS device backend that drives a device through the ``hdc`` CLI."""

import base64
import os
import subprocess
import tempfile
import time
import uuid
from io import BytesIO

from PIL import Image

from phone_agent.adb.screenshot import Screenshot
from phone_agent.device.base import DeviceController


class HarmonyDeviceController(DeviceController):
    """Device controller that shells out to ``hdc`` / ``uitest``.

    Every operation is a blocking ``subprocess.run`` call whose output is
    captured and whose exit status is ignored (best effort), so a failing
    ``hdc`` command does not raise by itself.

    Args:
        hdc_path: Path to the ``hdc`` executable.
        target: Optional device identifier; when set it is passed to every
            command as ``-t <target>``.
    """

    # Fixed on-device location used as scratch space for screen captures.
    _REMOTE_SCREENSHOT_PATH = "/data/local/tmp/screenshot.png"
    # Pause (seconds) after each step of the capture pipeline.
    _CAPTURE_STEP_DELAY = 0.5
    # Pause (seconds) after each injected character.
    _PER_CHAR_DELAY = 0.05
    # Key codes passed to ``uitest uiInput keyEvent``. In the OpenHarmony
    # key-code table 2050 is KEYCODE_SPACE and 2054 is KEYCODE_ENTER.
    _KEY_SPACE = "2050"
    _KEY_ENTER = "2054"
    # Screen point handed to ``uiInput inputText`` before the text itself.
    _INPUT_POINT = ("1", "1")
    # Trailing ``uiInput swipe`` argument.
    # NOTE(review): presumably the swipe velocity -- confirm against uitest docs.
    _SWIPE_SPEED = "500"
    # Size of the all-black placeholder returned when capture fails.
    _FALLBACK_SIZE = (1080, 2400)

    def __init__(self, hdc_path: str, target: str | None = None):
        self.hdc_path = hdc_path
        self.target = target

    def _hdc_prefix(self) -> list[str]:
        """Return the argv prefix: the hdc binary plus ``-t target`` if set."""
        cmd = [self.hdc_path]
        if self.target:
            cmd.extend(["-t", self.target])
        return cmd

    def _run(self, *args: str) -> "subprocess.CompletedProcess[str]":
        """Run ``hdc <args...>``, capturing text output; never checks status."""
        return subprocess.run(
            self._hdc_prefix() + list(args),
            capture_output=True,
            text=True,
        )

    def _ui_input(self, *args: str) -> None:
        """Run ``hdc shell uitest uiInput <args...>``."""
        # NOTE(review): arguments reach the device shell unquoted, so shell
        # metacharacters in typed text may be interpreted there -- confirm.
        self._run("shell", "uitest", "uiInput", *args)

    @staticmethod
    def _png_base64(img: "Image.Image") -> str:
        """Encode *img* as PNG and return the bytes as a base64 string."""
        buffered = BytesIO()
        img.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode("utf-8")

    def get_screenshot(self) -> Screenshot:
        """Capture the device screen and return it as a ``Screenshot``.

        The capture is written on the device, pulled to a uniquely named
        temp file, re-encoded as PNG and base64-encoded. On any failure a
        black placeholder from ``_create_fallback_screenshot`` is returned
        instead of raising. The temp file is always removed.
        """
        remote_path = self._REMOTE_SCREENSHOT_PATH
        local_path = os.path.join(
            tempfile.gettempdir(), f"harmony_screenshot_{uuid.uuid4()}.png"
        )

        try:
            # Remove any stale capture first so an old image is never pulled.
            self._run("shell", "rm", remote_path)
            time.sleep(self._CAPTURE_STEP_DELAY)
            self._run("shell", "uitest", "screenCap", "-p", remote_path)
            time.sleep(self._CAPTURE_STEP_DELAY)
            self._run("file", "recv", remote_path, local_path)
            time.sleep(self._CAPTURE_STEP_DELAY)

            if not os.path.exists(local_path):
                return self._create_fallback_screenshot(False)

            # The context manager closes the file handle before cleanup.
            with Image.open(local_path) as img:
                width, height = img.size
                base64_data = self._png_base64(img)

            return Screenshot(
                base64_data=base64_data,
                width=width,
                height=height,
                is_sensitive=False,
            )
        except Exception:
            # Deliberate best effort: the caller always gets a screenshot.
            return self._create_fallback_screenshot(False)
        finally:
            # Clean up on every path; a failed delete must not replace a
            # good capture with the fallback, so it is ignored.
            try:
                os.remove(local_path)
            except OSError:
                pass

    def tap(self, x: int, y: int) -> None:
        """Click at absolute screen coordinates ``(x, y)``."""
        self._ui_input("click", str(x), str(y))

    def swipe(self, x1: int, y1: int, x2: int, y2: int) -> None:
        """Swipe from ``(x1, y1)`` to ``(x2, y2)``."""
        self._ui_input(
            "swipe", str(x1), str(y1), str(x2), str(y2), self._SWIPE_SPEED
        )

    def back(self) -> None:
        """Send the ``Back`` key event."""
        self._ui_input("keyEvent", "Back")

    def home(self) -> None:
        """Send the ``Home`` key event."""
        self._ui_input("keyEvent", "Home")

    def type_text(self, text: str) -> None:
        """Type *text* one character at a time.

        Real newlines and the literal two-character sequence ``\\n`` are
        sent as the Enter key, spaces as the Space key, and every other
        character (including ``_``) through ``uiInput inputText``.
        """
        # Fold the escaped form into a real newline so both spellings take
        # the same path. No placeholder character is used, so a literal
        # underscore is typed as an underscore rather than sent as Enter.
        normalized = text.replace("\\n", "\n")
        for ch in normalized:
            if ch == "\n":
                self._ui_input("keyEvent", self._KEY_ENTER)
            elif ch == " ":
                self._ui_input("keyEvent", self._KEY_SPACE)
            else:
                self._ui_input("inputText", *self._INPUT_POINT, ch)
            time.sleep(self._PER_CHAR_DELAY)

    def launch_app(self, app_name: str) -> bool:
        """Not implemented for this backend; always returns ``False``."""
        return False

    def get_current_app(self) -> str:
        """Return the constant ``"Harmony Device"``; no device query is made."""
        return "Harmony Device"

    def _create_fallback_screenshot(self, is_sensitive: bool) -> Screenshot:
        """Return an all-black 1080x2400 PNG wrapped in a ``Screenshot``."""
        width, height = self._FALLBACK_SIZE
        img = Image.new("RGB", (width, height), color="black")
        return Screenshot(
            base64_data=self._png_base64(img),
            width=width,
            height=height,
            is_sensitive=is_sensitive,
        )