|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Tool to convert YAML files back to binary corpus format.""" |
| 3 | + |
| 4 | +import argparse |
| 5 | +import logging |
| 6 | +import yaml |
| 7 | +import struct |
| 8 | +from pathlib import Path |
| 9 | +from typing import Dict, Any |
| 10 | + |
| 11 | +from curl_fuzzer_tools import common_logging |
| 12 | +from curl_fuzzer_tools.corpus import TLVEncoder |
| 13 | + |
# Module-level logger, named after this module, used by the functions below.
log = logging.getLogger(__name__)
| 15 | + |
| 16 | + |
def extract_tlv_types_from_header(header_file_path: Path) -> Dict[str, int]:
    """Extract TLV type definitions from a curl_fuzzer.h style header.

    Scans the header text for ``#define TLV_TYPE_<NAME> <value>`` and builds
    a mapping from ``<NAME>`` (without the ``TLV_TYPE_`` prefix) to its
    numeric value. Decimal and ``0x``-prefixed hexadecimal values are both
    understood.

    Args:
        header_file_path: Location of the header file to scan.

    Returns:
        Mapping of TLV type name to numeric ID. Empty when the header file
        does not exist (a warning is logged in that case).
    """
    import re

    if not header_file_path.exists():
        log.warning(f"Header file {header_file_path} not found, using built-in types")
        return {}

    # Decode explicitly so the result does not depend on the platform locale;
    # stray undecodable bytes (e.g. inside comments) must not abort the scan.
    content = header_file_path.read_text(encoding="utf-8", errors="replace")

    # The hex alternative has to come first: with a bare \d+ a value such as
    # 0x10 would be captured as "0" and silently recorded as type 0.
    pattern = r'#define\s+TLV_TYPE_(\w+)\s+(0[xX][0-9a-fA-F]+|\d+)'

    tlv_types: Dict[str, int] = {}
    for match in re.finditer(pattern, content):
        literal = match.group(2)
        base = 16 if literal[:2].lower() == "0x" else 10
        tlv_types[match.group(1)] = int(literal, base)

    log.info(f"Extracted {len(tlv_types)} TLV type definitions")
    return tlv_types
| 39 | + |
| 40 | + |
def get_tlv_type_id(type_name: str, tlv_types: Dict[str, int]) -> int:
    """Resolve a TLV type name to its numeric ID.

    A name present in ``tlv_types`` maps to its stored value. Otherwise a
    name of the form ``UNKNOWN_<number>`` resolves to ``<number>``.

    Raises:
        ValueError: If the name is neither in ``tlv_types`` nor a parsable
            ``UNKNOWN_<number>`` placeholder.
    """
    if type_name in tlv_types:
        return tlv_types[type_name]

    placeholder_prefix = "UNKNOWN_"
    if type_name.startswith(placeholder_prefix):
        numeric_part = type_name[len(placeholder_prefix):]
        try:
            return int(numeric_part)
        except ValueError:
            # Not a number after the prefix; report it as unknown below
            pass

    raise ValueError(f"Unknown TLV type: {type_name}")
| 54 | + |
| 55 | + |
def parse_yaml_value(value: Any) -> bytes:
    """Convert a YAML value back to the raw bytes of a TLV payload.

    Conversion rules, applied in order:

    * ``bytes``/``bytearray`` (what ``yaml.safe_load`` produces for
      ``!!binary`` scalars) are returned as-is.
    * ``None`` and the empty string become empty bytes.
    * A string consisting solely of an even number of hex digits is decoded
      as hex; any other string is encoded as UTF-8.
    * An integer is packed as a 4-byte big-endian unsigned value.
    * A dict (verbose format) is decoded from its ``hex`` key, else its
      ``integer`` key, else its ``partial_text`` key.
    * Anything else is stringified and encoded as UTF-8.

    Args:
        value: The value loaded from the YAML document.

    Returns:
        The payload bytes.

    Raises:
        struct.error: If an integer does not fit an unsigned 32-bit field.
        ValueError: If a dict's ``hex`` entry is not valid hex.
    """
    # Checked first: without this, binary scalars fell through to the
    # str() fallback and were written out as the text "b'...'".
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)

    if value is None or value == "":
        return b""

    if isinstance(value, str):
        looks_like_hex = len(value) % 2 == 0 and all(
            c in '0123456789abcdefABCDEF' for c in value
        )
        if looks_like_hex:
            # Cannot fail: the string is non-empty, even-length, all hex digits
            return bytes.fromhex(value)
        return value.encode('utf-8')

    if isinstance(value, int):
        return struct.pack('!I', value)

    if isinstance(value, dict):
        # 'hex' wins over the other keys because it is the lossless form
        if 'hex' in value:
            return bytes.fromhex(value['hex'])
        if 'integer' in value:
            return struct.pack('!I', value['integer'])
        if 'partial_text' in value:
            return value['partial_text'].encode('utf-8', errors='replace')

    # Fallback: convert to string and then to bytes
    return str(value).encode('utf-8')
| 91 | + |
| 92 | + |
def _explicit_type_id(tlv_entry: Dict[str, Any]) -> Any:
    """Return the entry's numeric type ID (type_id_v1 preferred over type_id), or None."""
    for key in ('type_id_v1', 'type_id'):
        if tlv_entry.get(key) is not None:
            return tlv_entry[key]
    return None


def _resolve_type_id(tlv_entry: Dict[str, Any], tlv_types: Dict[str, int], force_v1_ids: bool) -> Any:
    """Pick the type ID to write for one TLV entry, or None (after logging) if there is none."""
    if not force_v1_ids and 'type' in tlv_entry:
        # Default behavior: prefer type name lookup, fall back to IDs
        try:
            return get_tlv_type_id(tlv_entry['type'], tlv_types)
        except ValueError as e:
            log.warning(f"Type name lookup failed: {e}. Falling back to numeric ID.")

    type_id = _explicit_type_id(tlv_entry)
    if type_id is None:
        if force_v1_ids:
            log.warning(f"TLV entry missing type_id_v1 or type_id when --force-v1-ids is used: {tlv_entry}")
        else:
            log.warning(f"TLV entry missing type information: {tlv_entry}")
    return type_id


def yaml_to_corpus(yaml_file: Path, tlv_types: Dict[str, int], force_v1_ids: bool = False) -> bytes:
    """Convert a YAML file back to binary corpus format.

    The YAML document must be a mapping with a ``tlvs`` key holding a list of
    TLV entries. Each entry is a mapping that identifies its type through
    ``type`` (a name), ``type_id_v1`` or ``type_id`` and may carry ``length``
    and ``value``. Entries that are not mappings, or whose type cannot be
    determined, are skipped with a warning.

    The payload is always taken from ``value`` when that key is present;
    ``length`` is only used as a consistency check.

    Args:
        yaml_file: Path of the YAML file to read.
        tlv_types: Mapping of TLV type name to numeric ID used for ``type``
            lookups.
        force_v1_ids: When true, ignore ``type`` and use ``type_id_v1``
            (falling back to ``type_id``) directly.

    Returns:
        The encoded corpus bytes.

    Raises:
        ValueError: If the document is not a mapping with a ``tlvs`` key.
    """
    import io

    with open(yaml_file, 'r', encoding='utf-8') as f:
        yaml_data = yaml.safe_load(f)

    if not isinstance(yaml_data, dict) or 'tlvs' not in yaml_data:
        raise ValueError("Invalid YAML format: missing 'tlvs' key")

    # An empty "tlvs:" key loads as None; treat that as "no entries"
    # rather than failing on iteration.
    tlv_entries = yaml_data['tlvs']
    if tlv_entries is None:
        tlv_entries = []

    # Create a temporary in-memory file-like object for the encoder
    output = io.BytesIO()

    # We don't have test_data for this use case, so pass None
    encoder = TLVEncoder(output, None)

    for tlv_entry in tlv_entries:
        if not isinstance(tlv_entry, dict):
            log.warning(f"Skipping invalid TLV entry: {tlv_entry}")
            continue

        type_id = _resolve_type_id(tlv_entry, tlv_types, force_v1_ids)
        if type_id is None:
            continue

        declared_length = tlv_entry.get('length', 0)

        if 'value' in tlv_entry:
            # The value is authoritative. Previously it was dropped whenever
            # 'length' was missing or 0, silently producing an empty TLV.
            value_bytes = parse_yaml_value(tlv_entry['value'])
            if 'length' in tlv_entry and len(value_bytes) != declared_length:
                log.warning(f"Length mismatch: expected {declared_length}, got {len(value_bytes)}. Using actual length.")
        else:
            if isinstance(declared_length, int) and declared_length > 0:
                log.warning(f"TLV entry with length {declared_length} but no value: {tlv_entry}")
            value_bytes = b""

        # Write the TLV
        encoder.write_bytes(type_id, value_bytes)
        log.debug(f"Wrote TLV type {type_id}, length {len(value_bytes)}")

    return output.getvalue()
| 166 | + |
| 167 | + |
def main() -> None:
    """Parse command-line arguments and convert YAML file(s) to corpus files.

    With a single input file, the corpus is written to ``--output`` or, by
    default, next to the input with its extension removed. With an input
    directory, every ``*.yaml`` file below it is converted; results go under
    ``--output`` if given, otherwise under ``--corpus-dir``, preserving the
    relative directory layout. Per-file errors in directory mode are logged
    and do not stop the run.

    Raises:
        FileNotFoundError: If the input path is neither a file nor a directory.
        ValueError: If, in single-file mode, the output path would be the
            input file itself.
    """
    parser = argparse.ArgumentParser(
        description="Convert YAML files back to curl fuzzer corpus format"
    )
    parser.add_argument(
        "input",
        help="YAML file or directory to convert"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output file or directory (default: create corpus files alongside YAML files)"
    )
    parser.add_argument(
        "--header",
        help="Path to curl_fuzzer.h header file (default: auto-detect)",
        type=Path
    )
    parser.add_argument(
        "--corpus-dir",
        help="Base directory for corpus output (default: corpus_output/)",
        type=Path,
        default=Path("corpus_output")
    )
    parser.add_argument(
        "--force-v1-ids",
        action="store_true",
        help="Force using type_id_v1 field instead of type name lookup"
    )

    args = parser.parse_args()

    # Auto-detect header file if not provided
    if args.header is None:
        header_file = Path(__file__).parent.parent.parent / "curl_fuzzer.h"
    else:
        header_file = args.header

    # Extract TLV type definitions (name -> value mapping)
    tlv_types = extract_tlv_types_from_header(header_file)

    input_path = Path(args.input)

    if input_path.is_file():
        # Single file processing
        if input_path.suffix != '.yaml':
            log.warning(f"Input file {input_path} doesn't have .yaml extension")

        if args.output:
            output_file = Path(args.output)
        else:
            # Create corpus file alongside YAML file
            output_file = input_path.with_suffix('')  # Remove .yaml extension

        # An input without an extension makes with_suffix('') return the
        # input path itself; writing there would destroy the YAML source.
        if output_file.resolve() == input_path.resolve():
            raise ValueError(
                f"Refusing to overwrite input file {input_path}; "
                "use -o/--output to choose a different output path"
            )

        corpus_data = yaml_to_corpus(input_path, tlv_types, args.force_v1_ids)

        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'wb') as f:
            f.write(corpus_data)

        log.info(f"Converted {input_path} -> {output_file} ({len(corpus_data)} bytes)")

    elif input_path.is_dir():
        # Directory processing; sorted so runs are reproducible
        yaml_files = sorted(input_path.rglob("*.yaml"))

        if not yaml_files:
            log.error(f"No YAML files found in {input_path}")
            return

        processed_count = 0
        failed_count = 0
        for yaml_file in yaml_files:
            try:
                corpus_data = yaml_to_corpus(yaml_file, tlv_types, args.force_v1_ids)

                if args.output:
                    # Create relative path structure in output directory
                    relative_path = yaml_file.relative_to(input_path)
                    output_file = Path(args.output) / relative_path.with_suffix('')
                else:
                    # Create corresponding corpus file maintaining directory structure
                    output_file = create_corpus_path(yaml_file, args.corpus_dir, input_path)

                output_file.parent.mkdir(parents=True, exist_ok=True)
                with open(output_file, 'wb') as f:
                    f.write(corpus_data)

                log.debug(f"Converted {yaml_file} -> {output_file} ({len(corpus_data)} bytes)")
                processed_count += 1

            except Exception as e:
                # Best effort: one bad file must not abort the whole batch
                log.error(f"Error processing {yaml_file}: {e}")
                failed_count += 1

        log.info(f"Processed {processed_count} YAML files")
        if failed_count:
            log.warning(f"Failed to convert {failed_count} YAML files")

    else:
        raise FileNotFoundError(f"Input path {args.input} does not exist")
| 265 | + |
| 266 | + |
def create_corpus_path(yaml_file: Path, corpus_base_dir: Path, yaml_base_dir: Path = None) -> Path:
    """Map a YAML file path to the corpus file path it should be written to.

    When ``yaml_base_dir`` is given and ``yaml_file`` lies beneath it, the
    file's path relative to that directory (minus its final extension) is
    recreated under ``corpus_base_dir``. In every other case the result is
    simply ``corpus_base_dir`` joined with the file's stem.
    """
    if yaml_base_dir:
        try:
            # Mirror the sub-directory layout and drop the .yaml extension
            return corpus_base_dir / yaml_file.relative_to(yaml_base_dir).with_suffix('')
        except ValueError:
            # yaml_file is not located under yaml_base_dir: use the bare name
            pass

    return corpus_base_dir / yaml_file.stem
| 284 | + |
| 285 | + |
def run() -> None:
    """Script entry point: call common_logging for this module, then main()."""
    # Logging is configured before main() so it is in place for the whole run
    common_logging(__name__, __file__)
    main()
| 290 | + |
| 291 | + |
# Execute the tool only when this file is run as a script, not when imported.
if __name__ == "__main__":
    run()
0 commit comments