diff --git a/pyproject.toml b/pyproject.toml index 6906cd02..58a60f88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,8 @@ fprime-cli = "fprime_gds.executables.fprime_cli:main" fprime-seqgen = "fprime_gds.common.tools.seqgen:main" fprime-dp = "fprime_gds.executables.data_products:main" fprime-gds = "fprime_gds.executables.run_deployment:main" -fprime-prm-write = "fprime_gds.common.tools.params:main" +fprime-prm-write = "fprime_gds.common.tools.params:main_encode" +fprime-prm-decode = "fprime_gds.common.tools.params:main_decode" fprime-merge-dictionary = "fprime_gds.executables.dictionary_merge:main" # For Pytest fixtures diff --git a/src/fprime_gds/common/tools/params.py b/src/fprime_gds/common/tools/params.py index be5cac33..07a5038c 100644 --- a/src/fprime_gds/common/tools/params.py +++ b/src/fprime_gds/common/tools/params.py @@ -154,7 +154,12 @@ def parse_json(param_value_json, name_dict: dict[str, PrmTemplate], include_impl return templates_to_values -def main(): +def main_encode(): + """CLI entry point for fprime-prm-write (encoding). + + Encodes parameter JSON files into binary .dat files or command sequence .seq files. + This is the inverse operation of fprime-prm-decode. + """ arg_parser = ArgumentParser() subparsers = arg_parser.add_subparsers(dest="subcmd", required=True) @@ -246,5 +251,298 @@ def convert_json(json_file: Path, dictionary: Path, output: Path, output_format: raise RuntimeError("Invalid output format " + str(output_format)) +def decode_dat_to_params(dat_bytes: bytes, id_dict: dict[int, PrmTemplate]) -> list[tuple[PrmTemplate, Any]]: + """Decode a binary .dat file into a list of (PrmTemplate, value) tuples. + + Args: + dat_bytes: The binary data from a .dat file + id_dict: Dictionary mapping parameter IDs to PrmTemplate objects + + Returns: + List of (PrmTemplate, value) tuples where value is in JSON-compatible format + + Raises: + RuntimeError: If the file format is invalid or parameters cannot be decoded + """ + params = [] + offset = 0 + + while offset < len(dat_bytes): + # Check for delimiter + if dat_bytes[offset] != 0xA5: + raise RuntimeError( + f"Invalid delimiter at offset {offset}: expected 0xA5, got {dat_bytes[offset]:#x}" + ) + offset += 1 + + # Read record size (4 bytes, big endian) + if offset + 4 > len(dat_bytes): + raise RuntimeError( + f"Incomplete record size at offset {offset}: expected 4 bytes, got {len(dat_bytes) - offset}" + ) + record_size = int.from_bytes(dat_bytes[offset:offset+4], byteorder="big") + offset += 4 + + # Read parameter ID (4 bytes, big endian) + if offset + 4 > len(dat_bytes): + raise RuntimeError( + f"Incomplete parameter ID at offset {offset}: expected 4 bytes, got {len(dat_bytes) - offset}" + ) + param_id = int.from_bytes(dat_bytes[offset:offset+4], byteorder="big") + offset += 4 + + # Look up parameter template + prm_template = id_dict.get(param_id, None) + if not prm_template: + raise RuntimeError( + f"Unknown parameter ID {param_id} (0x{param_id:x}) at offset {offset-4}" + ) + + # Calculate the value size + value_size = record_size - FW_PRM_ID_TYPE_SIZE + + # Check if we have enough data + if offset + value_size > len(dat_bytes): + raise RuntimeError( + f"Incomplete parameter value for {prm_template.get_full_name()} at offset {offset}: " + f"expected {value_size} bytes, got {len(dat_bytes) - offset}" + ) + + # Deserialize the value + prm_instance = prm_template.prm_type_obj() + try: + prm_instance.deserialize(dat_bytes, offset) + except Exception as e: + raise RuntimeError( + f"Failed to deserialize parameter {prm_template.get_full_name()} " + f"(id={param_id}, type={prm_template.prm_type_obj.__name__}): {str(e)}" + ) + + # Get the raw value - use .val for simple types + # For complex types (arrays, structs), to_jsonable() provides the correct format + if isinstance(prm_instance, (ArrayType, SerializableType)): + value = prm_instance.to_jsonable() + else: + # For simple types (string, bool, numbers, enums), use the raw value + value = prm_instance.val + + params.append((prm_template, value)) + + offset += value_size + + return params + + +def params_to_json(params: list[tuple[PrmTemplate, Any]]) -> dict: + """Convert a list of (PrmTemplate, value) tuples to JSON format. + + The output format matches the input format expected by fprime-prm-write: + { + "componentName": { + "paramName": value, + ... + }, + ... + } + + Complex types from to_jsonable() are converted to simple format that + instantiate_prm_type() expects for round-trip compatibility. + + Args: + params: List of (PrmTemplate, value) tuples + + Returns: + Dictionary in the JSON format used by fprime-prm-write + """ + def to_encoder_format(value): + """Convert to_jsonable() output to format expected by instantiate_prm_type().""" + if value is None: + return None + + # Handle lists recursively + if isinstance(value, list): + return [to_encoder_format(v) for v in value] + + # Only process dicts from here + if not isinstance(value, dict): + return value + + # Array: {"values": [...]} -> [...] + if "values" in value and isinstance(value.get("values"), list): + return [to_encoder_format(v) for v in value["values"]] + + # Any dict with "value" key (primitive wrapper or struct member) -> extract value + if "value" in value: + return to_encoder_format(value["value"]) + + # Plain dict (struct without metadata): recursively process all fields + return {k: to_encoder_format(v) for k, v in value.items()} + + result = {} + + for prm_template, value in params: + comp_name = prm_template.comp_name + prm_name = prm_template.prm_name + + # Create component entry if it doesn't exist + if comp_name not in result: + result[comp_name] = {} + + # Add parameter to component with encoder-compatible format + result[comp_name][prm_name] = to_encoder_format(value) + + return result + + +def params_to_text(params: list[tuple[PrmTemplate, Any]]) -> str: + """Convert a list of (PrmTemplate, value) tuples to human-readable text format. + + Args: + params: List of (PrmTemplate, value) tuples + + Returns: + Human-readable text string + """ + lines = [] + current_component = None + + for prm_template, value in params: + comp_name = prm_template.comp_name + prm_name = prm_template.prm_name + prm_id = prm_template.prm_id + type_name = prm_template.prm_type_obj.__name__.replace("Type", "") + + # Add component header if this is a new component + if comp_name != current_component: + if current_component is not None: + lines.append("") # Blank line between components + lines.append(f"Component: {comp_name}") + current_component = comp_name + + # Format the value + if isinstance(value, str): + value_str = f'"{value}"' + elif isinstance(value, (list, dict)): + value_str = js.dumps(value) + else: + value_str = str(value) + + lines.append(f" {prm_name} = {value_str} (type: {type_name}, id: {prm_id})") + + return "\n".join(lines) + + +def params_to_csv(params: list[tuple[PrmTemplate, Any]]) -> str: + """Convert a list of (PrmTemplate, value) tuples to CSV format. + + Args: + params: List of (PrmTemplate, value) tuples + + Returns: + CSV string with columns: Component,Parameter,Value,Type,ID + """ + lines = [] + lines.append("Component,Parameter,Value,Type,ID") + + for prm_template, value in params: + comp_name = prm_template.comp_name + prm_name = prm_template.prm_name + prm_id = prm_template.prm_id + type_name = prm_template.prm_type_obj.__name__.replace("Type", "") + + # Format the value for CSV + # For complex types (arrays, structs), convert to JSON string + if isinstance(value, (list, dict)): + value_str = js.dumps(value) + elif isinstance(value, str): + # Escape quotes in strings + value_str = value.replace('"', '""') + else: + value_str = str(value) + + # Escape any commas or quotes in the value + if ',' in value_str or '"' in value_str or '\n' in value_str: + value_str = f'"{value_str}"' + + lines.append(f"{comp_name},{prm_name},{value_str},{type_name},{prm_id}") + + return "\n".join(lines) + + +def main_decode(): + """CLI entry point for fprime-prm-decode (decoding). + + Decodes binary parameter database (.dat) files into human-readable formats. + This is the inverse operation of fprime-prm-write. + """ + arg_parser = ArgumentParser() + + arg_parser.add_argument( + "dat_file", type=Path, help="The .dat file to decode", default=None + ) + arg_parser.add_argument( + "--dictionary", + "-d", + type=Path, + help="The dictionary file of the FSW", + required=True, + ) + arg_parser.add_argument("--format", "-f", type=str, choices=["json", "text", "csv"], default="json", help="Output format (default: json)") + arg_parser.add_argument("--output", "-o", type=Path, help="The output file", default=None) + + + args = arg_parser.parse_args() + + if args.dat_file is None or not args.dat_file.exists(): + print("Unable to find", args.dat_file) + exit(1) + + if args.dat_file.is_dir(): + print("dat-file is a dir", args.dat_file) + exit(1) + + if not args.dictionary.exists(): + print("Unable to find", args.dictionary) + exit(1) + + output_format = args.format + + # determine output path + if args.output is None: + output_path = args.dat_file.with_suffix("." + output_format) + else: + output_path = args.output + + print("Decoding", args.dat_file, "to", output_path, "(format: ." + output_format + ")") + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Load dictionary + dict_parser = PrmJsonLoader(str(args.dictionary.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts( + str(args.dictionary.resolve()) + ) + + # Read and decode .dat file + dat_bytes = args.dat_file.read_bytes() + params = decode_dat_to_params(dat_bytes, id_dict) + + # Format output based on requested format + if output_format == "json": + output_data = params_to_json(params) + output_content = js.dumps(output_data, indent=4) + elif output_format == "text": + output_content = params_to_text(params) + elif output_format == "csv": + output_content = params_to_csv(params) + else: + raise RuntimeError("Invalid output format " + str(output_format)) + + # Write output + print("Done, writing to", output_path.resolve()) + output_path.write_text(output_content) + + if __name__ == "__main__": - main() + # This file was originally created to encode parameter database files + # Keep this backwards compatibility + main_encode() diff --git a/test/fprime_gds/common/tools/test_prm_decode.py b/test/fprime_gds/common/tools/test_prm_decode.py new file mode 100644 index 00000000..0f46f7e5 --- /dev/null +++ b/test/fprime_gds/common/tools/test_prm_decode.py @@ -0,0 +1,324 @@ +import json +import tempfile +from pathlib import Path +import pytest +from fprime_gds.common.tools.params import ( + convert_json, + decode_dat_to_params, + params_to_json, + params_to_text, + params_to_csv, +) +from fprime_gds.common.loaders.prm_json_loader import PrmJsonLoader + + +def test_decode_simple_paramdb(): + """Test decoding the simple_paramdb.dat file.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + dat_file = Path(__file__).parent / "expected" / "simple_paramdb.dat" + input_json_file = Path(__file__).parent / "input" / "simple_paramdb.json" + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Read and decode .dat file + dat_bytes = dat_file.read_bytes() + params = decode_dat_to_params(dat_bytes, id_dict) + + # Verify we got parameters + assert len(params) > 0, "Should have decoded at least one parameter" + + # Convert to JSON and compare with original input + decoded_json = params_to_json(params) + expected_json = json.loads(input_json_file.read_text()) + + assert decoded_json == expected_json, "Decoded JSON should match original input" + + +def test_round_trip_encode_decode(): + """Test that encoding then decoding produces the same result.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + input_json_file = Path(__file__).parent / "input" / "simple_paramdb.json" + + with tempfile.TemporaryDirectory() as temp_dir: + # Encode JSON to .dat + dat_file = Path(temp_dir) / "test.dat" + convert_json(input_json_file, dict_file, dat_file, "dat") + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Decode .dat back to JSON + dat_bytes = dat_file.read_bytes() + params = decode_dat_to_params(dat_bytes, id_dict) + decoded_json = params_to_json(params) + + # Compare with original + expected_json = json.loads(input_json_file.read_text()) + assert decoded_json == expected_json, "Round-trip should produce identical JSON" + + +def test_params_to_text_format(): + """Test that params_to_text produces readable output.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + dat_file = Path(__file__).parent / "expected" / "simple_paramdb.dat" + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Decode parameters + dat_bytes = dat_file.read_bytes() + params = decode_dat_to_params(dat_bytes, id_dict) + + # Convert to text + text_output = params_to_text(params) + + # Verify text contains expected elements + assert "Component:" in text_output, "Text output should have component headers" + assert "type:" in text_output, "Text output should include type information" + assert "id:" in text_output, "Text output should include parameter IDs" + assert len(text_output) > 0, "Text output should not be empty" + + +def test_params_to_csv_format(): + """Test that params_to_csv produces valid CSV output.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + dat_file = Path(__file__).parent / "expected" / "simple_paramdb.dat" + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Decode parameters + dat_bytes = dat_file.read_bytes() + params = decode_dat_to_params(dat_bytes, id_dict) + + # Convert to CSV + csv_output = params_to_csv(params) + + # Verify CSV structure + lines = csv_output.split("\n") + assert len(lines) >= 2, "CSV should have header and at least one data row" + assert lines[0] == "Component,Parameter,Value,Type,ID", "CSV should have correct header" + + # Check that data rows have the right number of columns + for line in lines[1:]: + if line: # Skip empty lines + # Count commas, accounting for quoted values + # Simple check: should have at least 4 commas for 5 columns + assert "," in line, "CSV data rows should have comma separators" + + +def test_decode_invalid_delimiter(): + """Test that decoding fails with invalid delimiter.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Create invalid data with wrong delimiter + invalid_data = b"\xFF\x00\x00\x00\x12\x00\x00\x11\x01test" + + with pytest.raises(RuntimeError, match="Invalid delimiter"): + decode_dat_to_params(invalid_data, id_dict) + + +def test_decode_unknown_param_id(): + """Test that decoding fails with unknown parameter ID.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Create data with unknown parameter ID (0xFFFFFFFF) + invalid_data = b"\xA5\x00\x00\x00\x08\xFF\xFF\xFF\xFF\x00\x00\x00\x00" + + with pytest.raises(RuntimeError, match="Unknown parameter ID"): + decode_dat_to_params(invalid_data, id_dict) + + +def test_decode_incomplete_data(): + """Test that decoding fails with incomplete data.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Create incomplete data (delimiter and partial record size) + incomplete_data = b"\xA5\x00\x00" + + with pytest.raises(RuntimeError, match="Incomplete"): + decode_dat_to_params(incomplete_data, id_dict) + + +def test_params_to_json_multiple_components(): + """Test that params_to_json handles multiple components correctly.""" + from fprime_gds.common.templates.prm_template import PrmTemplate + from fprime_gds.common.models.serialize.numerical_types import U32Type + + # Create mock parameters from different components + template1 = PrmTemplate(1, "param1", "comp1", U32Type, None) + template2 = PrmTemplate(2, "param2", "comp1", U32Type, None) + template3 = PrmTemplate(3, "param3", "comp2", U32Type, None) + + params = [ + (template1, 100), + (template2, 200), + (template3, 300), + ] + + result = params_to_json(params) + + # Verify structure + assert "comp1" in result, "Should have comp1" + assert "comp2" in result, "Should have comp2" + assert result["comp1"]["param1"] == 100 + assert result["comp1"]["param2"] == 200 + assert result["comp2"]["param3"] == 300 + + +def test_decode_empty_file(): + """Test that decoding an empty file returns empty list.""" + dict_file = Path(__file__).parent / "resources" / "simple_dictionary.json" + + # Load dictionary + dict_parser = PrmJsonLoader(str(dict_file.resolve())) + id_dict, name_dict, versions = dict_parser.construct_dicts(str(dict_file.resolve())) + + # Decode empty data + empty_data = b"" + params = decode_dat_to_params(empty_data, id_dict) + + assert len(params) == 0, "Empty file should decode to empty list" + + +def test_encoder_format_conversion_array(): + """Test converting array to_jsonable format to encoder format.""" + from fprime_gds.common.templates.prm_template import PrmTemplate + from fprime_gds.common.models.serialize.numerical_types import U32Type + + # Simulate array to_jsonable() output + template = PrmTemplate(1, "arrayParam", "comp1", U32Type, None) + array_value = { + "name": "Array_U32_3", + "type": "Array_U32_3", + "size": 3, + "values": [ + {"value": 10, "type": "U32"}, + {"value": 20, "type": "U32"}, + {"value": 30, "type": "U32"} + ] + } + + params = [(template, array_value)] + result = params_to_json(params) + + # Should convert to simple list format + assert result == {"comp1": {"arrayParam": [10, 20, 30]}} + + +def test_encoder_format_conversion_struct(): + """Test converting struct to_jsonable format to encoder format.""" + from fprime_gds.common.templates.prm_template import PrmTemplate + from fprime_gds.common.models.serialize.numerical_types import U32Type + + # Simulate struct to_jsonable() output + template = PrmTemplate(1, "structParam", "comp1", U32Type, None) + struct_value = { + "x": {"value": 1.0, "format": "{f}", "description": "X component"}, + "y": {"value": 2.0, "format": "{f}", "description": "Y component"}, + "z": {"value": 3.0, "format": "{f}", "description": "Z component"} + } + + params = [(template, struct_value)] + result = params_to_json(params) + + # Should convert to simple dict format + assert result == {"comp1": {"structParam": {"x": 1.0, "y": 2.0, "z": 3.0}}} + + +def test_encoder_format_conversion_primitive(): + """Test converting primitive wrapper to encoder format.""" + from fprime_gds.common.templates.prm_template import PrmTemplate + from fprime_gds.common.models.serialize.numerical_types import U32Type + + # Simulate primitive to_jsonable() output + template = PrmTemplate(1, "intParam", "comp1", U32Type, None) + primitive_value = {"value": 42, "type": "U32"} + + params = [(template, primitive_value)] + result = params_to_json(params) + + # Should extract just the value + assert result == {"comp1": {"intParam": 42}} + + +def test_encoder_format_conversion_passthrough(): + """Test that simple values pass through unchanged.""" + from fprime_gds.common.templates.prm_template import PrmTemplate + from fprime_gds.common.models.serialize.numerical_types import U32Type + + # Simple values should pass through unchanged + template1 = PrmTemplate(1, "numParam", "comp1", U32Type, None) + template2 = PrmTemplate(2, "strParam", "comp1", U32Type, None) + template3 = PrmTemplate(3, "listParam", "comp1", U32Type, None) + + params = [ + (template1, 123), + (template2, "test"), + (template3, [1, 2, 3]) + ] + result = params_to_json(params) + + assert result == { + "comp1": { + "numParam": 123, + "strParam": "test", + "listParam": [1, 2, 3] + } + } + + +def test_encoder_format_nested_structures(): + """Test converting nested structures (array of structs).""" + from fprime_gds.common.templates.prm_template import PrmTemplate + from fprime_gds.common.models.serialize.numerical_types import U32Type + + # Array of structs + template = PrmTemplate(1, "nestedParam", "comp1", U32Type, None) + nested_value = { + "name": "Array_Vector3_2", + "type": "Array_Vector3_2", + "size": 2, + "values": [ + { + "x": {"value": 1.0, "format": "{f}", "description": "X"}, + "y": {"value": 2.0, "format": "{f}", "description": "Y"}, + "z": {"value": 3.0, "format": "{f}", "description": "Z"} + }, + { + "x": {"value": 4.0, "format": "{f}", "description": "X"}, + "y": {"value": 5.0, "format": "{f}", "description": "Y"}, + "z": {"value": 6.0, "format": "{f}", "description": "Z"} + } + ] + } + + params = [(template, nested_value)] + result = params_to_json(params) + + # Should convert to nested simple format + assert result == { + "comp1": { + "nestedParam": [ + {"x": 1.0, "y": 2.0, "z": 3.0}, + {"x": 4.0, "y": 5.0, "z": 6.0} + ] + } + }