|
| 1 | +#!/usr/bin/env python3 |
| 2 | +#** ***************************************************************************** |
| 3 | +# * |
| 4 | +# * If not stated otherwise in this file or this component's LICENSE file the |
| 5 | +# * following copyright and licenses apply: |
| 6 | +# * |
| 7 | +# * Copyright 2025 RDK Management |
| 8 | +# * |
| 9 | +# * Licensed under the Apache License, Version 2.0 (the "License"); |
| 10 | +# * you may not use this file except in compliance with the License. |
| 11 | +# * You may obtain a copy of the License at |
| 12 | +# * |
| 13 | +# * |
| 14 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 15 | +# * |
| 16 | +# * Unless required by applicable law or agreed to in writing, software |
| 17 | +# * distributed under the License is distributed on an "AS IS" BASIS, |
| 18 | +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 19 | +# * See the License for the specific language governing permissions and |
| 20 | +# * limitations under the License. |
| 21 | +# * |
| 22 | +#* ****************************************************************************** |
| 23 | + |
| 24 | +import os |
| 25 | +import sys |
| 26 | +import time |
| 27 | +import re |
| 28 | +import yaml |
| 29 | + |
| 30 | +dir_path = os.path.dirname(os.path.realpath(__file__)) |
| 31 | +sys.path.append(dir_path) |
| 32 | +sys.path.append(os.path.join(dir_path, "../")) |
| 33 | + |
| 34 | +from framework.core.logModule import logModule |
| 35 | +from framework.core.streamToFile import StreamToFile |
| 36 | +from framework.core.utPlaneController import utPlaneController |
| 37 | +from .abstractCECController import CECInterface |
| 38 | +from framework.core.commandModules.sshConsole import sshConsole |
| 39 | + |
| 40 | +HDMICEC_DEVICE_LIST_FILE = "/tmp/hdmi_cec_device_list_info.txt" |
| 41 | +HDMICEC_PRINT_CEC_NETWORK_CONFIG_FILE = os.path.join(dir_path, "configuration", "virtual_cec_print_device_network_configuration.yaml") |
| 42 | +HOST_CMD_EXEC_TIMEOUT = 2 |
| 43 | + |
| 44 | +class virtualCECController(CECInterface): |
| 45 | + """ |
| 46 | + HDMI CEC related utility functions |
| 47 | + """ |
| 48 | + def __init__(self, adaptor: str, logger: logModule, streamLogger: StreamToFile, |
| 49 | + address: str, username: str = '', password: str = '', port: int = 22, prompt = '~#', |
| 50 | + device_configuration:str = '', control_port:int = 8080): |
| 51 | + """ |
| 52 | + Initializes the virtualCECController class for HDMI CEC device communication. |
| 53 | +
|
| 54 | + Args: |
| 55 | + adaptor (str): The adaptor file path for the parent class. |
| 56 | + logger (logModule): Logger module instance for logging operations. |
| 57 | + streamLogger (StreamToFile): Stream logger for file-based logging. |
| 58 | + address (str): IP address or hostname of the remote device. |
| 59 | + username (str, optional): SSH username for authentication. Defaults to ''. |
| 60 | + password (str, optional): SSH password for authentication. Defaults to ''. |
| 61 | + port (int, optional): SSH port number for connection. Defaults to 22. |
| 62 | + prompt (str, optional): Command prompt string for the SSH session. Defaults to '~#'. |
| 63 | + device_configuration (str, optional): Path to the HDMI CEC device network configuration YAML file. Defaults to ''. |
| 64 | + control_port (int, optional): Port number for ut-controller communication. Defaults to 8080. |
| 65 | +
|
| 66 | + """ |
| 67 | + super().__init__(adaptor, logger, streamLogger) |
| 68 | + |
| 69 | + self.control_port = control_port |
| 70 | + self.commandPrompt = prompt |
| 71 | + |
| 72 | + try: |
| 73 | + # Load the HDMI CEC device network configuration file |
| 74 | + with open(device_configuration, "r") as f: |
| 75 | + config_dict = yaml.safe_load(f) |
| 76 | + |
| 77 | + self.cecDeviceNetworkConfigString = yaml.dump(config_dict) |
| 78 | + |
| 79 | + # Load the print configuration file |
| 80 | + with open(HDMICEC_PRINT_CEC_NETWORK_CONFIG_FILE, "r") as f: |
| 81 | + print_dict = yaml.safe_load(f) |
| 82 | + |
| 83 | + self.printConfigString = yaml.dump(print_dict) |
| 84 | + |
| 85 | + self.session = sshConsole(self._log, address, username, password, port=port, prompt=prompt) |
| 86 | + |
| 87 | + self.utPlaneController = utPlaneController(self.session, port=self.control_port) |
| 88 | + except FileNotFoundError: |
| 89 | + self._log.critical(f"Device config file not found") |
| 90 | + raise |
| 91 | + except yaml.YAMLError as e: |
| 92 | + self._log.critical(f"Invalid YAML in device config file: {e}") |
| 93 | + raise |
| 94 | + except Exception as e: |
| 95 | + self._log.critical(f"Failed to load device configuration: {e}") |
| 96 | + raise |
| 97 | + |
| 98 | + def loadCecDeviceNetworkConfiguration(self, configString: str): |
| 99 | + """ |
| 100 | + Loads the HDMI CEC device network configuration file on to the vComponent. |
| 101 | + """ |
| 102 | + |
| 103 | + self.utPlaneController.sendMessage(configString) |
| 104 | + |
| 105 | + def readDeviceNetworkList(self) -> list: |
| 106 | + """ |
| 107 | + Reads the device network list from the HDMI CEC device. |
| 108 | +
|
| 109 | + Returns: |
| 110 | + list: A list of dictionaries representing discovered devices with details. |
| 111 | + """ |
| 112 | + result = self.session.read_until(self.commandPrompt) |
| 113 | + |
| 114 | + result = re.sub(r'\x1b\[[0-9;]*m', '', result) # remove ANSI color codes |
| 115 | + result = result.replace('\r', '') # normalize newlines |
| 116 | + result = re.sub(r'root@[\w\-\:\/# ]+', '', result) # remove shell prompt lines |
| 117 | + result = re.sub(r'curl:.*?\n', '', result, flags=re.DOTALL) # remove curl noise if any |
| 118 | + |
| 119 | + devices = [] |
| 120 | + |
| 121 | + # Regex to match device lines |
| 122 | + pattern = re.compile( |
| 123 | + r"- Name:\s*(?P<name>[^,]+),.*?" |
| 124 | + r"Active Source:\s*(?P<active>\d+),.*?" |
| 125 | + r"Logical-1:\s*(?P<logical1>-?\d+),.*?" |
| 126 | + r"Physical:\s*(?P<physical>[\d\.]+)", |
| 127 | + re.MULTILINE |
| 128 | + ) |
| 129 | + |
| 130 | + for match in pattern.finditer(result): |
| 131 | + devices.append({ |
| 132 | + "name": match.group("name").strip(), |
| 133 | + "physical address": match.group("physical"), |
| 134 | + "logical address": int(match.group("logical1")), |
| 135 | + "active source": int(match.group("active")), |
| 136 | + }) |
| 137 | + |
| 138 | + return devices |
| 139 | + |
| 140 | + def listDevices(self) -> list: |
| 141 | + """ |
| 142 | + Lists the devices currently available on the HDMI CEC network. |
| 143 | +
|
| 144 | + Returns: |
| 145 | + list: A list of dictionaries representing discovered devices with details. |
| 146 | + { |
| 147 | + "name": "name", |
| 148 | + "physical address": "0.0.0.0", |
| 149 | + "logical address": 0, |
| 150 | + "active source": 0, |
| 151 | + } |
| 152 | + """ |
| 153 | + # send command to CEC network to print device configuration |
| 154 | + self.utPlaneController.sendMessage(self.printConfigString) |
| 155 | + |
| 156 | + devices = self.readDeviceNetworkList() |
| 157 | + |
| 158 | + if devices is None or len(devices) == 0: |
| 159 | + self.session.write("cat " + HDMICEC_DEVICE_LIST_FILE) |
| 160 | + time.sleep(HOST_CMD_EXEC_TIMEOUT) |
| 161 | + devices = self.readDeviceNetworkList() |
| 162 | + |
| 163 | + return devices |
| 164 | + |
| 165 | + def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> bool: |
| 166 | + """ |
| 167 | + This function sends a specified opCode. |
| 168 | +
|
| 169 | + Args: |
| 170 | + sourceAddress (str): The logical address of the source device ('0'-'F'). |
| 171 | + destAddress (str): The logical address of the destination device ('0'-'F'). |
| 172 | + opCode (str): Operation code to send as a hexadecimal string e.g 0x81. |
| 173 | + payload (list): List of hexadecimal strings to be sent with the opCode. Optional. |
| 174 | +
|
| 175 | + Returns: |
| 176 | + bool: True if the message was sent successfully, False otherwise. |
| 177 | + """ |
| 178 | + # Format the payload: source, destination, opCode, and payload |
| 179 | + msg_payload = [f"0x{sourceAddress}{destAddress}", opCode] |
| 180 | + if payload: |
| 181 | + msg_payload.extend(payload) |
| 182 | + |
| 183 | + yaml_content = ( |
| 184 | + "HdmiCec:\n" |
| 185 | + " command: cec_message\n" |
| 186 | + " description: Send a CEC message\n" |
| 187 | + " message:\n" |
| 188 | + " user_defined: true\n" |
| 189 | + f" payload: {msg_payload}\n" |
| 190 | + ) |
| 191 | + |
| 192 | + try: |
| 193 | + result = self.utPlaneController.sendMessage(yaml_content) |
| 194 | + return bool(result) |
| 195 | + except Exception as e: |
| 196 | + self._log.critical(f"Failed to send CEC message: {e}") |
| 197 | + return False |
| 198 | + |
| 199 | + def start(self): |
| 200 | + self.loadCecDeviceNetworkConfiguration(self.cecDeviceNetworkConfigString) |
| 201 | + |
| 202 | + def stop(self): |
| 203 | + pass |
| 204 | + |
| 205 | + def receiveMessage(self,sourceAddress: str, destAddress: str, opCode: str, timeout: int = 10, payload: list = None) -> bool: |
| 206 | + """ |
| 207 | + For a virtual component, the device network is only a simulation, so the controller |
| 208 | + does not generate any logs when messages are sent or received. Therefore, it always returns expected message. |
| 209 | +
|
| 210 | + Args: |
| 211 | + sourceAddress (str): The logical address of the source device (0-9 or A-F). |
| 212 | + destAddress (str): The logical address of the destination device (0-9 or A-F). |
| 213 | + opCode (str): Operation code to send as an hexidecimal string e.g 0x81. |
| 214 | + timeout (int): The maximum amount of time, in seconds, that the method will |
| 215 | + wait for the message to be received. Defaults to 10. |
| 216 | + payload (list): List of hexidecimal strings to be sent with the opCode. Optional. |
| 217 | +
|
| 218 | + Returns: |
| 219 | + list: list of strings containing found message. |
| 220 | + """ |
| 221 | + message = "Received expected message" |
| 222 | + return message |
0 commit comments