@@ -154,7 +154,12 @@ def parse_json(param_value_json, name_dict: dict[str, PrmTemplate], include_impl
154154 return templates_to_values
155155
156156
157- def main ():
157+ def main_encode ():
158+ """CLI entry point for fprime-prm-write (encoding).
159+
160+ Encodes parameter JSON files into binary .dat files or command sequence .seq files.
161+ This is the inverse operation of fprime-prm-decode.
162+ """
158163 arg_parser = ArgumentParser ()
159164 subparsers = arg_parser .add_subparsers (dest = "subcmd" , required = True )
160165
@@ -246,5 +251,298 @@ def convert_json(json_file: Path, dictionary: Path, output: Path, output_format:
246251 raise RuntimeError ("Invalid output format " + str (output_format ))
247252
248253
254+ def decode_dat_to_params (dat_bytes : bytes , id_dict : dict [int , PrmTemplate ]) -> list [tuple [PrmTemplate , Any ]]:
255+ """Decode a binary .dat file into a list of (PrmTemplate, value) tuples.
256+
257+ Args:
258+ dat_bytes: The binary data from a .dat file
259+ id_dict: Dictionary mapping parameter IDs to PrmTemplate objects
260+
261+ Returns:
262+ List of (PrmTemplate, value) tuples where value is in JSON-compatible format
263+
264+ Raises:
265+ RuntimeError: If the file format is invalid or parameters cannot be decoded
266+ """
267+ params = []
268+ offset = 0
269+
270+ while offset < len (dat_bytes ):
271+ # Check for delimiter
272+ if dat_bytes [offset ] != 0xA5 :
273+ raise RuntimeError (
274+ f"Invalid delimiter at offset { offset } : expected 0xA5, got { dat_bytes [offset ]:#x} "
275+ )
276+ offset += 1
277+
278+ # Read record size (4 bytes, big endian)
279+ if offset + 4 > len (dat_bytes ):
280+ raise RuntimeError (
281+ f"Incomplete record size at offset { offset } : expected 4 bytes, got { len (dat_bytes ) - offset } "
282+ )
283+ record_size = int .from_bytes (dat_bytes [offset :offset + 4 ], byteorder = "big" )
284+ offset += 4
285+
286+ # Read parameter ID (4 bytes, big endian)
287+ if offset + 4 > len (dat_bytes ):
288+ raise RuntimeError (
289+ f"Incomplete parameter ID at offset { offset } : expected 4 bytes, got { len (dat_bytes ) - offset } "
290+ )
291+ param_id = int .from_bytes (dat_bytes [offset :offset + 4 ], byteorder = "big" )
292+ offset += 4
293+
294+ # Look up parameter template
295+ prm_template = id_dict .get (param_id , None )
296+ if not prm_template :
297+ raise RuntimeError (
298+ f"Unknown parameter ID { param_id } (0x{ param_id :x} ) at offset { offset - 4 } "
299+ )
300+
301+ # Calculate the value size
302+ value_size = record_size - FW_PRM_ID_TYPE_SIZE
303+
304+ # Check if we have enough data
305+ if offset + value_size > len (dat_bytes ):
306+ raise RuntimeError (
307+ f"Incomplete parameter value for { prm_template .get_full_name ()} at offset { offset } : "
308+ f"expected { value_size } bytes, got { len (dat_bytes ) - offset } "
309+ )
310+
311+ # Deserialize the value
312+ prm_instance = prm_template .prm_type_obj ()
313+ try :
314+ prm_instance .deserialize (dat_bytes , offset )
315+ except Exception as e :
316+ raise RuntimeError (
317+ f"Failed to deserialize parameter { prm_template .get_full_name ()} "
318+ f"(id={ param_id } , type={ prm_template .prm_type_obj .__name__ } ): { str (e )} "
319+ )
320+
321+ # Get the raw value - use .val for simple types
322+ # For complex types (arrays, structs), to_jsonable() provides the correct format
323+ if isinstance (prm_instance , (ArrayType , SerializableType )):
324+ value = prm_instance .to_jsonable ()
325+ else :
326+ # For simple types (string, bool, numbers, enums), use the raw value
327+ value = prm_instance .val
328+
329+ params .append ((prm_template , value ))
330+
331+ offset += value_size
332+
333+ return params
334+
335+
336+ def params_to_json (params : list [tuple [PrmTemplate , Any ]]) -> dict :
337+ """Convert a list of (PrmTemplate, value) tuples to JSON format.
338+
339+ The output format matches the input format expected by fprime-prm-write:
340+ {
341+ "componentName": {
342+ "paramName": value,
343+ ...
344+ },
345+ ...
346+ }
347+
348+ Complex types from to_jsonable() are converted to simple format that
349+ instantiate_prm_type() expects for round-trip compatibility.
350+
351+ Args:
352+ params: List of (PrmTemplate, value) tuples
353+
354+ Returns:
355+ Dictionary in the JSON format used by fprime-prm-write
356+ """
357+ def to_encoder_format (value ):
358+ """Convert to_jsonable() output to format expected by instantiate_prm_type()."""
359+ if value is None :
360+ return None
361+
362+ # Handle lists recursively
363+ if isinstance (value , list ):
364+ return [to_encoder_format (v ) for v in value ]
365+
366+ # Only process dicts from here
367+ if not isinstance (value , dict ):
368+ return value
369+
370+ # Array: {"values": [...]} -> [...]
371+ if "values" in value and isinstance (value .get ("values" ), list ):
372+ return [to_encoder_format (v ) for v in value ["values" ]]
373+
374+ # Any dict with "value" key (primitive wrapper or struct member) -> extract value
375+ if "value" in value :
376+ return to_encoder_format (value ["value" ])
377+
378+ # Plain dict (struct without metadata): recursively process all fields
379+ return {k : to_encoder_format (v ) for k , v in value .items ()}
380+
381+ result = {}
382+
383+ for prm_template , value in params :
384+ comp_name = prm_template .comp_name
385+ prm_name = prm_template .prm_name
386+
387+ # Create component entry if it doesn't exist
388+ if comp_name not in result :
389+ result [comp_name ] = {}
390+
391+ # Add parameter to component with encoder-compatible format
392+ result [comp_name ][prm_name ] = to_encoder_format (value )
393+
394+ return result
395+
396+
397+ def params_to_text (params : list [tuple [PrmTemplate , Any ]]) -> str :
398+ """Convert a list of (PrmTemplate, value) tuples to human-readable text format.
399+
400+ Args:
401+ params: List of (PrmTemplate, value) tuples
402+
403+ Returns:
404+ Human-readable text string
405+ """
406+ lines = []
407+ current_component = None
408+
409+ for prm_template , value in params :
410+ comp_name = prm_template .comp_name
411+ prm_name = prm_template .prm_name
412+ prm_id = prm_template .prm_id
413+ type_name = prm_template .prm_type_obj .__name__ .replace ("Type" , "" )
414+
415+ # Add component header if this is a new component
416+ if comp_name != current_component :
417+ if current_component is not None :
418+ lines .append ("" ) # Blank line between components
419+ lines .append (f"Component: { comp_name } " )
420+ current_component = comp_name
421+
422+ # Format the value
423+ if isinstance (value , str ):
424+ value_str = f'"{ value } "'
425+ elif isinstance (value , (list , dict )):
426+ value_str = js .dumps (value )
427+ else :
428+ value_str = str (value )
429+
430+ lines .append (f" { prm_name } = { value_str } (type: { type_name } , id: { prm_id } )" )
431+
432+ return "\n " .join (lines )
433+
434+
435+ def params_to_csv (params : list [tuple [PrmTemplate , Any ]]) -> str :
436+ """Convert a list of (PrmTemplate, value) tuples to CSV format.
437+
438+ Args:
439+ params: List of (PrmTemplate, value) tuples
440+
441+ Returns:
442+ CSV string with columns: Component,Parameter,Value,Type,ID
443+ """
444+ lines = []
445+ lines .append ("Component,Parameter,Value,Type,ID" )
446+
447+ for prm_template , value in params :
448+ comp_name = prm_template .comp_name
449+ prm_name = prm_template .prm_name
450+ prm_id = prm_template .prm_id
451+ type_name = prm_template .prm_type_obj .__name__ .replace ("Type" , "" )
452+
453+ # Format the value for CSV
454+ # For complex types (arrays, structs), convert to JSON string
455+ if isinstance (value , (list , dict )):
456+ value_str = js .dumps (value )
457+ elif isinstance (value , str ):
458+ # Escape quotes in strings
459+ value_str = value .replace ('"' , '""' )
460+ else :
461+ value_str = str (value )
462+
463+ # Escape any commas or quotes in the value
464+ if ',' in value_str or '"' in value_str or '\n ' in value_str :
465+ value_str = f'"{ value_str } "'
466+
467+ lines .append (f"{ comp_name } ,{ prm_name } ,{ value_str } ,{ type_name } ,{ prm_id } " )
468+
469+ return "\n " .join (lines )
470+
471+
472+ def main_decode ():
473+ """CLI entry point for fprime-prm-decode (decoding).
474+
475+ Decodes binary parameter database (.dat) files into human-readable formats.
476+ This is the inverse operation of fprime-prm-write.
477+ """
478+ arg_parser = ArgumentParser ()
479+
480+ arg_parser .add_argument (
481+ "dat_file" , type = Path , help = "The .dat file to decode" , default = None
482+ )
483+ arg_parser .add_argument (
484+ "--dictionary" ,
485+ "-d" ,
486+ type = Path ,
487+ help = "The dictionary file of the FSW" ,
488+ required = True ,
489+ )
490+ arg_parser .add_argument ("--format" , "-f" , type = str , choices = ["json" , "text" , "csv" ], default = "json" , help = "Output format (default: json)" )
491+ arg_parser .add_argument ("--output" , "-o" , type = Path , help = "The output file" , default = None )
492+
493+
494+ args = arg_parser .parse_args ()
495+
496+ if args .dat_file is None or not args .dat_file .exists ():
497+ print ("Unable to find" , args .dat_file )
498+ exit (1 )
499+
500+ if args .dat_file .is_dir ():
501+ print ("dat-file is a dir" , args .dat_file )
502+ exit (1 )
503+
504+ if not args .dictionary .exists ():
505+ print ("Unable to find" , args .dictionary )
506+ exit (1 )
507+
508+ output_format = args .format
509+
510+ # determine output path
511+ if args .output is None :
512+ output_path = args .dat_file .with_suffix ("." + output_format )
513+ else :
514+ output_path = args .output
515+
516+ print ("Decoding" , args .dat_file , "to" , output_path , "(format: ." + output_format + ")" )
517+ output_path .parent .mkdir (parents = True , exist_ok = True )
518+
519+ # Load dictionary
520+ dict_parser = PrmJsonLoader (str (args .dictionary .resolve ()))
521+ id_dict , name_dict , versions = dict_parser .construct_dicts (
522+ str (args .dictionary .resolve ())
523+ )
524+
525+ # Read and decode .dat file
526+ dat_bytes = args .dat_file .read_bytes ()
527+ params = decode_dat_to_params (dat_bytes , id_dict )
528+
529+ # Format output based on requested format
530+ if output_format == "json" :
531+ output_data = params_to_json (params )
532+ output_content = js .dumps (output_data , indent = 4 )
533+ elif output_format == "text" :
534+ output_content = params_to_text (params )
535+ elif output_format == "csv" :
536+ output_content = params_to_csv (params )
537+ else :
538+ raise RuntimeError ("Invalid output format " + str (output_format ))
539+
540+ # Write output
541+ print ("Done, writing to" , output_path .resolve ())
542+ output_path .write_text (output_content )
543+
544+
249545if __name__ == "__main__" :
250- main ()
546+ # This file was originally created to encode parameter database files
547+ # Keep this backwards compatibility
548+ main_encode ()
0 commit comments