Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 207 additions & 32 deletions pyocd/utility/rtt_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# pyOCD debugger
# Copyright (c) 2022 Samuel Dewan
# Copyright (c) 2026 Arm Limited
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -14,15 +15,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from abc import ABC, abstractmethod
import selectors
import socket
from typing import Optional, Sequence
from typing import Optional, Sequence, Callable, IO
import os
from time import sleep
from pathlib import Path
import logging

from ..core.soc_target import SoCTarget
from ..core import exceptions
from ..debug.rtt import RTTControlBlock, RTTUpChannel, RTTDownChannel
from ..utility.stdio import StdioHandler

LOG = logging.getLogger(__name__)

class RTTChanWorker(ABC):
"""@brief Source and sink for data to be transferred over RTT. """
Expand Down Expand Up @@ -51,7 +60,6 @@ def close(self):
"""@brief Cleanup channel worker and close any file descriptors."""
pass


class RTTChanTCPWorker(RTTChanWorker):
"""@brief Implementation of channel worker that forwards RTT data via a TCP
socket. """
Expand Down Expand Up @@ -127,10 +135,148 @@ def close(self):
self.client.close()

class RTTChanFileWorker(RTTChanWorker):
"""@brief Implementation of channel worker that write data from RTT channel
"""@brief Implementation of channel worker that writes data from RTT channel
to a file and optionally reads data from a file into an RTT
channel. """
_f_out: Optional[IO[bytes]]
_f_in: Optional[IO[bytes]]
_f_out_path: Optional[str]
_f_in_path: Optional[str]

def __init__(self, channel: int, file_out: str, file_in: Optional[str] = None):
"""
@param file_out The file to write RTT channel data to.
@param file_in The file to read data from into the RTT channel. If None, no data will be read.
"""
self._f_out = None
self._f_in = None
self._f_out_path = None
self._f_in_path = None

# Check if the folder exists for output file
dir_out = os.path.dirname(file_out)
if dir_out and not os.path.exists(dir_out):
f_name_out = os.path.basename(file_out)
raise FileNotFoundError(
f"Output directory '{dir_out}' for RTT channel {channel} (file '{f_name_out}') does not exist."
)
try:
self._f_out = open(file_out, 'wb')
self._f_out_path = file_out
except OSError as e:
raise OSError(f"Failed to open RTT output file {file_out}: {e}")

# Open files
Copy link
Collaborator

@RobertRostohar RobertRostohar Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is misleading since it applies only to input file (output file is opened before)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if file_in is not None:
if os.path.exists(file_in):
self._f_in = open(file_in, 'rb')
self._f_in_path = file_in
else:
LOG.debug("Input file '%s' for RTT channel %d does not exist", os.path.basename(file_in), channel)

def write_up_data(self, data: bytes):
if self._f_out is None:
return 0
return self._f_out.write(data)

def get_down_data(self):
if self._f_in is None:
return b''
return self._f_in.read(4096)

def close(self):
if self._f_out is not None:
self._f_out.close()
if self._f_in is not None:
self._f_in.close()

class RTTChanSystemViewWorker(RTTChanWorker):
"""@brief Implementation of channel worker that writes data from RTT channel
to a SystemView file and handles START and STOP commands. """
_START_CMD = b"\x01"
_STOP_CMD = b"\x02"

def __init__(self, rtt_server: RTTServer, rtt_channel: int, file_out: str, auto_start: bool = True, auto_stop: bool = True):
self._rtt_server = rtt_server
self._rtt_channel = rtt_channel
self._auto_start = auto_start
self._auto_stop = auto_stop

self._started = not auto_start
self._f_out = None
self._f_out_path = None

# Check if the folder exists for output file
dir_out = os.path.dirname(file_out)
if dir_out and not os.path.exists(dir_out):
f_name_out = os.path.basename(file_out)
raise FileNotFoundError(
f"Output directory '{dir_out}' for RTT channel {self._rtt_channel} (file '{f_name_out}') does not exist."
)
try:
self._f_out = open(file_out, 'wb')
self._f_out_path = file_out
except OSError as e:
raise OSError(f"Failed to open SystemView output file {file_out}: {e}")

def write_up_data(self, data: bytes):
if self._f_out is None:
return 0
if self._started:
return self._f_out.write(data)
return len(data)

def get_down_data(self):
if not self._started and self._auto_start:
if self._rtt_server.is_channel_configured(self._rtt_channel) == False:
# Should not happen
LOG.error("SystemView worker for channel %d does not have a configured RTT channel; ignoring start request", self._rtt_channel)
return b''
down_chan: RTTDownChannel = self._rtt_server.control_block.down_channels[self._rtt_channel]
if down_chan.bytes_free == down_chan.size:
# Channel is empty, can start
LOG.debug("SystemView START command for channel %d sent", self._rtt_channel)
if down_chan.write(self._START_CMD) == len(self._START_CMD):
self._started = True
return b""

def close(self):
if self._auto_stop:
if self._rtt_server.is_channel_configured(self._rtt_channel) == False:
# Should not happen
LOG.error("SystemView worker for channel %d does not have a configured RTT channel; ignoring stop request", self._rtt_channel)
else:
down_chan: RTTDownChannel = self._rtt_server.control_block.down_channels[self._rtt_channel]
LOG.debug("SystemView STOP command for channel %d sent", self._rtt_channel)
down_chan.write(self._STOP_CMD)
if self._f_out is not None:
self._f_out.close()

class RTTChanStdioWorker(RTTChanWorker):
"""@brief Implementation of channel worker that forwards RTT data via a STDIO"""

_stdio: StdioHandler

def __init__(self, channel: int, stdio: StdioHandler):
"""
@param stdio The STDIO handler to use for RTT channel data.
"""
self._stdio = stdio

def write_up_data(self, data: bytes):
if self._stdio is None:
return 0
return self._stdio.write(data)

def get_down_data(self):
if self._stdio is None:
return b''
return self._stdio.read(4096)

def close(self):
pass
# if self._stdio is not None:
# self._stdio.shutdown()

class RTTServer:
"""@brief Keeps track of polling for multiple active RTT channels and the
Expand Down Expand Up @@ -158,6 +304,31 @@ def __init__(self, target: SoCTarget, address: int, size: int,
self.up_buffers = None
self.down_buffers = None

def _channel_handler(self, ch_idx: int, worker: RTTChanWorker):
# Read from up channel
try:
up_chan: RTTUpChannel = self.control_block.up_channels[ch_idx]
except IndexError:
pass
else:
self.up_buffers[ch_idx] += up_chan.read()

# Write to worker
bytes_written = worker.write_up_data(self.up_buffers[ch_idx])
self.up_buffers[ch_idx] = self.up_buffers[ch_idx][bytes_written:]

# Read from worker
self.down_buffers[ch_idx] += worker.get_down_data()

# Write data to down channel
try:
down_chan: RTTDownChannel = self.control_block.down_channels[ch_idx]
except IndexError:
pass
else:
bytes_out: int = down_chan.write(self.down_buffers[ch_idx])
self.down_buffers[ch_idx] = self.down_buffers[ch_idx][bytes_out:]

def poll(self):
"""@brief Reads from and writes to active RTT channels. """
if not self.running:
Expand All @@ -167,30 +338,7 @@ def poll(self):
for i, worker in enumerate(self.workers):
if worker is None:
continue

# Read from up channel
try:
up_chan: RTTUpChannel = self.control_block.up_channels[i]
except IndexError:
pass
else:
self.up_buffers[i] += up_chan.read()

# Write to worker
bytes_written = worker.write_up_data(self.up_buffers[i])
self.up_buffers[i] = self.up_buffers[i][bytes_written:]

# Read from worker
self.down_buffers[i] += worker.get_down_data()

# Write data to down channel
try:
down_chan: RTTDownChannel = self.control_block.down_channels[i]
except IndexError:
pass
else:
bytes_out: int = down_chan.write(self.down_buffers[i])
self.down_buffers[i] = self.down_buffers[i][bytes_out:]
self._channel_handler(i, worker)

def start(self):
"""@brief Find and parse RTT control block. """
Expand All @@ -201,8 +349,8 @@ def start(self):
num_chans: int = max(num_up_chans, num_down_chans)

self.workers = [None] * num_chans
self.up_buffers = [bytes()] * num_up_chans
self.down_buffers = [bytes()] * num_down_chans
self.up_buffers = [bytes()] * num_chans
self.down_buffers = [bytes()] * num_chans

def stop(self):
"""@brief Close all RTT workers. """
Expand All @@ -222,6 +370,31 @@ def running(self):
"""@brief True if RTT is started. """
return self.workers is not None

def is_channel_idx_valid(self, channel: int) -> bool:
"""Return True if channel is a valid index for current workers."""
if not isinstance(channel, int):
return False
if not self.running:
return False
return 0 <= channel < len(self.workers)

def is_channel_configured(self, channel: int) -> bool:
"""Return True if channel is a valid index for current workers and has a worker configured."""
if not self.is_channel_idx_valid(channel):
return False
return self.workers[channel] is not None

def add_channel_worker(self, channel: int, worker: Callable[[], RTTChanWorker]):
self.workers[channel] = worker()

def remove_channel_worker(self, channel: int):
if not self.is_channel_idx_valid(channel):
raise exceptions.RTTError(f"Invalid channel index {channel}")
worker = self.workers[channel]
if worker is not None:
worker.close()
self.workers[channel] = None

def add_server(self, port: int, channel: int):
"""@brief Start a new TCP server to communicate with a given RTT channel.

Expand All @@ -232,18 +405,20 @@ def add_server(self, port: int, channel: int):
raise exceptions.RTTError("RTT is not yet started")
elif self.workers[channel] is not None:
raise exceptions.RTTError(f"RTT is already started for channel {channel}")
self.add_channel_worker(channel, lambda: RTTChanTCPWorker(port, listen = True))

self.workers[channel] = RTTChanTCPWorker(port, listen = True)

def stop_server(self, port: int):
def stop_server(self, channel: Optional[int] = None, port: Optional[int] = None):
"""@brief Stop a TCP server.

@param port The port of the server to be stopped.
"""

if not self.running:
return
if channel is not None:
return self.remove_channel_worker(channel)

# Fallback: if channel not specified, search for server with given port and stop it
for i, worker in enumerate(self.workers):
if isinstance(worker, RTTChanTCPWorker):
if worker.port == port:
Expand Down
Loading