Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/fprime_gds/common/testing_fw/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pathlib import Path
import shutil
import json
import tempfile

from fprime_gds.common.models.serialize.time_type import TimeType

Expand All @@ -21,6 +22,7 @@
from fprime_gds.common.history.test import TestHistory
from fprime_gds.common.logger.test_logger import TestLogger
from fprime_gds.common.testing_fw import predicates
from fprime_gds.common.tools.seqgen import SeqGenException, generateSequence
from fprime_gds.common.utils.event_severity import EventSeverity


Expand Down Expand Up @@ -1158,11 +1160,53 @@ def uplink_file(self, file_path, destination=None):

Args:
file_path: the path to the file to upload
destination: the destination path for the uploaded file
"""
uplink_file = Path(self.pipeline.up_store) / Path(file_path).name
shutil.copy2(file_path, uplink_file)
self.pipeline.files.uplinker.enqueue(str(uplink_file), destination)

def uplink_sequence_and_await_completion(self, sequence_path, destination=None, timeout=10):
"""
This function will upload a sequence and wait for its completion, awaiting for the
FileReceived event.

Args:
sequence_path: the path to the sequence to upload
destination: the destination path for the uploaded sequence
timeout: the maximum time to wait for the event
"""
self.uplink_sequence(sequence_path, destination)
self.await_event("FileReceived", timeout=timeout)

def uplink_sequence(self, sequence_path, destination=None):
"""
This function will upload a sequence to the specified location.

Note: this will simply put the sequence file on the outgoing queue. No guarantee
is made on when the sequence will be delivered. To wait for the completion of
the sequence uplink, use uplink_sequence_and_await_completion()

Args:
sequence_path: the path to the sequence to upload
destination: the destination path for the uploaded sequence
"""
with tempfile.TemporaryDirectory() as tempdir:
temp_bin_path = (Path(tempdir) / Path(sequence_path).name).with_suffix(".bin")
try:
generateSequence(
sequence_path, temp_bin_path, self.dictionaries.dictionary_path, 0xFFFF, cont=True
)
except OSError as ose:
msg = f"Failed to generate sequence binary from {sequence_path}: {ose}"
self.__log(msg, TestLogger.RED)
raise
except SeqGenException as exc:
msg = f"Failed to generate sequence binary from {sequence_path}: {exc}"
self.__log(msg, TestLogger.RED)
raise
self.uplink_file(temp_bin_path, destination)

######################################################################################
# History Searches
######################################################################################
Expand Down
Loading