Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ fprime-cli = "fprime_gds.executables.fprime_cli:main"
fprime-seqgen = "fprime_gds.common.tools.seqgen:main"
fprime-dp = "fprime_gds.executables.data_products:main"
fprime-gds = "fprime_gds.executables.run_deployment:main"
fprime-prm-write = "fprime_gds.common.tools.params:main"
fprime-prm-write = "fprime_gds.common.tools.params:main_encode"
fprime-prm-decode = "fprime_gds.common.tools.params:main_decode"
fprime-merge-dictionary = "fprime_gds.executables.dictionary_merge:main"

# For Pytest fixtures
Expand Down
302 changes: 300 additions & 2 deletions src/fprime_gds/common/tools/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ def parse_json(param_value_json, name_dict: dict[str, PrmTemplate], include_impl
return templates_to_values


def main():
def main_encode():
"""CLI entry point for fprime-prm-write (encoding).

Encodes parameter JSON files into binary .dat files or command sequence .seq files.
This is the inverse operation of fprime-prm-decode.
"""
arg_parser = ArgumentParser()
subparsers = arg_parser.add_subparsers(dest="subcmd", required=True)

Expand Down Expand Up @@ -246,5 +251,298 @@ def convert_json(json_file: Path, dictionary: Path, output: Path, output_format:
raise RuntimeError("Invalid output format " + str(output_format))


def decode_dat_to_params(dat_bytes: bytes, id_dict: dict[int, PrmTemplate]) -> list[tuple[PrmTemplate, Any]]:
"""Decode a binary .dat file into a list of (PrmTemplate, value) tuples.

Args:
dat_bytes: The binary data from a .dat file
id_dict: Dictionary mapping parameter IDs to PrmTemplate objects

Returns:
List of (PrmTemplate, value) tuples where value is in JSON-compatible format

Raises:
RuntimeError: If the file format is invalid or parameters cannot be decoded
"""
params = []
offset = 0

while offset < len(dat_bytes):
# Check for delimiter
if dat_bytes[offset] != 0xA5:
raise RuntimeError(
f"Invalid delimiter at offset {offset}: expected 0xA5, got {dat_bytes[offset]:#x}"
)
offset += 1

# Read record size (4 bytes, big endian)
if offset + 4 > len(dat_bytes):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm seeing hardcoded widths and such. You should instead be using the loaded dictionary types. See my other comment on how to do that. Once you do have it, you should be able to do something like

from fprime_gds.common.utils.config_manager import ConfigManager

record_id_size = ConfigManager().get_type("FwDpIdType").getSize()

raise RuntimeError(
f"Incomplete record size at offset {offset}: expected 4 bytes, got {len(dat_bytes) - offset}"
)
record_size = int.from_bytes(dat_bytes[offset:offset+4], byteorder="big")
offset += 4

# Read parameter ID (4 bytes, big endian)
if offset + 4 > len(dat_bytes):
raise RuntimeError(
f"Incomplete parameter ID at offset {offset}: expected 4 bytes, got {len(dat_bytes) - offset}"
)
param_id = int.from_bytes(dat_bytes[offset:offset+4], byteorder="big")
offset += 4

# Look up parameter template
prm_template = id_dict.get(param_id, None)
if not prm_template:
raise RuntimeError(
f"Unknown parameter ID {param_id} (0x{param_id:x}) at offset {offset-4}"
)

# Calculate the value size
value_size = record_size - FW_PRM_ID_TYPE_SIZE

# Check if we have enough data
if offset + value_size > len(dat_bytes):
raise RuntimeError(
f"Incomplete parameter value for {prm_template.get_full_name()} at offset {offset}: "
f"expected {value_size} bytes, got {len(dat_bytes) - offset}"
)

# Deserialize the value
prm_instance = prm_template.prm_type_obj()
try:
prm_instance.deserialize(dat_bytes, offset)
except Exception as e:
raise RuntimeError(
f"Failed to deserialize parameter {prm_template.get_full_name()} "
f"(id={param_id}, type={prm_template.prm_type_obj.__name__}): {str(e)}"
)

# Get the raw value - use .val for simple types
# For complex types (arrays, structs), to_jsonable() provides the correct format
if isinstance(prm_instance, (ArrayType, SerializableType)):
value = prm_instance.to_jsonable()
else:
# For simple types (string, bool, numbers, enums), use the raw value
value = prm_instance.val

params.append((prm_template, value))

offset += value_size

return params


def params_to_json(params: list[tuple[PrmTemplate, Any]]) -> dict:
"""Convert a list of (PrmTemplate, value) tuples to JSON format.

The output format matches the input format expected by fprime-prm-write:
{
"componentName": {
"paramName": value,
...
},
...
}

Complex types from to_jsonable() are converted to simple format that
instantiate_prm_type() expects for round-trip compatibility.

Args:
params: List of (PrmTemplate, value) tuples

Returns:
Dictionary in the JSON format used by fprime-prm-write
"""
def to_encoder_format(value):
"""Convert to_jsonable() output to format expected by instantiate_prm_type()."""
if value is None:
return None

# Handle lists recursively
if isinstance(value, list):
return [to_encoder_format(v) for v in value]

# Only process dicts from here
if not isinstance(value, dict):
return value

# Array: {"values": [...]} -> [...]
if "values" in value and isinstance(value.get("values"), list):
return [to_encoder_format(v) for v in value["values"]]

# Any dict with "value" key (primitive wrapper or struct member) -> extract value
if "value" in value:
return to_encoder_format(value["value"])

# Plain dict (struct without metadata): recursively process all fields
return {k: to_encoder_format(v) for k, v in value.items()}

result = {}

for prm_template, value in params:
comp_name = prm_template.comp_name
prm_name = prm_template.prm_name

# Create component entry if it doesn't exist
if comp_name not in result:
result[comp_name] = {}

# Add parameter to component with encoder-compatible format
result[comp_name][prm_name] = to_encoder_format(value)

return result


def params_to_text(params: list[tuple[PrmTemplate, Any]]) -> str:
"""Convert a list of (PrmTemplate, value) tuples to human-readable text format.

Args:
params: List of (PrmTemplate, value) tuples

Returns:
Human-readable text string
"""
lines = []
current_component = None

for prm_template, value in params:
comp_name = prm_template.comp_name
prm_name = prm_template.prm_name
prm_id = prm_template.prm_id
type_name = prm_template.prm_type_obj.__name__.replace("Type", "")

# Add component header if this is a new component
if comp_name != current_component:
if current_component is not None:
lines.append("") # Blank line between components
lines.append(f"Component: {comp_name}")
current_component = comp_name

# Format the value
if isinstance(value, str):
value_str = f'"{value}"'
elif isinstance(value, (list, dict)):
value_str = js.dumps(value)
else:
value_str = str(value)

lines.append(f" {prm_name} = {value_str} (type: {type_name}, id: {prm_id})")

return "\n".join(lines)


def params_to_csv(params: list[tuple[PrmTemplate, Any]]) -> str:
"""Convert a list of (PrmTemplate, value) tuples to CSV format.

Args:
params: List of (PrmTemplate, value) tuples

Returns:
CSV string with columns: Component,Parameter,Value,Type,ID
"""
lines = []
lines.append("Component,Parameter,Value,Type,ID")

for prm_template, value in params:
comp_name = prm_template.comp_name
prm_name = prm_template.prm_name
prm_id = prm_template.prm_id
type_name = prm_template.prm_type_obj.__name__.replace("Type", "")

# Format the value for CSV
# For complex types (arrays, structs), convert to JSON string
if isinstance(value, (list, dict)):
value_str = js.dumps(value)
elif isinstance(value, str):
# Escape quotes in strings
value_str = value.replace('"', '""')
else:
value_str = str(value)

# Escape any commas or quotes in the value
if ',' in value_str or '"' in value_str or '\n' in value_str:
value_str = f'"{value_str}"'

lines.append(f"{comp_name},{prm_name},{value_str},{type_name},{prm_id}")

return "\n".join(lines)


def main_decode():
"""CLI entry point for fprime-prm-decode (decoding).

Decodes binary parameter database (.dat) files into human-readable formats.
This is the inverse operation of fprime-prm-write.
"""
arg_parser = ArgumentParser()

arg_parser.add_argument(
"dat_file", type=Path, help="The .dat file to decode", default=None
)
arg_parser.add_argument(
"--dictionary",
"-d",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With that argument, you can leverage the pre-existing Dictionaries class which can load all the dictionary types into a global config singleton, which will help processing without hardcoding type widths and such.

See

# If a dictionary is passed, load it into ConfigManager and add to args for convenient access
if args.dictionary:
args.dictionaries = Dictionaries.load_dictionaries_into_config(args.dictionary)

type=Path,
help="The dictionary file of the FSW",
required=True,
)
arg_parser.add_argument("--format", "-f", type=str, choices=["json", "text", "csv"], default="json", help="Output format (default: json)")
arg_parser.add_argument("--output", "-o", type=Path, help="The output file", default=None)


args = arg_parser.parse_args()

if args.dat_file is None or not args.dat_file.exists():
print("Unable to find", args.dat_file)
exit(1)

if args.dat_file.is_dir():
print("dat-file is a dir", args.dat_file)
exit(1)

if not args.dictionary.exists():
print("Unable to find", args.dictionary)
exit(1)

output_format = args.format

# determine output path
if args.output is None:
output_path = args.dat_file.with_suffix("." + output_format)
else:
output_path = args.output

print("Decoding", args.dat_file, "to", output_path, "(format: ." + output_format + ")")
output_path.parent.mkdir(parents=True, exist_ok=True)

# Load dictionary
dict_parser = PrmJsonLoader(str(args.dictionary.resolve()))
id_dict, name_dict, versions = dict_parser.construct_dicts(
str(args.dictionary.resolve())
)

# Read and decode .dat file
dat_bytes = args.dat_file.read_bytes()
params = decode_dat_to_params(dat_bytes, id_dict)

# Format output based on requested format
if output_format == "json":
output_data = params_to_json(params)
output_content = js.dumps(output_data, indent=4)
elif output_format == "text":
output_content = params_to_text(params)
elif output_format == "csv":
output_content = params_to_csv(params)
else:
raise RuntimeError("Invalid output format " + str(output_format))

# Write output
print("Done, writing to", output_path.resolve())
output_path.write_text(output_content)


if __name__ == "__main__":
main()
# This file was originally created to encode parameter database files
# Keep this backwards compatibility
main_encode()
Loading
Loading