Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions examples/cfdp-libre-cube-crosstest/tmtccmd-client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from logging import basicConfig
from pathlib import Path
from queue import Empty
from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast

from common import REMOTE_CFG_FOR_DEST_ENTITY, UDP_SERVER_PORT, UDP_TM_SERVER_PORT
from common import REMOTE_ENTITY_ID as REMOTE_ENTITY_ID_RAW
Expand All @@ -24,7 +24,6 @@
TransactionId,
TransmissionMode,
)
from spacepackets.cfdp.pdu import AbstractFileDirectiveBase
from spacepackets.cfdp.pdu.helper import PduFactory
from spacepackets.countdown import Countdown
from spacepackets.seqcount import SeqCountProvider
Expand All @@ -50,6 +49,9 @@
TransactionParams,
)

if TYPE_CHECKING:
from spacepackets.cfdp.pdu import AbstractFileDirectiveBase

SOURCE_ENTITY_ID = ByteFieldU16(SOURCE_ENTITY_ID_RAW)
DEST_ENTITY_ID = ByteFieldU16(REMOTE_ENTITY_ID_RAW)

Expand Down Expand Up @@ -292,7 +294,7 @@ def source_entity_handler(
ready = select.select([tm_client], [], [], 0)
if ready[0]:
data, _ = tm_client.recvfrom(4096)
packet = cast(AbstractFileDirectiveBase, PduFactory.from_raw(data))
packet = cast("AbstractFileDirectiveBase", PduFactory.from_raw(data))
packet_received = True
except Empty:
pass
Expand Down
11 changes: 11 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Run with `just all`
all: fmt check test

fmt:
ruff format

check:
ruff check

test:
pytest
95 changes: 46 additions & 49 deletions src/cfdppy/handler/dest.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class CompletionDisposition(enum.Enum):
@dataclass
class _DestFileParams(_FileParamsBase):
file_name: Path
file_size_eof: int | None

@classmethod
def empty(cls) -> _DestFileParams:
Expand All @@ -91,38 +90,41 @@ def empty(cls) -> _DestFileParams:
crc32=b"",
file_size=None,
file_name=Path(),
file_size_eof=None,
# file_size_eof=None,
metadata_only=False,
)

def reset(self) -> None:
super().reset()
self.file_name = Path()
self.file_size_eof = None


class TransactionStep(enum.Enum):
IDLE = 0
TRANSACTION_START = 1
"""Metadata was received, which triggered a transaction start."""

WAITING_FOR_METADATA = 2
"""Special state which is only used for acknowledged mode. The CFDP entity is still waiting
for a missing metadata PDU to be re-sent. Until then, all arriving file data PDUs will only
update the internal lost segment tracker. When the EOF PDU arrives, the state will be left.
Please note that deferred lost segment handling might also be active when this state is set."""

RECEIVING_FILE_DATA = 3

RECV_FILE_DATA_WITH_CHECK_LIMIT_HANDLING = 4
"""This is the check timer step as specified in chapter 4.6.3.3 b) of the standard.
The destination handler will still check for file data PDUs which might lead to a full
file transfer completion."""
SENDING_EOF_ACK_PDU = 5
"""Sending the ACK (EOF) packet."""

WAITING_FOR_MISSING_DATA = 6
"""Only relevant for acknowledged mode: Wait for lost metadata and file segments as part of
the deferred lost segments detection procedure."""

TRANSFER_COMPLETION = 7
"""File transfer complete. Perform checksum verification and notice of completion. Please
note that this does not necessarily mean that the file transfer was completed successfully."""

SENDING_FINISHED_PDU = 8
WAITING_FOR_FINISHED_ACK = 9

Expand Down Expand Up @@ -162,6 +164,7 @@ def coalesce_lost_segments(self) -> None:
if len(self.lost_segments) <= 1:
return
merged_segments = []
# Initialize to the first entry.
current_start, current_end = next(iter(self.lost_segments.items()))

for seg_start, seg_end in self.lost_segments.items():
Expand Down Expand Up @@ -220,6 +223,8 @@ def remove_lost_segment(self, segment_to_remove: tuple[int, int]) -> bool:
@dataclass
class _AckedModeParams:
lost_seg_tracker: LostSegmentTracker = field(default=LostSegmentTracker())
# Extra parameter: Missing metadata is not tracked inside the lost segment tracker, so we
# need an extra parameter for this.
metadata_missing: bool = False
last_start_offset: int = 0
last_end_offset: int = 0
Expand Down Expand Up @@ -520,11 +525,11 @@ def __idle_fsm(self, packet: GenericPduPacket | None) -> None:
self._start_transaction(metadata_pdu)
else:
raise ValueError(
f"unexpected configuration error: {pdu_holder.pdu} in " f"IDLE state machine"
f"unexpected configuration error: {pdu_holder.pdu} in IDLE state machine"
)

def __non_idle_fsm(self, packet: GenericPduPacket | None) -> None:
self._fsm_advancement_after_packets_were_sent()
self._assert_all_packets_were_sent()
pdu_holder = PduHolder(packet)
if (
self.states.step
Expand Down Expand Up @@ -554,20 +559,10 @@ def __non_idle_fsm(self, packet: GenericPduPacket | None) -> None:
if self.states.step == TransactionStep.WAITING_FOR_FINISHED_ACK:
self._handle_waiting_for_finished_ack(pdu_holder)

def _fsm_advancement_after_packets_were_sent(self) -> None:
def _assert_all_packets_were_sent(self) -> None:
"""Advance the internal FSM after all packets to be sent were retrieved from the handler."""
if len(self._pdus_to_be_sent) > 0:
raise UnretrievedPdusToBeSent(f"{len(self._pdus_to_be_sent)} packets left to send")
if self.states.step == TransactionStep.SENDING_EOF_ACK_PDU:
if (
self._params.acked_params.lost_seg_tracker.num_lost_segments > 0
or self._params.acked_params.metadata_missing
):
self._start_deferred_lost_segment_handling()
else:
if self._params.completion_disposition != CompletionDisposition.CANCELED:
self._checksum_verify()
self.states.step = TransactionStep.TRANSFER_COMPLETION

def _start_transaction(self, metadata_pdu: MetadataPdu) -> bool:
if self.states.state != CfdpState.IDLE:
Expand Down Expand Up @@ -595,7 +590,7 @@ def _start_transaction_missing_metadata_recv_eof(self, eof_pdu: EofPdu) -> None:

def _handle_eof_without_previous_metadata(self, eof_pdu: EofPdu) -> None:
self._params.fp.progress = eof_pdu.file_size
self._params.fp.file_size_eof = eof_pdu.file_size
self._params.fp.file_size = eof_pdu.file_size
self._params.acked_params.metadata_missing = True
if self._params.fp.progress > 0:
# Clear old list, deferred procedure for the whole file is now active.
Expand All @@ -609,11 +604,23 @@ def _handle_eof_without_previous_metadata(self, eof_pdu: EofPdu) -> None:
assert self._params.transaction_id is not None
self.user.eof_recv_indication(self._params.transaction_id)
self._prepare_eof_ack_packet()
self.states.step = TransactionStep.SENDING_EOF_ACK_PDU
self._eof_ack_pdu_done()

def _eof_ack_pdu_done(self) -> None:
if (
self._params.acked_params.lost_seg_tracker.num_lost_segments > 0
or self._params.acked_params.metadata_missing
):
self._start_deferred_lost_segment_handling()

else:
if self._params.completion_disposition != CompletionDisposition.CANCELED:
self._checksum_verify()
self.states.step = TransactionStep.TRANSFER_COMPLETION

def _start_transaction_missing_metadata_recv_fd(self, fd_pdu: FileDataPdu) -> None:
self._common_first_packet_not_metadata_pdu_handler(fd_pdu)
self._handle_fd_without_previous_metadata(True, fd_pdu)
self._handle_fd_without_previous_metadata(fd_pdu)

def _handle_finished_pdu_sent(self) -> None:
if (
Expand All @@ -625,19 +632,16 @@ def _handle_finished_pdu_sent(self) -> None:
return
self._reset_internal(False)

def _handle_fd_without_previous_metadata(self, first_pdu: bool, fd_pdu: FileDataPdu) -> None:
def _handle_fd_without_previous_metadata(self, fd_pdu: FileDataPdu) -> None:
self._params.fp.progress = fd_pdu.offset + len(fd_pdu.file_data)
if len(fd_pdu.file_data) > 0:
start = fd_pdu.offset
if first_pdu:
start = 0
# I will just wait until the metadata has been received with re-requesting the file
# data PDU. How does the standard expect me to process file data PDUs where I do not
# even know the filenames? How would I even generically do this?
# I will add this file segment (and all others which came before and might be missing
# as well) to the lost segment list.
self._params.acked_params.lost_seg_tracker.add_lost_segment(
(start, self._params.fp.progress)
(0, self._params.fp.progress)
)
# This is a bit tricky: We need to set those variables to an appropriate value so
# the removal of handled lost segments works properly. However, we can not set the
Expand All @@ -649,8 +653,7 @@ def _handle_fd_without_previous_metadata(self, first_pdu: bool, fd_pdu: FileData
# Re-request the metadata PDU.
if self._params.remote_cfg.immediate_nak_mode:
lost_segments: list[tuple[int, int]] = []
if first_pdu:
lost_segments.append((0, 0))
lost_segments.append((0, 0))
if len(fd_pdu.file_data) > 0:
lost_segments.append((0, self._params.fp.progress))
if len(lost_segments) > 0:
Expand Down Expand Up @@ -698,7 +701,7 @@ def _handle_metadata_packet(self, metadata_pdu: MetadataPdu) -> None:
# sender.
if self._params.remote_cfg is None:
_LOGGER.warning(
"No remote configuration found for remote ID" f" {metadata_pdu.dest_entity_id}"
f"No remote configuration found for remote ID {metadata_pdu.dest_entity_id}"
)
raise NoRemoteEntityCfgFound(metadata_pdu.dest_entity_id)
if not self._params.fp.metadata_only:
Expand Down Expand Up @@ -753,7 +756,7 @@ def _handle_waiting_for_missing_metadata(self, packet_holder: PduHolder) -> None
if packet_holder.pdu is None:
return
if packet_holder.pdu_type == PduType.FILE_DATA:
self._handle_fd_without_previous_metadata(True, packet_holder.to_file_data_pdu())
self._handle_fd_without_previous_metadata(packet_holder.to_file_data_pdu())
elif packet_holder.pdu_directive_type == DirectiveType.METADATA_PDU:
self._handle_metadata_packet(packet_holder.to_metadata_pdu())
if self._params.acked_params.deferred_lost_segment_detection_active:
Expand Down Expand Up @@ -832,8 +835,8 @@ def _handle_fd_pdu(self, file_data_pdu: FileDataPdu) -> None:
self._params.finished_params.file_status = FileStatus.FILE_RETAINED

if (
self._params.fp.file_size_eof is not None
and (offset + len(file_data_pdu.file_data) > self._params.fp.file_size_eof)
self._params.fp.file_size is not None
and (offset + len(file_data_pdu.file_data) > self._params.fp.file_size)
and (
self._declare_fault(ConditionCode.FILE_SIZE_ERROR)
!= FaultHandlerCode.IGNORE_ERROR
Expand Down Expand Up @@ -895,7 +898,7 @@ def _deferred_lost_segment_handling(self) -> None:
if not self._params.acked_params.deferred_lost_segment_detection_active:
return
assert self._params.remote_cfg is not None
assert self._params.fp.file_size_eof is not None
assert self._params.fp.file_size is not None
if (
self._params.acked_params.lost_seg_tracker.num_lost_segments == 0
and not self._params.acked_params.metadata_missing
Expand Down Expand Up @@ -941,7 +944,7 @@ def _deferred_lost_segment_handling(self) -> None:
NakPdu(
self._params.pdu_conf,
0,
self._params.fp.file_size_eof,
self._params.fp.file_size,
next_segment_reqs,
)
)
Expand All @@ -951,7 +954,7 @@ def _deferred_lost_segment_handling(self) -> None:
NakPdu(
self._params.pdu_conf,
0,
self._params.fp.file_size_eof,
self._params.fp.file_size,
next_segment_reqs,
)
)
Expand All @@ -962,7 +965,7 @@ def _deferred_lost_segment_handling(self) -> None:
def _handle_eof_pdu(self, eof_pdu: EofPdu) -> bool | None:
"""Returns whether to exit the FSM prematurely."""
self._params.fp.crc32 = eof_pdu.file_checksum
self._params.fp.file_size_eof = eof_pdu.file_size
self._params.fp.file_size = eof_pdu.file_size
if self.cfg.indication_cfg.eof_recv_indication_required:
assert self._params.transaction_id is not None
self.user.eof_recv_indication(self._params.transaction_id)
Expand All @@ -978,30 +981,26 @@ def _handle_eof_pdu(self, eof_pdu: EofPdu) -> bool | None:
EntityIdTlv(self._params.remote_cfg.entity_id.as_bytes),
)
# Store this as progress for the checksum calculation.
self._params.fp.progress = self._params.fp.file_size_eof
self._params.fp.progress = self._params.fp.file_size
self._params.finished_params.delivery_code = DeliveryCode.DATA_INCOMPLETE
self._file_transfer_complete_transition()
return False

def _handle_no_error_eof(self) -> bool:
"""Returns whether the transfer can be completed regularly."""
# CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size
if self._params.fp.progress > self._params.fp.file_size_eof: # type: ignore
if self._params.fp.progress > self._params.fp.file_size: # type: ignore
if self._declare_fault(ConditionCode.FILE_SIZE_ERROR) != FaultHandlerCode.IGNORE_ERROR:
return False
elif (
self._params.fp.progress < self._params.fp.file_size_eof # type: ignore
self._params.fp.progress < self._params.fp.file_size # type: ignore
) and self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
# CFDP 4.6.4.3.1: The end offset of the last received file segment and the file
# size as stated in the EOF PDU is not the same, so we need to add that segment to
# the lost segments for the deferred lost segment detection procedure.
self._params.acked_params.lost_seg_tracker.add_lost_segment(
(self._params.fp.progress, self._params.fp.file_size_eof) # type: ignore
(self._params.fp.progress, self._params.fp.file_size) # type: ignore
)
if self._params.fp.file_size_eof != self._params.fp.file_size:
# Can or should this ever happen for a No Error EOF? Treat this like a non-fatal
# error for now.
_LOGGER.warning("missmatch of EOF file size and Metadata File Size for success EOF")
if (
self.transmission_mode == TransmissionMode.UNACKNOWLEDGED
and not self._checksum_verify()
Expand All @@ -1022,8 +1021,8 @@ def _start_deferred_lost_segment_handling(self) -> None:
self.states.step = TransactionStep.WAITING_FOR_MISSING_DATA
self._params.acked_params.deferred_lost_segment_detection_active = True
self._params.acked_params.lost_seg_tracker.coalesce_lost_segments()
self._params.acked_params.last_start_offset = self._params.fp.file_size_eof # type: ignore
self._params.acked_params.last_end_offset = self._params.fp.file_size_eof # type: ignore
self._params.acked_params.last_start_offset = self._params.fp.file_size # type: ignore
self._params.acked_params.last_end_offset = self._params.fp.file_size # type: ignore
self._deferred_lost_segment_handling()

def _prepare_eof_ack_packet(self) -> None:
Expand Down Expand Up @@ -1062,7 +1061,7 @@ def _file_transfer_complete_transition(self) -> None:
self.states.step = TransactionStep.TRANSFER_COMPLETION
elif self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
self._prepare_eof_ack_packet()
self.states.step = TransactionStep.SENDING_EOF_ACK_PDU
self._eof_ack_pdu_done()

def _trigger_notice_of_completion_canceled(
self, condition_code: ConditionCode, fault_location: EntityIdTlv
Expand Down Expand Up @@ -1102,8 +1101,6 @@ def _notice_of_completion(self) -> None:
self.user.transaction_finished_indication(finished_indic_params)

def _prepare_finished_pdu(self) -> None:
if self.states.packets_ready:
raise UnretrievedPdusToBeSent
# TODO: Fault location handling. Set remote entity ID for file copy
# operations cancelled with an EOF (Cancel) PDU, and the local ID for file
# copy operations cancelled with the local API.
Expand Down
14 changes: 7 additions & 7 deletions src/cfdppy/handler/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@
class TransactionStep(enum.Enum):
IDLE = 0
TRANSACTION_START = 1

# The following three are used for the Copy File Procedure
SENDING_METADATA = 3
SENDING_FILE_DATA = 4

RETRANSMITTING = 5
"""Re-transmitting missing packets in acknowledged mode."""

SENDING_EOF = 6
WAITING_FOR_EOF_ACK = 7
WAITING_FOR_FINISHED = 8
Expand Down Expand Up @@ -886,12 +889,9 @@ def _prepare_progressing_file_data_pdu(self) -> None:
:return: True if a packet was prepared, False if PDU handling is done and the next steps
in the Copy File procedure can be performed
"""
if self._params.fp.file_size < self._params.fp.segment_len:
read_len = self._params.fp.file_size
elif self._params.fp.progress + self._params.fp.segment_len > self._params.fp.file_size:
read_len = self._params.fp.file_size - self._params.fp.progress
else:
read_len = self._params.fp.segment_len
read_len = min(
self._params.fp.segment_len, self._params.fp.file_size - self._params.fp.progress
)
self._prepare_file_data_pdu(self._params.fp.progress, read_len)
self._params.fp.progress += read_len

Expand Down Expand Up @@ -930,7 +930,7 @@ def _get_next_transfer_seq_num(self) -> None:
next_seq_num = self.seq_num_provider.get_and_increment()
if self.seq_num_provider.max_bit_width not in [8, 16, 32]:
raise ValueError(
"Invalid bit width for sequence number provider, must be one of [8," " 16, 32]"
"Invalid bit width for sequence number provider, must be one of [8, 16, 32]"
)
self._params.pdu_conf.transaction_seq_num = ByteFieldGenerator.from_int(
self.seq_num_provider.max_bit_width // 8, next_seq_num
Expand Down
2 changes: 1 addition & 1 deletion src/cfdppy/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __str_for_metadata_only(self) -> str:
print_str = f"Metadata Only Put Request with Destination ID {self.destination_id.value}"
if self.msgs_to_user is not None:
for idx, msg_to_user in enumerate(self.msgs_to_user):
msg_to_user = cast(MessageToUserTlv, msg_to_user)
msg_to_user = cast("MessageToUserTlv", msg_to_user)
if msg_to_user.is_reserved_cfdp_message():
reserved_msg = msg_to_user.to_reserved_msg_tlv()
assert reserved_msg is not None
Expand Down
Loading
Loading