diff --git a/logger/transforms/convert_fields_transform.py b/logger/transforms/convert_fields_transform.py index 66f51640..898bb962 100644 --- a/logger/transforms/convert_fields_transform.py +++ b/logger/transforms/convert_fields_transform.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 -"""Convert fields in a DASRecord to specified types. +"""Transform to convert fields in a DASRecord to specified types. + +This is a thin wrapper around the convert_fields utility function, +providing the Transform interface for use in listener pipelines. """ import copy @@ -7,11 +10,11 @@ import sys from typing import Union -# Ensure we can import the necessary classes from os.path import dirname, realpath sys.path.append(dirname(dirname(dirname(realpath(__file__))))) from logger.utils.das_record import DASRecord # noqa: E402 +from logger.utils.convert_fields import convert_fields # noqa: E402 from logger.transforms.transform import Transform # noqa: E402 @@ -23,98 +26,59 @@ class ConvertFieldsTransform(Transform): conversions (e.g., combining a value and a cardinal direction field). """ - def __init__(self, fields=None, lat_lon_fields=None, delete_source_fields=False, + def __init__(self, fields=None, delete_source_fields=False, delete_unconverted_fields=False, **kwargs): """ Args: fields (dict): A dictionary mapping field names to their target configuration. This accepts two formats for the value: - 1. A dictionary containing metadata (preferred). The key - 'data_type' is used for conversion; others are ignored. + 1. A dictionary containing metadata (preferred). + keys: + 'data_type': target type (float, int, str, bool, hex, + nmea_lat, nmea_lon) + 'direction_field': (for nmea_*) name of direction field Example: - {'heave': {'data_type': 'float', 'units': 'm'}} + {'Latitude': {'data_type': 'nmea_lat', + 'direction_field': 'NorS'}} 2. A simple string specifying the data type (backward compatibility). 
Example: {'heave': 'float'} - Supported types include: - - float, double - - int, short, ushort, uint, long, ubyte, byte, hex - - str, char, string, text - - bool, boolean - - lat_lon_fields (dict): A dictionary mapping a new target field name to a tuple - or list of (value_field, direction_field). - Example: - {'latitude': ('raw_lat', 'lat_dir'), - 'longitude': ('raw_lon', 'lon_dir')} - - delete_source_fields (bool): If True, the original source fields used for - conversion (like 'raw_lat' and 'lat_dir') + delete_source_fields (bool): If True, source fields (e.g. 'raw_lat', 'lat_dir') are removed after successful conversion. Defaults to False. - delete_unconverted_fields (bool): If True, any field in the record that was - NOT involved in a conversion (either as a - source or a destination) will be removed. - Defaults to False. + delete_unconverted_fields (bool): If True, fields NOT involved in conversion + are removed. Defaults to False. """ super().__init__(**kwargs) # processes 'quiet' and type hints - self.fields = fields or {} - self.lat_lon_fields = lat_lon_fields or {} + self.field_specs = {} + self.lat_lon_specs = {} self.delete_source_fields = delete_source_fields self.delete_unconverted_fields = delete_unconverted_fields - # Map string type names to actual python types/conversion functions. - # Recognized types include: - # float, double -> float - # int, short, ushort, uint, long, ubyte, byte, hex -> int - # str, char, string, text -> str - # bool, boolean -> bool - self.type_map = { - 'float': float, - 'double': float, - 'int': int, - 'short': int, - 'ushort': int, - 'uint': int, - 'long': int, - 'ubyte': int, - 'byte': int, - 'str': str, - 'char': str, - 'string': str, - 'text': str, - 'bool': bool, - 'boolean': bool, - 'hex': lambda x: int(str(x), 16), # Handles "1A", "0x1A", etc. 
- } - - ############################ - def _convert_lat_lon(self, value, direction): - """ - Helper to convert NMEA style lat/lon (DDMM.MMMM) and direction (N/S/E/W) - to decimal degrees, rounded to fixed number of decimal places. - """ - ROUNDING_DECIMALS = 5 - try: - val = float(value) - # NMEA format is roughly DDMM.MMMM - # Degrees is the integer part of val / 100 - degrees = int(val / 100) - minutes = val - (degrees * 100) - decimal = degrees + (minutes / 60) - - if direction.upper() in ['S', 'W']: - decimal = -decimal - - # Truncate/Round - return round(decimal, ROUNDING_DECIMALS) - except (ValueError, TypeError, AttributeError) as e: - logging.warning(f'Failed to convert lat/lon: value=\'{value}\', ' - f'direction=\'{direction}\' - {e}') - return None + if fields: + # Process fields to separate standard conversions from special NMEA ones + for f_name, f_def in fields.items(): + # Normalize definition + if isinstance(f_def, str): + f_def = {'data_type': f_def} + + dtype = f_def.get('data_type') + + # Check for declarative NMEA configuration + if dtype in ['nmea_lat', 'nmea_lon']: + dir_field = f_def.get('direction_field') + if dir_field: + # Format: target_field -> (value_field, direction_field) + self.lat_lon_specs[f_name] = (f_name, dir_field) + else: + logging.warning(f"Field '{f_name}' has type '{dtype}' but " + "missing 'direction_field'. Ignoring.") + else: + # Standard field + self.field_specs[f_name] = f_def ############################ def transform(self, record: Union[str, dict, DASRecord])\ @@ -151,80 +115,18 @@ def transform(self, record: Union[str, dict, DASRecord])\ type(new_record)) return None - # Track which fields were successfully converted or used - processed_fields = set() - - # 1. 
Handle simple Type Conversions - for field_name, field_def in self.fields.items(): - if field_name in fields: - # Extract target type from dict, or fallback if string - target_type_str = None - if isinstance(field_def, dict): - target_type_str = field_def.get('data_type') - elif isinstance(field_def, str): - target_type_str = field_def - - if not target_type_str: - continue - - val = fields[field_name] - try: - converter = self.type_map.get(target_type_str) - if converter: - # Special case to head off ValueError of int("123.0") - if isinstance(val, str) and converter is int: - try: - val = float(val) - except ValueError: - # Not a float string, let int() call below handle/fail it naturally - pass - - fields[field_name] = converter(val) - processed_fields.add(field_name) - else: - logging.warning(f'Unknown type \'{target_type_str}\' ' - f'requested for field \'{field_name}\'') - except ValueError: - logging.warning(f'Failed to convert field \'{field_name}\': ' - f'value=\'{val}\' (type={type(val).__name__}) ' - f'to target_type=\'{target_type_str}\'') - - # 2. Handle Lat/Lon Conversions - for target_field, (val_field, dir_field) in self.lat_lon_fields.items(): - if val_field in fields and dir_field in fields: - val = fields[val_field] - direction = fields[dir_field] - - decimal_degrees = self._convert_lat_lon(val, direction) - - if decimal_degrees is not None: - fields[target_field] = decimal_degrees - processed_fields.add(target_field) - - # If we are successfully converting, we mark sources for potential deletion - if self.delete_source_fields: - # Mark them processed so they don't get deleted by unconverted check - processed_fields.add(val_field) - processed_fields.add(dir_field) - - # Actually delete them now if configured, BUT... - # Do NOT delete if the source field is the same as the target field! 
- if val_field in fields and val_field != target_field: - del fields[val_field] - if dir_field in fields and dir_field != target_field: - del fields[dir_field] - - # 3. Clean up unconverted fields - if self.delete_unconverted_fields: - # We must create a list of keys since we are modifying the dict - all_fields = list(fields.keys()) - for f in all_fields: - if f not in processed_fields: - del fields[f] - - # If no fields remain, return None? Or empty record? - # SelectFieldsTransform returns None if no fields left. - if not fields: + # Delegate to the utility function + result = convert_fields( + fields, + self.field_specs, + self.lat_lon_specs, + delete_source_fields=self.delete_source_fields, + delete_unconverted_fields=self.delete_unconverted_fields, + quiet=self.quiet + ) + + # If no fields remain, return None + if result is None: return None return new_record diff --git a/logger/transforms/regex_parse_transform.py b/logger/transforms/regex_parse_transform.py new file mode 100644 index 00000000..97b5ec11 --- /dev/null +++ b/logger/transforms/regex_parse_transform.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +""" +RegexParseTransform - a thin wrapper around RegexParser. + +This transform parses text records into DASRecord objects using regular +expressions. It delegates all parsing, device-aware processing, and metadata +injection to the underlying RegexParser. + +The architecture parallels ParseTransform, which wraps RecordParser. 
+"""
+
+import sys
+from typing import Union, Dict, List
+
+from os.path import dirname, realpath
+
+sys.path.append(dirname(dirname(dirname(realpath(__file__)))))
+
+from logger.utils.das_record import DASRecord  # noqa: E402
+from logger.transforms.transform import Transform  # noqa: E402
+from logger.utils import regex_parser  # noqa: E402
+
+# Optional: for generic field conversion via 'fields' parameter
+try:
+    from logger.transforms.convert_fields_transform import ConvertFieldsTransform
+except ImportError:
+    ConvertFieldsTransform = None
+
+
+class RegexParseTransform(Transform):
+    r"""
+    Parses a string record into a DASRecord using regular expressions,
+    with optional field type conversion.
+
+    This is a thin wrapper around RegexParser. All parsing logic, device-aware
+    field processing, and metadata injection are handled by the parser.
+
+    **Example Configuration:**
+
+    .. code-block:: yaml
+
+        - class: RegexParseTransform
+          module: logger.transforms.regex_parse_transform
+          kwargs:
+            data_id: gnsspo112593  # Overrides or fills in data_id
+            field_patterns:
+              GPZDA: '^\WGPZDA,(?P<GPSTime>\d+\.\d+),...'
+              GPGGA: '^\WGPGGA,(?P<GPSTime>\d+\.\d+),...'
+
+    Or using device definitions:
+
+    .. code-block:: yaml
+
+        - class: RegexParseTransform
+          module: logger.transforms.regex_parse_transform
+          kwargs:
+            definition_path: 'local/devices/*.yaml'
+            metadata_interval: 60
+    """
+
+    def __init__(self,
+                 # --- Parsing Arguments (passed to RegexParser) ---
+                 record_format: str = None,
+                 field_patterns: Union[List, Dict] = None,
+                 data_id: str = None,
+                 definition_path: str = None,
+                 metadata: Dict = None,
+                 metadata_interval: float = None,
+                 # --- Additional Transform Arguments ---
+                 fields: Dict = None,
+                 delete_source_fields: bool = False,
+                 delete_unconverted_fields: bool = False,
+                 **kwargs):
+        """
+        Args:
+            record_format (str): A regex string to match the record envelope
+                (timestamp, data_id). Defaults to regex_parser.DEFAULT_RECORD_FORMAT.
+ + field_patterns (list/dict): + - A list of regex patterns to match the field body. + - A dict of {message_type: pattern}. + If None, patterns are loaded from definition_path. + + data_id (str): If specified, this string is used as the data_id + for all records, overriding any data_id extracted from the + source record. + + definition_path (str): Wildcarded path matching YAML definitions + for devices. Used only if 'field_patterns' is None. + Defaults to regex_parser.DEFAULT_DEFINITION_PATH. + + metadata (dict): If field_patterns is not None, the metadata to + send along with data records. + + metadata_interval (float): If not None, include the description, + units and other metadata pertaining to each field in the + returned record if those data haven't been returned in the + last metadata_interval seconds. + + fields (dict): Mapping of field names to target types + (e.g., {'temp': 'float'}). If provided, ConvertFieldsTransform + is applied after parsing. + + delete_source_fields (bool): Remove original fields after conversion. + + delete_unconverted_fields (bool): Remove fields that were not converted. 
+ """ + super().__init__(**kwargs) # processes 'quiet' and type hints + + # Create the parser with all configuration + self.parser = regex_parser.RegexParser( + record_format=record_format, + field_patterns=field_patterns, + data_id=data_id, + definition_path=definition_path, + metadata=metadata, + metadata_interval=metadata_interval, + quiet=self.quiet + ) + + # Optional generic field converter (for 'fields' parameter) + self.converter = None + if fields and ConvertFieldsTransform: + self.converter = ConvertFieldsTransform( + fields=fields, + delete_source_fields=delete_source_fields, + delete_unconverted_fields=delete_unconverted_fields, + quiet=self.quiet + ) + + ############################ + def transform(self, record: str) -> Union[DASRecord, List[DASRecord], None]: + """Parse record and return DASRecord.""" + # See if it's something we can process, and if not, try digesting + if not self.can_process_record(record): # inherited from BaseModule + return self.digest_record(record) # inherited from BaseModule + + # Delegate to parser + parsed_record = self.parser.parse_record(record) + + if not parsed_record: + return None + + # Apply optional generic converter + if self.converter: + parsed_record = self.converter.transform(parsed_record) + + return parsed_record + + +# Alias for backward compatibility +RegexTransform = RegexParseTransform diff --git a/logger/utils/convert_fields.py b/logger/utils/convert_fields.py new file mode 100644 index 00000000..896995a2 --- /dev/null +++ b/logger/utils/convert_fields.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +"""Utilities for converting field values to specified types.""" + +import logging + +# Map string type names to actual python types/conversion functions. 
+# Recognized types include: +# float, double -> float +# int, short, ushort, uint, long, ubyte, byte, hex_int -> int +# str, char, string, text -> str +# bool, boolean -> bool +TYPE_MAP = { + 'float': float, + 'double': float, + 'int': int, + 'short': int, + 'ushort': int, + 'uint': int, + 'long': int, + 'ubyte': int, + 'byte': int, + 'str': str, + 'char': str, + 'string': str, + 'text': str, + 'bool': bool, + 'boolean': bool, + 'hex_int': lambda x: int(str(x), 16), # Handles "1A", "0x1A", etc. +} + + +def convert_lat_lon(value, direction, rounding_decimals=5): + """ + Convert NMEA style lat/lon (DDMM.MMMM) and direction (N/S/E/W) + to decimal degrees. + + Args: + value: NMEA format value (e.g., "4807.038" for 48°07.038') + direction: Cardinal direction ('N', 'S', 'E', 'W') + rounding_decimals: Number of decimal places to round to + + Returns: + Decimal degrees as float, or None if conversion fails + """ + try: + val = float(value) + # NMEA format is roughly DDMM.MMMM + # Degrees is the integer part of val / 100 + degrees = int(val / 100) + minutes = val - (degrees * 100) + decimal = degrees + (minutes / 60) + + if direction.upper() in ['S', 'W']: + decimal = -decimal + + return round(decimal, rounding_decimals) + except (ValueError, TypeError, AttributeError) as e: + logging.warning(f'Failed to convert lat/lon: value=\'{value}\', ' + f'direction=\'{direction}\' - {e}') + return None + + +def convert_field_value(value, target_type_str, quiet=False): + """ + Convert a single field value to the specified type. 
+ + Args: + value: The value to convert + target_type_str: String name of target type (e.g., 'float', 'int', 'str') + quiet: If True, suppress warning messages + + Returns: + Converted value, or original value if conversion fails + """ + converter = TYPE_MAP.get(target_type_str) + if not converter: + if not quiet: + logging.warning(f'Unknown type \'{target_type_str}\' requested') + return value + + try: + # Special case to head off ValueError of int("123.0") + if isinstance(value, str) and converter is int: + try: + value = float(value) + except ValueError: + # Not a float string, let int() call below handle/fail it naturally + pass + + return converter(value) + except ValueError: + if not quiet: + logging.warning(f'Failed to convert value \'{value}\' ' + f'(type={type(value).__name__}) to \'{target_type_str}\'') + return value + + +def convert_fields(fields, field_specs, lat_lon_specs=None, + delete_source_fields=False, delete_unconverted_fields=False, + quiet=False): + """ + Convert fields in a dictionary according to specifications. + + This is a shared utility used by ConvertFieldsTransform and can be used + directly by parsers for field conversion. + + Args: + fields: Dict of field_name -> value (modified in place) + field_specs: Dict of field_name -> target_type or {data_type: target_type} + lat_lon_specs: Dict of target_field -> (value_field, direction_field) + for NMEA lat/lon conversion + delete_source_fields: If True, remove source fields after lat/lon conversion + delete_unconverted_fields: If True, remove fields not involved in conversion + quiet: If True, suppress warning messages + + Returns: + The modified fields dict, or None if no fields remain after processing + """ + if not fields: + return None + + # Track which fields were successfully converted or used + processed_fields = set() + + # 1. 
Handle simple Type Conversions + if field_specs: + for field_name, field_def in field_specs.items(): + if field_name not in fields: + continue + + # Extract target type from dict, or use string directly + if isinstance(field_def, dict): + target_type_str = field_def.get('data_type') + elif isinstance(field_def, str): + target_type_str = field_def + else: + continue + + if not target_type_str: + continue + + val = fields[field_name] + converter = TYPE_MAP.get(target_type_str) + if converter: + try: + # Special case to head off ValueError of int("123.0") + if isinstance(val, str) and converter is int: + try: + val = float(val) + except ValueError: + pass + + fields[field_name] = converter(val) + processed_fields.add(field_name) + except ValueError: + if not quiet: + logging.warning(f'Failed to convert field \'{field_name}\': ' + f'value=\'{val}\' (type={type(val).__name__}) ' + f'to target_type=\'{target_type_str}\'') + else: + if not quiet: + logging.warning(f'Unknown type \'{target_type_str}\' ' + f'requested for field \'{field_name}\'') + + # 2. Handle Lat/Lon Conversions + if lat_lon_specs: + for target_field, (val_field, dir_field) in lat_lon_specs.items(): + if val_field not in fields or dir_field not in fields: + continue + + val = fields[val_field] + direction = fields[dir_field] + + decimal_degrees = convert_lat_lon(val, direction) + + if decimal_degrees is not None: + fields[target_field] = decimal_degrees + processed_fields.add(target_field) + + # Mark source fields as processed + if delete_source_fields: + processed_fields.add(val_field) + processed_fields.add(dir_field) + + # Delete source fields, but NOT if source == target + if val_field in fields and val_field != target_field: + del fields[val_field] + if dir_field in fields and dir_field != target_field: + del fields[dir_field] + + # 3. 
Clean up unconverted fields + if delete_unconverted_fields: + all_fields = list(fields.keys()) + for f in all_fields: + if f not in processed_fields: + del fields[f] + + # If no fields remain, return None + if not fields: + return None + + return fields diff --git a/logger/utils/das_record.py b/logger/utils/das_record.py index 206e1ac6..9492ae81 100644 --- a/logger/utils/das_record.py +++ b/logger/utils/das_record.py @@ -167,3 +167,37 @@ def to_das_record_list(record, data_id=None): logging.error('Badly-structured field dictionary: %s: %s', field, pprint.pformat(ts_value_list)) return [] + + +def collect_metadata_for_fields(field_names, timestamp, metadata, + metadata_interval, metadata_last_sent): + """ + Collect metadata for fields that are due to be sent based on the interval. + + This function is shared between RecordParser and RegexParser to avoid + code duplication. + + Args: + field_names: Iterable of field names to check for metadata. + timestamp: Current record timestamp (numeric). + metadata: Dict mapping field names to their metadata dicts. + metadata_interval: Minimum seconds between metadata sends per field. + metadata_last_sent: Dict tracking last send time per field (modified in place). + + Returns: + Dict with 'fields' key containing metadata to inject, or None if no + metadata is due to be sent. 
+ """ + if not metadata or not metadata_interval: + return None + + metadata_fields = {} + for field_name in field_names: + last_sent = metadata_last_sent.get(field_name, 0) + if timestamp - last_sent > metadata_interval: + field_metadata = metadata.get(field_name) + if field_metadata: + metadata_fields[field_name] = field_metadata + metadata_last_sent[field_name] = timestamp + + return {'fields': metadata_fields} if metadata_fields else None diff --git a/logger/utils/read_config.py b/logger/utils/read_config.py index 8b6db0e8..38d4b7bc 100755 --- a/logger/utils/read_config.py +++ b/logger/utils/read_config.py @@ -163,6 +163,8 @@ def expand_includes(input_dict: dict) -> Dict[str, Any]: # Process each included file or pattern for include_pattern in input_dict['includes']: # Expand wildcards to get list of matching files + if isinstance(include_pattern, str): + include_pattern = include_pattern.strip() matching_files = expand_wildcards(include_pattern, base_dir) # Process each matching file @@ -897,6 +899,118 @@ def _extract_from_string(text: str) -> List[str]: return matches +############################################################################## +def load_definitions(definition_path): + """ + Load and merge device definitions from YAML files. + + This is a shared utility used by RegexParser and RecordParser to load + device and device_type definitions from YAML configuration files. + + Supports both the new structured format and the legacy format: + + New format: + devices: + device_name: + device_type: SomeType + device_types: + SomeType: + format: ... + + Legacy format (deprecated): + device_name: + category: device + device_type: SomeType + SomeType: + category: device_type + format: ... + + Args: + definition_path: Comma-separated glob patterns for definition files. 
+ Example: 'local/devices/*.yaml,contrib/devices/*.yaml' + + Returns: + Dict with structure: + { + 'devices': {device_name: device_def, ...}, + 'device_types': {type_name: type_def, ...} + } + + Returns empty structure if definition_path is None or no files found. + """ + definitions = {'devices': {}, 'device_types': {}} + + if not definition_path: + return definitions + + def_files = [] + for path_glob in definition_path.split(','): + matched = glob.glob(path_glob.strip()) + if not matched: + logging.debug('No files match definition path "%s"', path_glob.strip()) + def_files.extend(matched) + + if not def_files: + return definitions + + for filename in def_files: + file_defs = read_config(filename) + file_defs = expand_includes(file_defs) + + for key, val in file_defs.items(): + # New format: 'devices' key contains dict of device definitions + if key == 'devices': + if not isinstance(val, dict): + logging.error('"devices" value in file %s must be dict. ' + 'Found type "%s"', filename, type(val)) + continue + for name, defn in val.items(): + if name in definitions['devices']: + logging.warning('Duplicate device definition "%s" in %s', + name, filename) + definitions['devices'][name] = defn + + # New format: 'device_types' key contains dict of device type definitions + elif key == 'device_types': + if not isinstance(val, dict): + logging.error('"device_types" value in file %s must be dict. 
' + 'Found type "%s"', filename, type(val)) + continue + for name, defn in val.items(): + if name in definitions['device_types']: + logging.warning('Duplicate device_type definition "%s" in %s', + name, filename) + definitions['device_types'][name] = defn + + # Skip 'includes' - already handled by expand_includes + elif key == 'includes': + pass + + # Legacy format: top-level key with 'category' field + elif isinstance(val, dict) and 'category' in val: + category = val.get('category') + if category == 'device': + if key in definitions['devices']: + logging.warning('Duplicate device definition "%s" in %s', + key, filename) + definitions['devices'][key] = val + elif category == 'device_type': + if key in definitions['device_types']: + logging.warning('Duplicate device_type definition "%s" in %s', + key, filename) + definitions['device_types'][key] = val + else: + logging.warning('Top-level definition "%s" in file %s has ' + 'unrecognized category "%s" - ignoring', + key, filename, category) + + # Unknown top-level key + else: + logging.debug('Ignoring unknown top-level key "%s" in %s', key, filename) + + return definitions + + ############################################################################## ############################################################################## if __name__ == "__main__": diff --git a/logger/utils/record_parser.py b/logger/utils/record_parser.py index 1695acb6..6942f42a 100644 --- a/logger/utils/record_parser.py +++ b/logger/utils/record_parser.py @@ -8,7 +8,6 @@ definitions should take. 
""" import datetime -import glob import json import logging import pprint @@ -23,8 +22,9 @@ # Append openrvdas root to syspath prior to importing openrvdas modules from os.path import dirname, realpath sys.path.append(dirname(dirname(dirname(realpath(__file__))))) -from logger.utils import read_config # noqa: E402 from logger.utils.das_record import DASRecord # noqa: E402 +from logger.utils.read_config import load_definitions # noqa: E402 +from logger.utils.das_record import collect_metadata_for_fields # noqa: E402 # Dict of format types that extend the default formats recognized by the # parse module. @@ -121,7 +121,7 @@ def __init__(self, record_format=None, else: # Fill in the devices and device_types - NOTE: we won't be using # these if 'field_patterns' is provided as an argument. - definitions = self._new_read_definitions(definition_path) + definitions = load_definitions(definition_path) self.devices = definitions.get('devices', {}) self.device_types = definitions.get('device_types', {}) @@ -286,24 +286,11 @@ def parse_record(self, record): if message_type: parsed_record['message_type'] = message_type - # If we have parsed fields, see if we also have metadata. Are we - # supposed to occasionally send it for our variables? Is it time - # to send it again? 
- metadata_fields = {} - if self.metadata and self.metadata_interval: - for field_name in fields: - last_metadata_sent = self.metadata_last_sent.get(field_name, 0) - time_since_send = timestamp - last_metadata_sent - if time_since_send > self.metadata_interval: - field_metadata = self.metadata.get(field_name) - if field_metadata: - metadata_fields[field_name] = field_metadata - self.metadata_last_sent[field_name] = timestamp - if metadata_fields: - metadata = {'fields': metadata_fields} - else: - metadata = None - + # Metadata Injection - use shared utility + metadata = collect_metadata_for_fields( + fields, timestamp, self.metadata, + self.metadata_interval, self.metadata_last_sent + ) if metadata: parsed_record['metadata'] = metadata @@ -450,115 +437,3 @@ def _compile_formats_from_patterns(self, field_patterns): else: raise ValueError('Passed field_patterns must be str, list or dict. Found %s: %s' % (type(field_patterns), str(field_patterns))) - - ############################ - def _read_definitions(self, filespec_paths): - """Read the files on the filespec_paths and return dictionary of - accumulated definitions. - """ - definitions = {} - for filespec in filespec_paths.split(','): - filenames = glob.glob(filespec) - if not filenames: - logging.warning('No files match definition file spec "%s"', filespec) - - for filename in filenames: - file_definitions = read_config.read_config(filename) - - for new_def_name, new_def in file_definitions.items(): - if new_def_name in definitions: - logging.warning('Duplicate definition for "%s" found in %s', - new_def_name, filename) - definitions[new_def_name] = new_def - return definitions - - ############################ - def _new_read_definitions(self, filespec_paths, definitions=None): - """Read the files on the filespec_paths and return dictionary of - accumulated definitions. 
- - filespec_paths - a list of possibly-globbed filespecs to be read - - definitions - optional dict of pre-existing definitions that will - be added to. Typically this will be omitted on a base call, - but may be added to when recursing. Passing it in allows - flagging when items are defined more than once. - """ - # If nothing was passed in, start with base case. - definitions = definitions or {'devices': {}, 'device_types': {}} - - for filespec in filespec_paths.split(','): - filenames = glob.glob(filespec) - if not filenames: - logging.warning('No files match definition file spec "%s"', filespec) - - for filename in filenames: - file_definitions = read_config.read_config(filename) - - for key, val in file_definitions.items(): - # If we have a dict of device definitions, copy them into the - # 'devices' key of our definitions. - if key == 'devices': - if not isinstance(val, dict): - logging.error('"devices" values in file %s must be dict. ' - 'Found type "%s"', filename, type(val)) - return None - - for device_name, device_def in val.items(): - if device_name in definitions['devices']: - logging.warning('Duplicate definition for "%s" found in %s', - device_name, filename) - definitions['devices'][device_name] = device_def - - # If we have a dict of device_type definitions, copy them into the - # 'device_types' key of our definitions. - elif key == 'device_types': - if not isinstance(val, dict): - logging.error('"device_typess" values in file %s must be dict. 
' - 'Found type "%s"', filename, type(val)) - return None - - for device_type_name, device_type_def in val.items(): - if device_type_name in definitions['device_types']: - logging.warning('Duplicate definition for "%s" found in %s', - device_type_name, filename) - definitions['device_types'][device_type_name] = device_type_def - - # If we're including other files, recurse inelegantly - elif key == 'includes': - if not type(val) in [str, list]: - logging.error('"includes" values in file %s must be either ' - 'a list or a simple string. Found type "%s"', - filename, type(val)) - return None - - if isinstance(val, str): - val = [val] - for filespec in val: - new_defs = self._new_read_definitions(filespec, definitions) - definitions['devices'].update(new_defs.get('devices', {})) - definitions['device_types'].update(new_defs.get('device_types', {})) - - # If it's not an includes/devices/device_types def, assume - # it's a (deprecated) top-level device or device_type - # definition. Try adding it to the right place. - else: - category = val.get('category') - if category not in ['device', 'device_type']: - logging.warning('Top-level definition "%s" in file %s is not ' - 'category "device" or "device_type". ' - 'Category is "%s" - ignoring', category) - continue - if category == 'device': - if key in definitions['devices']: - logging.warning('Duplicate definition for "%s" found in %s', - key, filename) - definitions['devices'][key] = val - else: - if key in definitions['device_types']: - logging.warning('Duplicate definition for "%s" found in %s', - key, filename) - definitions['device_types'][key] = val - - # Finally, return the accumulated definitions - return definitions diff --git a/logger/utils/regex_parser.py b/logger/utils/regex_parser.py new file mode 100755 index 00000000..49ee7cdf --- /dev/null +++ b/logger/utils/regex_parser.py @@ -0,0 +1,363 @@ +#!/usr/bin/env python3 + +"""Tools for parsing NMEA and other text records using regex. 
+""" +import datetime +import logging +import re +import pprint +import sys +import time + +from os.path import dirname, realpath + + +# Append openrvdas root to syspath prior to importing openrvdas modules +sys.path.append(dirname(dirname(dirname(realpath(__file__))))) + +from logger.utils.das_record import DASRecord # noqa: E402 +from logger.utils.read_config import load_definitions # noqa: E402 +from logger.utils.das_record import collect_metadata_for_fields # noqa: E402 + +# Import ConvertFieldsTransform, but handle gracefully if unavailable +try: + from logger.transforms.convert_fields_transform import ConvertFieldsTransform +except ImportError: + ConvertFieldsTransform = None + +# Note: this is a "permissive" regex. It looks for data_id and timestamp prior to field_string, +# But still parses field_string if they are absent +# Works for both "data_id timestamp fields" but also parses "fields" if +# data_id or timestamp are missing + +DEFAULT_RECORD_FORMAT = r"^(?:(?P\w+)\s+(?P[0-9TZ:\-\.]*)\s+)?(?P(.|\r|\n)*)" # noqa: E501 + +DEFAULT_DEFINITION_PATH = 'local/devices/*.yaml,contrib/devices/*.yaml' + + +################################################################################ +class RegexParser: + ############################ + def __init__(self, + record_format=None, + field_patterns=None, + data_id=None, + definition_path=None, + metadata=None, + metadata_interval=None, + quiet=False): + r"""Create a parser that will parse field values out of a text record + and return a DASRecord object. + ``` + record_format - string for re.match() to use to break out data_id + and timestamp from the rest of the message. By default this will + look for 'data_id timestamp field_string', where 'field_string' + is a str containing the fields to be parsed. + + field_patterns + If not None, either + - a list of regex patterns to be tried + - a dict of message_type:regex patterns to be tried. When one + matches, the record's message_type is set accordingly. 
+ If None and definition_path is provided, patterns are loaded from + device definition files. + + data_id + If specified, this string is used as the data_id for all records, + overriding any data_id extracted from the source record. + + definition_path + Wildcarded path matching YAML definitions for devices. Used only + if 'field_patterns' is None. Defaults to DEFAULT_DEFINITION_PATH. + Comma-separated globs are supported. + + metadata + If provided, a dict mapping field names to their metadata dicts. + If None and definition_path is used, metadata is compiled from + device definitions. + + metadata_interval + If not None, include the description, units and other metadata + pertaining to each field in the returned record if those data + haven't been returned in the last metadata_interval seconds. + + quiet - if not False, don't complain when unable to parse a record. + ``` + """ + self.quiet = quiet + self.record_format = record_format or DEFAULT_RECORD_FORMAT + self.compiled_record_format = re.compile(self.record_format) + self.data_id = data_id # Store the data_id override + + # Check for conflict + if field_patterns and definition_path: + raise ValueError('RegexParser: Both field_patterns and definition_path ' + 'specified. Please specify only one.') + + # Device-aware parsing state + self.devices = {} + self.device_types = {} + self.type_converters = {} + + # Metadata state + self.metadata = metadata or {} + self.metadata_interval = metadata_interval + self.metadata_last_sent = {} + + # If field patterns not provided, look them up in definitions + if field_patterns is None and definition_path is not None: + field_patterns = self._load_definitions(definition_path) + + # If metadata not explicitly provided, compile it from definitions + if not metadata and metadata_interval: + self._compile_metadata() + + self.field_patterns = field_patterns + + # If we've been explicitly given the field_patterns we're to use for + # parsing, compile them now. 
+ if field_patterns: + if isinstance(field_patterns, list): + self.compiled_field_patterns = [ + re.compile(pattern) + for pattern in field_patterns + ] + elif isinstance(field_patterns, dict): + self.compiled_field_patterns = { + message_type: re.compile(pattern) + for (message_type, pattern) in field_patterns.items() + } + else: + raise ValueError('field_patterns must either be a list of patterns or ' + 'dict of message_type:pattern pairs. Found type ' + f'{type(field_patterns)}') + else: + self.compiled_field_patterns = None + + ############################ + def _load_definitions(self, definition_path): + """Load device definitions and return aggregated field patterns. + Populates self.devices and self.device_types. + """ + field_patterns = {} + + # Use shared utility to load definitions + definitions = load_definitions(definition_path) + + # Store devices + self.devices = definitions.get('devices', {}) + + # Process device_types: store them, extract patterns, create converters + for dt_name, dt_def in definitions.get('device_types', {}).items(): + self.device_types[dt_name] = dt_def + + # Aggregate formats (regexes) + dt_formats = dt_def.get('format', {}) + if isinstance(dt_formats, dict): + field_patterns.update(dt_formats) + + # Create cached component converter for this type + dt_fields = dt_def.get('fields', {}) + if dt_fields and ConvertFieldsTransform: + self.type_converters[dt_name] = ConvertFieldsTransform( + fields=dt_fields, + quiet=self.quiet + ) + + return field_patterns + + ############################ + def _compile_metadata(self): + """ + Compile metadata from device definitions if available. + Logic adapted from RecordParser. + """ + # It's a map from variable name to the device and device type it + # came from, along with device type variable and its units and + # description, if provided in the device type definition. 
+ for device, device_def in self.devices.items(): + device_type_name = device_def.get('device_type') + if not device_type_name: + continue + + device_type_def = self.device_types.get(device_type_name) + if not device_type_def: + continue + + device_type_fields = device_type_def.get('fields') + if not device_type_fields: + continue + + fields = device_def.get('fields') + if not fields: + continue + + # e.g. device_type_field = GPSTime, device_field = S330GPSTime + for device_type_field, device_field in fields.items(): + # e.g. GPSTime: {'units':..., 'description':...} + field_desc = device_type_fields.get(device_type_field) + + # field_desc might be a string (type) or dict (metadata) or None + if not field_desc or not isinstance(field_desc, dict): + continue + + self.metadata[device_field] = { + 'device': device, + 'device_type': device_type_name, + 'device_type_field': device_type_field, + } + # Copy relevant keys like units, description + for k in ['units', 'description']: + if k in field_desc: + self.metadata[device_field][k] = field_desc[k] + + ############################ + def parse_record(self, record): + """Parse an id-prefixed text record into a DASRecord. + """ + if not record: + return None + if not isinstance(record, str): + logging.info('Record is not a string: "%s"', record) + return None + try: + parsed_record = self.compiled_record_format.match(record).groupdict() + except (ValueError, AttributeError): + if not self.quiet: + logging.warning('Unable to parse record into "%s"', self.record_format) + logging.warning('Record: %s', record) + return None + + if parsed_record is None: + return None + + # Logic to determine data_id: + # 1. If self.data_id is set (in __init__), use it (Override). + # 2. Else, look for 'data_id' extracted from the record via regex. + # 3. If that fails, default to 'unknown'. 
+ if self.data_id: + data_id = self.data_id + else: + data_id = parsed_record.get('data_id', None) + if not data_id: + if not self.quiet: + logging.warning('No data_id found in record and none specified. ' + 'Defaulting to "unknown".') + data_id = 'unknown' + + # Convert timestamp to numeric, if it's there. + # Initialize to None first to avoid UnboundLocalError if 'timestamp' + # is not in the regex groups. + timestamp = None + timestamp_text = parsed_record.get('timestamp', None) + + if timestamp_text is not None: + timestamp = self.convert_timestamp(timestamp_text) + + # If no timestamp found, DASRecord will default to time.time() + # if passed None. + if timestamp is None: + timestamp = time.time() + + # Extract the field string we're going to parse; + # remove trailing whitespace. + field_string = parsed_record.get('field_string', None) + if field_string is not None: + field_string = field_string.rstrip() + + message_type = None + fields = {} + if field_string: + # If we've been given a set of field_patterns to apply, + # use the first that matches. + # Shortcut that lets us iterate through a list or a dict with the same + # invocation. With a list, it returns (None, value); with a dict it + # returns (key, value). + def iterate_patterns(obj): + return (obj.items() if isinstance(obj, dict) else ((None, v) for v in obj)) + + if self.field_patterns: + for message_type, pattern in iterate_patterns(self.compiled_field_patterns): + try: + try_parse = pattern.match(field_string) + # Did we find a parse that matched? 
+ # If so, return its fields + if try_parse: + fields = try_parse.groupdict() + break + except Exception as e: + logging.error(e) + + logging.debug('Created parsed fields: %s', pprint.pformat(fields)) + + # Create the initial DASRecord + try: + das_record = DASRecord(data_id=data_id, timestamp=timestamp, + message_type=message_type, + fields=fields) + except KeyError: + return None + + # Device-Specific Processing + # Try to match data_id to a known device + if data_id in self.devices: + device_def = self.devices[data_id] + device_type = device_def.get('device_type') + + # A. Type Conversion (delegated to cached ConvertFieldsTransform) + if device_type in self.type_converters: + converter = self.type_converters[device_type] + das_record = converter.transform(das_record) + if not das_record: + return None + + # B. Field Renaming / Filtering + # Only retain fields that are in the device's 'fields' map + device_fields_map = device_def.get('fields', {}) + if device_fields_map: + new_fields = {} + for original_name, mapped_name in device_fields_map.items(): + if original_name in das_record.fields: + # Use the mapped name (value) + new_fields[mapped_name] = das_record.fields[original_name] + + das_record.fields = new_fields + + # Metadata Injection + # If we have parsed fields, see if we also have metadata. Are we + # supposed to occasionally send it for our variables? Is it time + # to send it again? + metadata_to_inject = collect_metadata_for_fields( + das_record.fields, + das_record.timestamp or 0, + self.metadata, + self.metadata_interval, + self.metadata_last_sent + ) + if metadata_to_inject: + if das_record.metadata is None: + das_record.metadata = {} + das_record.metadata['fields'] = metadata_to_inject['fields'] + + return das_record + + ############################ + def convert_timestamp(self, datetime_text): + """Validates a datetime string and converts to numeric. 
+ """ + + DEFAULT_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' + + try: + datetime_ti = datetime.datetime.strptime( + datetime_text, DEFAULT_FORMAT) + except ValueError: + logging.debug("Incorrect datetime format.") + return None + + if datetime_ti: + # Explicitly set UTC timezone because the format expects 'Z' + # .replace(tzinfo=...) ensures .timestamp() treats it as UTC + # regardless of the local system clock. + timestamp = datetime_ti.replace(tzinfo=datetime.timezone.utc).timestamp() + return timestamp diff --git a/test/logger/transforms/test_convert_fields_transform.py b/test/logger/transforms/test_convert_fields_transform.py index aed8622b..670059bc 100755 --- a/test/logger/transforms/test_convert_fields_transform.py +++ b/test/logger/transforms/test_convert_fields_transform.py @@ -82,9 +82,9 @@ def test_float_string_to_int(self): def test_hex_conversion(self): """Test converting hex strings to integers.""" t = ConvertFieldsTransform(fields={ - 'flag_a': 'hex', - 'flag_b': 'hex', - 'prefixed': 'hex' + 'flag_a': 'hex_int', + 'flag_b': 'hex_int', + 'prefixed': 'hex_int' }) input_dict = { @@ -102,22 +102,22 @@ def test_hex_conversion(self): ############################ def test_lat_lon_conversion(self): """Test NMEA lat/lon conversion logic.""" - t = ConvertFieldsTransform(lat_lon_fields={ - 'latitude': ('raw_lat', 'lat_dir'), - 'longitude': ('raw_lon', 'lon_dir') + t = ConvertFieldsTransform(fields={ + 'latitude': {'data_type': 'nmea_lat', 'direction_field': 'lat_dir'}, + 'longitude': {'data_type': 'nmea_lon', 'direction_field': 'lon_dir'} }) # 45 deg 30 min N = 45.5, 120 deg 30 min W = -120.5 input_dict = { - 'raw_lat': '4530.00', 'lat_dir': 'N', - 'raw_lon': '12030.00', 'lon_dir': 'W' + 'latitude': '4530.00', 'lat_dir': 'N', + 'longitude': '12030.00', 'lon_dir': 'W' } # By default, delete_source_fields is False, so raw fields stay result = t.transform(input_dict) self.assertAlmostEqual(result['latitude'], 45.5) self.assertAlmostEqual(result['longitude'], -120.5) - 
self.assertIn('raw_lat', result) + self.assertIn('lat_dir', result) self.assertIn('lon_dir', result) ############################ @@ -126,13 +126,25 @@ def test_delete_options(self): # Case 1: Delete Source Fields = True t_del_src = ConvertFieldsTransform( - lat_lon_fields={'lat': ('raw_lat', 'lat_dir')}, + fields={'lat': {'data_type': 'nmea_lat', 'direction_field': 'lat_dir'}}, delete_source_fields=True ) - res_src = t_del_src.transform({'raw_lat': '4530.00', 'lat_dir': 'S'}) + # Note: Input must use the KEY defined in fields dict + # In previous format, we mapped target -> (value_field, dir_field) explicitly. + # Now: target key matches input key, and value_field is implicit. + # So input dict must use 'lat' as key for value. + res_src = t_del_src.transform({'lat': '4530.00', 'lat_dir': 'S'}) self.assertAlmostEqual(res_src['lat'], -45.5) - self.assertNotIn('raw_lat', res_src) + # lat_dir should be deleted self.assertNotIn('lat_dir', res_src) + # 'lat' key remains but value is converted. + # delete_source_fields deletes source fields IF they are different from target. + # Here they are same ('lat'). `_convert_lat_lon` updates in place. + # Wait, my logic in ConvertFieldsTransform for delete: + # if val_field in fields and val_field != target_field: del fields[val_field] + # In new declarative mode, target_field == val_field == 'lat'. + # So 'lat' is NOT deleted. + # But 'lat_dir' IS deleted. 
# Case 2: Delete Unconverted Fields = True t_del_unconv = ConvertFieldsTransform( @@ -144,22 +156,20 @@ def test_delete_options(self): self.assertDictEqual(res_unconv, {'keep_me': 5}) # Case 3: Both True - # Convert lat/lon, delete raw lat/lon, delete any other fields + # Convert lat/lon, delete raw lat/lon (dir), delete any other fields t_both = ConvertFieldsTransform( - lat_lon_fields={'lat': ('raw_lat', 'lat_dir')}, + fields={'lat': {'data_type': 'nmea_lat', 'direction_field': 'lat_dir'}}, delete_source_fields=True, delete_unconverted_fields=True ) input_both = { - 'raw_lat': '3000.00', 'lat_dir': 'N', # Should be converted then deleted + 'lat': '3000.00', 'lat_dir': 'N', 'extra_field': 'delete_me' # Should be deleted (unconverted) } res_both = t_both.transform(input_both) self.assertAlmostEqual(res_both['lat'], 30.0) - self.assertNotIn('raw_lat', res_both) + self.assertNotIn('lat_dir', res_both) self.assertNotIn('extra_field', res_both) - # Ensure only the new field remains - self.assertEqual(len(res_both), 1) ############################ def test_das_record_input(self): diff --git a/test/logger/transforms/test_regex_parse_transform.py b/test/logger/transforms/test_regex_parse_transform.py new file mode 100755 index 00000000..91237c11 --- /dev/null +++ b/test/logger/transforms/test_regex_parse_transform.py @@ -0,0 +1,193 @@ +import unittest +from unittest import mock +import sys +from os.path import dirname, realpath + +sys.path.append(dirname(dirname(dirname(dirname(realpath(__file__)))))) + +from logger.transforms.regex_parse_transform import RegexParseTransform # noqa: E402 +from logger.utils.das_record import DASRecord # noqa: E402 + + +class TestRegexParseTransform(unittest.TestCase): + + def setUp(self): + self.sample_record = "sensor1 2023-01-01T12:00:00.000Z temp=23.5 humidity=60" + self.field_pattern = r"temp=(?P[\d\.]+)\s+humidity=(?P[\d\.]+)" + + def test_transform(self): + """Test transforming to a DASRecord (default behavior).""" + transform = 
RegexParseTransform(field_patterns=[self.field_pattern]) + result = transform.transform(self.sample_record) + + self.assertIsInstance(result, DASRecord) + self.assertEqual(result.data_id, 'sensor1') + self.assertEqual(result.fields['temp'], '23.5') + + def test_data_id_override(self): + """Test overriding data_id.""" + transform = RegexParseTransform(field_patterns=[self.field_pattern], data_id='my_sensor') + result = transform.transform(self.sample_record) + self.assertEqual(result.data_id, 'my_sensor') + + def test_field_conversion(self): + """Test integrating with field conversion (float).""" + # We need to mock or ensure ConvertFieldsTransform logic works. + # Assuming ConvertFieldsTransform is working (it's imported). + fields_map = {'temp': 'float'} + transform = RegexParseTransform(field_patterns=[self.field_pattern], fields=fields_map) + + result = transform.transform(self.sample_record) + # Temp should be a float now, not a string + self.assertEqual(result.fields['temp'], 23.5) + self.assertIsInstance(result.fields['temp'], float) + + def test_conflict_error(self): + """Test that defining both field_patterns and definition_path raises ValueError.""" + with self.assertRaises(ValueError): + RegexParseTransform( + field_patterns=[self.field_pattern], + definition_path='some/path.yaml' + ) + + @mock.patch('logger.utils.read_config.glob.glob') + @mock.patch('logger.utils.read_config.read_config') + @mock.patch('logger.utils.read_config.expand_includes') + def test_definition_path(self, mock_expand, mock_read, mock_glob): + """Test loading definitions from path with device mapping.""" + # Setup mocks + mock_glob.return_value = ['/path/to/defs.yaml'] + + # Define the mocked configuration structure + mock_config = { + 'device_types': { + 'TestType': { + 'format': { + 'MSG': r'\$(?P
\w+),val=(?P\d+),rem=(?P\w+)' + }, + 'fields': { + 'Val': {'data_type': 'int'}, + 'Header': {'data_type': 'str'} + } + } + }, + 'devices': { + 'dev1': { + 'device_type': 'TestType', + 'fields': { + 'Val': 'Value', # Resume as Value + 'Header': 'Head' # Resume as Head + # 'Rem' is NOT mapped, so it should be filtered out + } + } + } + } + mock_read.return_value = mock_config # simplified, expand_includes usually returns it + mock_expand.return_value = mock_config + + # Initialize + transform = RegexParseTransform(definition_path='defs.yaml') + + # Test 1: Known Device (dev1 maps to TestType) + record = "dev1 2023 $MSG,val=42,rem=FOO" + result = transform.transform(record) + + self.assertIsNotNone(result) + # Check renaming: Val -> Value + self.assertIn('Value', result.fields) + self.assertEqual(result.fields['Value'], 42) + self.assertIsInstance(result.fields['Value'], int) # Converted + + # Check renaming: Header -> Head + self.assertIn('Head', result.fields) + self.assertEqual(result.fields['Head'], 'MSG') + + # Check filtering: Rem should be gone (was not in devices fields map) + self.assertNotIn('Rem', result.fields) + + # Test 2: Unknown Device + # Should parse with regex but not convert or filter/rename + record_unk = "unk 2023 $MSG,val=99,rem=BAR" + # RegexParser extracts data_id if possible, or uses default. + # Here 'unk' is data_id. + result_unk = transform.transform(record_unk) + + self.assertIsNotNone(result_unk) + # Should contain raw keys + self.assertIn('Val', result_unk.fields) + self.assertEqual(result_unk.fields['Val'], '99') # String! + self.assertIn('Rem', result_unk.fields) # Present! + + def test_metadata_injection(self): + """Test that metadata is injected based on interval.""" + metadata = {'temp': {'units': 'C', 'description': 'Temperature'}} + # Use a small interval + transform = RegexParseTransform( + field_patterns=[self.field_pattern], + metadata=metadata, + metadata_interval=10 + ) + + # 1. 
First record (T=100) -> 2023-01-01T00:00:00.000Z + record_1 = "sensor1 2023-01-01T00:00:00.000Z temp=23.5 humidity=60" + result_1 = transform.transform(record_1) + self.assertIsNotNone(result_1.metadata) + self.assertIn('fields', result_1.metadata) + self.assertEqual(result_1.metadata['fields']['temp']['units'], 'C') + + # 2. Second record (T+5s) -> 2023-01-01T00:00:05.000Z + record_2 = "sensor1 2023-01-01T00:00:05.000Z temp=24.0 humidity=61" + result_2 = transform.transform(record_2) + # Should NOT have metadata (only 5s elapsed) + if result_2.metadata: + self.assertNotIn('fields', result_2.metadata) + + # 3. Third record (T+15s) -> 2023-01-01T00:00:15.000Z + record_3 = "sensor1 2023-01-01T00:00:15.000Z temp=24.5 humidity=62" + result_3 = transform.transform(record_3) + # Should have metadata again (>10s elapsed) + self.assertIsNotNone(result_3.metadata) + self.assertIn('fields', result_3.metadata) + self.assertEqual(result_3.metadata['fields']['temp']['units'], 'C') + + @mock.patch('logger.utils.read_config.glob.glob') + @mock.patch('logger.utils.read_config.read_config') + @mock.patch('logger.utils.read_config.expand_includes') + def test_metadata_compilation(self, mock_expand, mock_read, mock_glob): + """Test that metadata is correctly compiled from device definitions.""" + mock_glob.return_value = ['/defs.yaml'] + mock_config = { + 'device_types': { + 'TypeA': { + 'fields': { + 'RawTemp': {'units': 'C', 'description': 'Raw Temp'} + } + } + }, + 'devices': { + 'sensorA': { + 'device_type': 'TypeA', + 'fields': { + 'RawTemp': 'Temperature' + } + } + } + } + mock_read.return_value = mock_config + mock_expand.return_value = mock_config + + transform = RegexParseTransform( + definition_path='defs.yaml', + metadata_interval=10 + ) + + # Check if internal metadata structure was built correctly + # Mapped field 'Temperature' should have metadata from 'RawTemp' + # Note: metadata is now in the parser, not the transform + self.assertIn('Temperature', 
transform.parser.metadata) + self.assertEqual(transform.parser.metadata['Temperature']['units'], 'C') + self.assertEqual(transform.parser.metadata['Temperature']['description'], 'Raw Temp') + + +if __name__ == '__main__': + unittest.main() diff --git a/test/logger/utils/test_regex_parser.py b/test/logger/utils/test_regex_parser.py new file mode 100755 index 00000000..2f0f7106 --- /dev/null +++ b/test/logger/utils/test_regex_parser.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +import unittest +import sys +from os.path import dirname, realpath + +# Add the proper path so imports work as if running in the package structure +# path: test/logger/utils/test_regex_parser.py -> root (4 levels up) +sys.path.append(dirname(dirname(dirname(dirname(realpath(__file__)))))) + +from logger.utils.regex_parser import RegexParser # noqa: E402 +from logger.utils.das_record import DASRecord # noqa: E402 + + +class TestRegexParser(unittest.TestCase): + + def setUp(self): + # A simple record: data_id timestamp field_string + self.sample_record = "sensor1 2023-01-01T12:00:00.000Z temp=23.5 humidity=60" + self.no_id_record = "2023-01-01T12:00:00.000Z temp=23.5" + + # A pattern to extract fields + self.field_pattern = r"temp=(?P[\d\.]+)\s+humidity=(?P[\d\.]+)" + + def test_basic_parsing(self): + """Test standard parsing extracting data_id and timestamp from record.""" + parser = RegexParser(field_patterns=[self.field_pattern]) + result = parser.parse_record(self.sample_record) + + self.assertIsInstance(result, DASRecord) + self.assertEqual(result.data_id, 'sensor1') + self.assertEqual(result.timestamp, 1672574400.0) + self.assertEqual(result.fields['temp'], '23.5') + self.assertEqual(result.fields['humidity'], '60') + + def test_data_id_override(self): + """Test that passing data_id to __init__ overrides the record's data_id.""" + # Record says 'sensor1', but we initialize with 'override_id' + parser = RegexParser(field_patterns=[self.field_pattern], data_id='override_id') + result = 
parser.parse_record(self.sample_record) + + self.assertEqual(result.data_id, 'override_id') + + def test_data_id_fallback(self): + """Test that data_id arg fills in when record has no data_id.""" + # Using a format that doesn't look for data_id at the start + # e.g., just timestamp and fields + record_format = r"(?P[0-9TZ:\-\.]*)\s+(?P.*)" + parser = RegexParser(record_format=record_format, data_id='fixed_id') + + result = parser.parse_record(self.no_id_record) + self.assertEqual(result.data_id, 'fixed_id') + + def test_unknown_data_id(self): + """Test default 'unknown' if no data_id in record and no override provided.""" + record_format = r"(?P[0-9TZ:\-\.]*)\s+(?P.*)" + parser = RegexParser(record_format=record_format) # No data_id arg + + result = parser.parse_record(self.no_id_record) + self.assertEqual(result.data_id, 'unknown') + + def test_message_type_dict(self): + """Test using a dictionary of message types for field patterns.""" + patterns = { + 'weather': r"temp=(?P[\d\.]+)", + 'nav': r"lat=(?P[\d\.]+)" + } + parser = RegexParser(field_patterns=patterns) + + # Test matching first pattern + rec1 = "sensor1 2023-01-01T12:00:00Z temp=23.5" + res1 = parser.parse_record(rec1) + self.assertEqual(res1.fields['temp'], '23.5') + + # Test matching second pattern + rec2 = "sensor1 2023-01-01T12:00:00Z lat=45.0" + res2 = parser.parse_record(rec2) + self.assertEqual(res2.fields['lat'], '45.0') + + +if __name__ == '__main__': + unittest.main()