From e6aa63b4852c11d5d44fc818d782b72a7ffcf3c2 Mon Sep 17 00:00:00 2001 From: Neil-TC Date: Wed, 8 Oct 2025 17:21:08 +0800 Subject: [PATCH 1/5] Add files via upload --- kicad_mcp/tools/bom_tools.py | 561 +++++++++++++++---------------- kicad_mcp/tools/drc_tools.py | 89 ++--- kicad_mcp/tools/export_tools.py | 147 ++++---- kicad_mcp/tools/netlist_tools.py | 405 +++++----------------- kicad_mcp/tools/pattern_tools.py | 213 ++++++------ 5 files changed, 591 insertions(+), 824 deletions(-) diff --git a/kicad_mcp/tools/bom_tools.py b/kicad_mcp/tools/bom_tools.py index 897949a..f5d4563 100644 --- a/kicad_mcp/tools/bom_tools.py +++ b/kicad_mcp/tools/bom_tools.py @@ -1,122 +1,121 @@ """ Bill of Materials (BOM) processing tools for KiCad projects. """ + import os import csv import json import pandas as pd from typing import Dict, List, Any, Optional, Tuple -from mcp.server.fastmcp import FastMCP, Context, Image +from mcp.server.fastmcp import FastMCP, Image +import logging from kicad_mcp.utils.file_utils import get_project_files + def register_bom_tools(mcp: FastMCP) -> None: """Register BOM-related tools with the MCP server. - + Args: mcp: The FastMCP server instance """ - + @mcp.tool() - async def analyze_bom(project_path: str, ctx: Context) -> Dict[str, Any]: + def analyze_bom(project_path: str) -> Dict[str, Any]: """Analyze a KiCad project's Bill of Materials. - + This tool will look for BOM files related to a KiCad project and provide analysis including component counts, categories, and cost estimates if available. - + Args: project_path: Path to the KiCad project file (.kicad_pro) - ctx: MCP context for progress reporting - + Returns: Dictionary with BOM analysis results """ print(f"Analyzing BOM for project: {project_path}") - + if not os.path.exists(project_path): print(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") + logging.info(f"Project not found: {project_path}") return {"success": False, "error": f"Project not found: {project_path}"} - + # Report progress - await ctx.report_progress(10, 100) - ctx.info(f"Looking for BOM files related to {os.path.basename(project_path)}") - + # Progress removed(10, 100) + logging.info(f"Looking for BOM files related to {os.path.basename(project_path)}") + # Get all project files files = get_project_files(project_path) - + # Look for BOM files bom_files = {} for file_type, file_path in files.items(): if "bom" in file_type.lower() or file_path.lower().endswith(".csv"): bom_files[file_type] = file_path print(f"Found potential BOM file: {file_path}") - + if not bom_files: print("No BOM files found for project") - ctx.info("No BOM files found for project") + logging.info("No BOM files found for project") return { - "success": False, + "success": False, "error": "No BOM files found. 
Export a BOM from KiCad first.", - "project_path": project_path + "project_path": project_path, } - - await ctx.report_progress(30, 100) - + + # Progress removed(30, 100) + # Analyze each BOM file results = { "success": True, "project_path": project_path, "bom_files": {}, - "component_summary": {} + "component_summary": {}, } - + total_unique_components = 0 total_components = 0 - + for file_type, file_path in bom_files.items(): try: - ctx.info(f"Analyzing {os.path.basename(file_path)}") - + logging.info(f"Analyzing {os.path.basename(file_path)}") + # Parse the BOM file bom_data, format_info = parse_bom_file(file_path) - + if not bom_data or len(bom_data) == 0: print(f"Failed to parse BOM file: {file_path}") continue - + # Analyze the BOM data analysis = analyze_bom_data(bom_data, format_info) - + # Add to results results["bom_files"][file_type] = { "path": file_path, "format": format_info, - "analysis": analysis + "analysis": analysis, } - + # Update totals total_unique_components += analysis["unique_component_count"] total_components += analysis["total_component_count"] - + print(f"Successfully analyzed BOM file: {file_path}") - + except Exception as e: print(f"Error analyzing BOM file {file_path}: {str(e)}", exc_info=True) - results["bom_files"][file_type] = { - "path": file_path, - "error": str(e) - } - - await ctx.report_progress(70, 100) - + results["bom_files"][file_type] = {"path": file_path, "error": str(e)} + + # Progress removed(70, 100) + # Generate overall component summary if total_components > 0: results["component_summary"] = { "total_unique_components": total_unique_components, - "total_components": total_components + "total_components": total_components, } - + # Calculate component categories across all BOMs all_categories = {} for file_type, file_info in results["bom_files"].items(): @@ -125,9 +124,9 @@ async def analyze_bom(project_path: str, ctx: Context) -> Dict[str, Any]: if category not in all_categories: all_categories[category] = 0 all_categories[category] += count - + results["component_summary"]["categories"] = all_categories - + # Calculate total cost if available total_cost = 0.0 cost_available = False @@ -136,177 +135,182 @@ async def analyze_bom(project_path: str, ctx: Context) -> Dict[str, Any]: if file_info["analysis"]["total_cost"] > 0: total_cost += file_info["analysis"]["total_cost"] cost_available = True - + if cost_available: results["component_summary"]["total_cost"] = round(total_cost, 2) - currency = next(( - file_info["analysis"].get("currency", "USD") - for file_type, file_info in results["bom_files"].items() - if "analysis" in file_info and "currency" in file_info["analysis"] - ), "USD") + currency = next( + ( + file_info["analysis"].get("currency", "USD") + for file_type, file_info in results["bom_files"].items() + if "analysis" in file_info and "currency" in file_info["analysis"] + ), + "USD", + ) results["component_summary"]["currency"] = currency - - await ctx.report_progress(100, 100) - ctx.info(f"BOM analysis complete: found {total_components} components") - + + # Progress removed(100, 100) + logging.info(f"BOM analysis complete: found {total_components} components") + return results - + @mcp.tool() - async def export_bom_csv(project_path: str, ctx: Context) -> Dict[str, Any]: + def export_bom_csv(project_path: str) -> Dict[str, Any]: """Export a Bill of Materials for a KiCad project. - + This tool attempts to generate a CSV BOM file for a KiCad project. It requires KiCad to be installed with the appropriate command-line tools. 
- + Args: project_path: Path to the KiCad project file (.kicad_pro) - ctx: MCP context for progress reporting - + Returns: Dictionary with export results """ print(f"Exporting BOM for project: {project_path}") - + if not os.path.exists(project_path): print(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") + logging.info(f"Project not found: {project_path}") return {"success": False, "error": f"Project not found: {project_path}"} - - # Get access to the app context - app_context = ctx.request_context.lifespan_context - kicad_modules_available = app_context.kicad_modules_available - + + # KiCad modules availability - set to False since we converted to CLI-based approach + kicad_modules_available = False + # Report progress - await ctx.report_progress(10, 100) - + # Progress removed(10, 100) + # Get all project files files = get_project_files(project_path) - + # We need the schematic file to generate a BOM if "schematic" not in files: print("Schematic file not found in project") - ctx.info("Schematic file not found in project") + logging.info("Schematic file not found in project") return {"success": False, "error": "Schematic file not found"} - + schematic_file = files["schematic"] project_dir = os.path.dirname(project_path) project_name = os.path.basename(project_path)[:-10] # Remove .kicad_pro extension - - await ctx.report_progress(20, 100) - ctx.info(f"Found schematic file: {os.path.basename(schematic_file)}") - + + # Progress removed(20, 100) + logging.info(f"Found schematic file: {os.path.basename(schematic_file)}") + # Try to export BOM # This will depend on KiCad's command-line tools or Python modules export_result = {"success": False} - + if kicad_modules_available: try: # Try to use KiCad Python modules - ctx.info("Attempting to export BOM using KiCad Python modules...") - export_result = await export_bom_with_python(schematic_file, project_dir, project_name, ctx) + logging.info("Attempting to export BOM using KiCad Python modules...") + export_result = export_bom_with_python( + schematic_file, project_dir, project_name + ) except Exception as e: print(f"Error exporting BOM with Python modules: {str(e)}", exc_info=True) - ctx.info(f"Error using Python modules: {str(e)}") + logging.info(f"Error using Python modules: {str(e)}") export_result = {"success": False, "error": str(e)} - + # If Python method failed, try command-line method if not export_result.get("success", False): try: - ctx.info("Attempting to export BOM using command-line tools...") - export_result = await export_bom_with_cli(schematic_file, project_dir, project_name, ctx) + logging.info("Attempting to export BOM using command-line tools...") + export_result = export_bom_with_cli( + schematic_file, project_dir, project_name + ) except Exception as e: print(f"Error exporting BOM with CLI: {str(e)}", exc_info=True) - ctx.info(f"Error using command-line tools: {str(e)}") + logging.info(f"Error using command-line tools: {str(e)}") export_result = {"success": False, "error": str(e)} - - await ctx.report_progress(100, 100) - + + # Progress removed(100, 100) + if export_result.get("success", False): - ctx.info(f"BOM exported successfully to {export_result.get('output_file', 'unknown location')}") + logging.info( + f"BOM exported successfully to {export_result.get('output_file', 'unknown location')}" + ) else: - ctx.info(f"Failed to export BOM: {export_result.get('error', 'Unknown error')}") - + logging.info(f"Failed to export BOM: {export_result.get('error', 'Unknown error')}") + return 
export_result # Helper functions for BOM processing + def parse_bom_file(file_path: str) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: """Parse a BOM file and detect its format. - + Args: file_path: Path to the BOM file - + Returns: Tuple containing: - List of component dictionaries - Dictionary with format information """ print(f"Parsing BOM file: {file_path}") - + # Check file extension _, ext = os.path.splitext(file_path) ext = ext.lower() - + # Dictionary to store format detection info - format_info = { - "file_type": ext, - "detected_format": "unknown", - "header_fields": [] - } - + format_info = {"file_type": ext, "detected_format": "unknown", "header_fields": []} + # Empty list to store component data components = [] - + try: - if ext == '.csv': + if ext == ".csv": # Try to parse as CSV - with open(file_path, 'r', encoding='utf-8-sig') as f: + with open(file_path, "r", encoding="utf-8-sig") as f: # Read a few lines to analyze the format - sample = ''.join([f.readline() for _ in range(10)]) + sample = "".join([f.readline() for _ in range(10)]) f.seek(0) # Reset file pointer - + # Try to detect the delimiter - if ',' in sample: - delimiter = ',' - elif ';' in sample: - delimiter = ';' - elif '\t' in sample: - delimiter = '\t' + if "," in sample: + delimiter = "," + elif ";" in sample: + delimiter = ";" + elif "\t" in sample: + delimiter = "\t" else: - delimiter = ',' # Default - + delimiter = "," # Default + format_info["delimiter"] = delimiter - + # Read CSV reader = csv.DictReader(f, delimiter=delimiter) format_info["header_fields"] = reader.fieldnames if reader.fieldnames else [] - + # Detect BOM format based on header fields - header_str = ','.join(format_info["header_fields"]).lower() - - if 'reference' in header_str and 'value' in header_str: + header_str = ",".join(format_info["header_fields"]).lower() + + if "reference" in header_str and "value" in header_str: format_info["detected_format"] = "kicad" - elif 'designator' in header_str: + elif "designator" in header_str: format_info["detected_format"] = "altium" - elif 'part number' in header_str or 'manufacturer part' in header_str: + elif "part number" in header_str or "manufacturer part" in header_str: format_info["detected_format"] = "generic" - + # Read components for row in reader: components.append(dict(row)) - - elif ext == '.xml': + + elif ext == ".xml": # Basic XML parsing with security protection from defusedxml.ElementTree import parse as safe_parse + tree = safe_parse(file_path) root = tree.getroot() - + format_info["detected_format"] = "xml" - + # Try to extract components based on common XML BOM formats - component_elements = root.findall('.//component') or root.findall('.//Component') - + component_elements = root.findall(".//component") or root.findall(".//Component") + if component_elements: for elem in component_elements: component = {} @@ -315,83 +319,85 @@ def parse_bom_file(file_path: str) -> Tuple[List[Dict[str, Any]], Dict[str, Any] for child in elem: component[child.tag] = child.text components.append(component) - - elif ext == '.json': + + elif ext == ".json": # Parse JSON - with open(file_path, 'r') as f: + with open(file_path, "r") as f: data = json.load(f) - + format_info["detected_format"] = "json" - + # Try to find components array in common JSON formats if isinstance(data, list): components = data - elif 'components' in data: - components = data['components'] - elif 'parts' in data: - components = data['parts'] - + elif "components" in data: + components = data["components"] + elif "parts" in 
data: + components = data["parts"] + else: # Unknown format, try generic CSV parsing as fallback try: - with open(file_path, 'r', encoding='utf-8-sig') as f: + with open(file_path, "r", encoding="utf-8-sig") as f: reader = csv.DictReader(f) format_info["header_fields"] = reader.fieldnames if reader.fieldnames else [] format_info["detected_format"] = "unknown_csv" - + for row in reader: components.append(dict(row)) except: print(f"Failed to parse unknown file format: {file_path}") return [], {"detected_format": "unsupported"} - + except Exception as e: print(f"Error parsing BOM file: {str(e)}", exc_info=True) return [], {"error": str(e)} - + # Check if we actually got components if not components: print(f"No components found in BOM file: {file_path}") else: print(f"Successfully parsed {len(components)} components from {file_path}") - + # Add a sample of the fields found if components: format_info["sample_fields"] = list(components[0].keys()) - + return components, format_info -def analyze_bom_data(components: List[Dict[str, Any]], format_info: Dict[str, Any]) -> Dict[str, Any]: +def analyze_bom_data( + components: List[Dict[str, Any]], format_info: Dict[str, Any] +) -> Dict[str, Any]: """Analyze component data from a BOM file. - + Args: components: List of component dictionaries format_info: Dictionary with format information - + Returns: Dictionary with analysis results """ print(f"Analyzing {len(components)} components") - + # Initialize results results = { "unique_component_count": 0, "total_component_count": 0, "categories": {}, - "has_cost_data": False + "has_cost_data": False, } - + if not components: return results - + # Try to convert to pandas DataFrame for easier analysis try: df = pd.DataFrame(components) - + # Clean up column names df.columns = [str(col).strip().lower() for col in df.columns] - + # Try to identify key columns based on format ref_col = None value_col = None @@ -399,55 +405,62 @@ def analyze_bom_data(components: List[Dict[str, Any]], format_info: Dict[str, An footprint_col = None cost_col = None category_col = None - + # Check for reference designator column - for possible_col in ['reference', 'designator', 'references', 'designators', 'refdes', 'ref']: + for possible_col in [ + "reference", + "designator", + "references", + "designators", + "refdes", + "ref", + ]: if possible_col in df.columns: ref_col = possible_col break - + # Check for value column - for possible_col in ['value', 'component', 'comp', 'part', 'component value', 'comp value']: + for possible_col in ["value", "component", "comp", "part", "component value", "comp value"]: if possible_col in df.columns: value_col = possible_col break - + # Check for quantity column - for possible_col in ['quantity', 'qty', 'count', 'amount']: + for possible_col in ["quantity", "qty", "count", "amount"]: if possible_col in df.columns: quantity_col = possible_col break - + # Check for footprint column - for possible_col in ['footprint', 'package', 'pattern', 'pcb footprint']: + for possible_col in ["footprint", "package", "pattern", "pcb footprint"]: if possible_col in df.columns: footprint_col = possible_col break - + # Check for cost column - for possible_col in ['cost', 'price', 'unit price', 'unit cost', 'cost each']: + for possible_col in ["cost", "price", "unit price", "unit cost", "cost each"]: if possible_col in df.columns: cost_col = possible_col break - + # Check for category column - for possible_col in ['category', 'type', 'group', 'component type', 'lib']: + for possible_col in ["category", "type", 
"group", "component type", "lib"]: if possible_col in df.columns: category_col = possible_col break - + # Count total components if quantity_col: # Try to convert quantity to numeric - df[quantity_col] = pd.to_numeric(df[quantity_col], errors='coerce').fillna(1) + df[quantity_col] = pd.to_numeric(df[quantity_col], errors="coerce").fillna(1) results["total_component_count"] = int(df[quantity_col].sum()) else: # If no quantity column, assume each row is one component results["total_component_count"] = len(df) - + # Count unique components results["unique_component_count"] = len(df) - + # Calculate categories if category_col: # Use provided category column @@ -462,44 +475,45 @@ def analyze_bom_data(components: List[Dict[str, Any]], format_info: Dict[str, An def extract_prefix(ref): if isinstance(ref, str): import re - match = re.match(r'^([A-Za-z]+)', ref) + + match = re.match(r"^([A-Za-z]+)", ref) if match: return match.group(1) return "Other" - - if isinstance(df[ref_col].iloc[0], str) and ',' in df[ref_col].iloc[0]: + + if isinstance(df[ref_col].iloc[0], str) and "," in df[ref_col].iloc[0]: # Multiple references in one cell all_refs = [] for refs in df[ref_col]: - all_refs.extend([r.strip() for r in refs.split(',')]) - + all_refs.extend([r.strip() for r in refs.split(",")]) + categories = {} for ref in all_refs: prefix = extract_prefix(ref) categories[prefix] = categories.get(prefix, 0) + 1 - + results["categories"] = categories else: # Single reference per row categories = df[ref_col].apply(extract_prefix).value_counts().to_dict() results["categories"] = {str(k): int(v) for k, v in categories.items()} - + # Map common reference prefixes to component types category_mapping = { - 'R': 'Resistors', - 'C': 'Capacitors', - 'L': 'Inductors', - 'D': 'Diodes', - 'Q': 'Transistors', - 'U': 'ICs', - 'SW': 'Switches', - 'J': 'Connectors', - 'K': 'Relays', - 'Y': 'Crystals/Oscillators', - 'F': 'Fuses', - 'T': 'Transformers' + "R": "Resistors", + "C": "Capacitors", + "L": "Inductors", + "D": "Diodes", + "Q": "Transistors", + "U": "ICs", + "SW": "Switches", + "J": "Connectors", + "K": "Relays", + "Y": "Crystals/Oscillators", + "F": "Fuses", + "T": "Transformers", } - + mapped_categories = {} for cat, count in results["categories"].items(): if cat in category_mapping: @@ -507,257 +521,238 @@ def extract_prefix(ref): mapped_categories[mapped_name] = mapped_categories.get(mapped_name, 0) + count else: mapped_categories[cat] = count - + results["categories"] = mapped_categories - + # Calculate cost if available if cost_col: try: # Try to extract numeric values from cost field - df[cost_col] = df[cost_col].astype(str).str.replace('$', '').str.replace(',', '') - df[cost_col] = pd.to_numeric(df[cost_col], errors='coerce') - + df[cost_col] = df[cost_col].astype(str).str.replace("$", "").str.replace(",", "") + df[cost_col] = pd.to_numeric(df[cost_col], errors="coerce") + # Remove NaN values df_with_cost = df.dropna(subset=[cost_col]) - + if not df_with_cost.empty: results["has_cost_data"] = True - + if quantity_col: total_cost = (df_with_cost[cost_col] * df_with_cost[quantity_col]).sum() else: total_cost = df_with_cost[cost_col].sum() - + results["total_cost"] = round(float(total_cost), 2) - + # Try to determine currency # Check first row that has cost for currency symbols for _, row in df.iterrows(): - cost_str = str(row.get(cost_col, '')) - if '$' in cost_str: + cost_str = str(row.get(cost_col, "")) + if "$" in cost_str: results["currency"] = "USD" break - elif '€' in cost_str: + elif "€" in cost_str: 
results["currency"] = "EUR" break - elif '£' in cost_str: + elif "£" in cost_str: results["currency"] = "GBP" break - + if "currency" not in results: results["currency"] = "USD" # Default except: print("Failed to parse cost data") - + # Add extra insights if ref_col and value_col: # Check for common components by value value_counts = df[value_col].value_counts() most_common = value_counts.head(5).to_dict() results["most_common_values"] = {str(k): int(v) for k, v in most_common.items()} - + except Exception as e: print(f"Error analyzing BOM data: {str(e)}", exc_info=True) # Fallback to basic analysis results["unique_component_count"] = len(components) results["total_component_count"] = len(components) - + return results -async def export_bom_with_python(schematic_file: str, output_dir: str, project_name: str, ctx: Context) -> Dict[str, Any]: +def export_bom_with_python( + schematic_file: str, output_dir: str, project_name: str +) -> Dict[str, Any]: """Export a BOM using KiCad Python modules. - + Args: schematic_file: Path to the schematic file output_dir: Directory to save the BOM project_name: Name of the project - ctx: MCP context for progress reporting - + Returns: Dictionary with export results """ print(f"Exporting BOM for schematic: {schematic_file}") - await ctx.report_progress(30, 100) - + # Progress removed(30, 100) + try: # Try to import KiCad Python modules # This is a placeholder since exporting BOMs from schematic files # is complex and KiCad's API for this is not well-documented import kicad import kicad.pcbnew - + # For now, return a message indicating this method is not implemented yet print("BOM export with Python modules not fully implemented") - ctx.info("BOM export with Python modules not fully implemented yet") - + logging.info("BOM export with Python modules not fully implemented yet") + return { "success": False, "error": "BOM export using Python modules is not fully implemented yet. Try using the command-line method.", - "schematic_file": schematic_file + "schematic_file": schematic_file, } - + except ImportError: print("Failed to import KiCad Python modules") return { "success": False, "error": "Failed to import KiCad Python modules", - "schematic_file": schematic_file + "schematic_file": schematic_file, } -async def export_bom_with_cli(schematic_file: str, output_dir: str, project_name: str, ctx: Context) -> Dict[str, Any]: +def export_bom_with_cli( + schematic_file: str, output_dir: str, project_name: str +) -> Dict[str, Any]: """Export a BOM using KiCad command-line tools. 
- + Args: schematic_file: Path to the schematic file output_dir: Directory to save the BOM project_name: Name of the project - ctx: MCP context for progress reporting - + Returns: Dictionary with export results """ import subprocess import platform - + system = platform.system() print(f"Exporting BOM using CLI tools on {system}") - await ctx.report_progress(40, 100) - + # Progress removed(40, 100) + # Output file path output_file = os.path.join(output_dir, f"{project_name}_bom.csv") - + # Define the command based on operating system if system == "Darwin": # macOS from kicad_mcp.config import KICAD_APP_PATH - + # Path to KiCad command-line tools on macOS kicad_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/kicad-cli") - + if not os.path.exists(kicad_cli): return { "success": False, "error": f"KiCad CLI tool not found at {kicad_cli}", - "schematic_file": schematic_file + "schematic_file": schematic_file, } - + # Command to generate BOM - cmd = [ - kicad_cli, - "sch", - "export", - "bom", - "--output", output_file, - schematic_file - ] - + cmd = [kicad_cli, "sch", "export", "bom", "--output", output_file, schematic_file] + elif system == "Windows": from kicad_mcp.config import KICAD_APP_PATH - + # Path to KiCad command-line tools on Windows kicad_cli = os.path.join(KICAD_APP_PATH, "bin", "kicad-cli.exe") - + if not os.path.exists(kicad_cli): return { "success": False, "error": f"KiCad CLI tool not found at {kicad_cli}", - "schematic_file": schematic_file + "schematic_file": schematic_file, } - + # Command to generate BOM - cmd = [ - kicad_cli, - "sch", - "export", - "bom", - "--output", output_file, - schematic_file - ] - + cmd = [kicad_cli, "sch", "export", "bom", "--output", output_file, schematic_file] + elif system == "Linux": # Assume kicad-cli is in the PATH kicad_cli = "kicad-cli" - + # Command to generate BOM - cmd = [ - kicad_cli, - "sch", - "export", - "bom", - "--output", output_file, - schematic_file - ] - + cmd = [kicad_cli, "sch", "export", "bom", "--output", output_file, schematic_file] + else: return { "success": False, "error": f"Unsupported operating system: {system}", - "schematic_file": schematic_file + "schematic_file": schematic_file, } - + try: print(f"Running command: {' '.join(cmd)}") - await ctx.report_progress(60, 100) - + # Progress removed(60, 100) + # Run the command process = subprocess.run(cmd, capture_output=True, text=True, timeout=30) - + # Check if the command was successful if process.returncode != 0: print(f"BOM export command failed with code {process.returncode}") print(f"Error output: {process.stderr}") - + return { "success": False, "error": f"BOM export command failed: {process.stderr}", "schematic_file": schematic_file, - "command": ' '.join(cmd) + "command": " ".join(cmd), } - + # Check if the output file was created if not os.path.exists(output_file): return { "success": False, "error": "BOM file was not created", "schematic_file": schematic_file, - "output_file": output_file + "output_file": output_file, } - - await ctx.report_progress(80, 100) - + + # Progress removed(80, 100) + # Read the first few lines of the BOM to verify it's valid - with open(output_file, 'r') as f: + with open(output_file, "r") as f: bom_content = f.read(1024) # Read first 1KB - + if len(bom_content.strip()) == 0: return { "success": False, "error": "Generated BOM file is empty", "schematic_file": schematic_file, - "output_file": output_file + "output_file": output_file, } - + return { "success": True, "schematic_file": schematic_file, "output_file": output_file, 
"file_size": os.path.getsize(output_file), - "message": "BOM exported successfully" + "message": "BOM exported successfully", } - + except subprocess.TimeoutExpired: print("BOM export command timed out after 30 seconds") return { "success": False, "error": "BOM export command timed out after 30 seconds", - "schematic_file": schematic_file + "schematic_file": schematic_file, } - + except Exception as e: print(f"Error exporting BOM: {str(e)}", exc_info=True) return { "success": False, "error": f"Error exporting BOM: {str(e)}", - "schematic_file": schematic_file + "schematic_file": schematic_file, } diff --git a/kicad_mcp/tools/drc_tools.py b/kicad_mcp/tools/drc_tools.py index f3cf8fd..a87b072 100644 --- a/kicad_mcp/tools/drc_tools.py +++ b/kicad_mcp/tools/drc_tools.py @@ -1,8 +1,9 @@ """ Design Rule Check (DRC) tools for KiCad PCB files. """ + import os -# import logging # <-- Remove if no other logging exists +import logging from typing import Dict, Any from mcp.server.fastmcp import FastMCP, Context @@ -12,125 +13,125 @@ # Import implementations from kicad_mcp.tools.drc_impl.cli_drc import run_drc_via_cli + def register_drc_tools(mcp: FastMCP) -> None: """Register DRC tools with the MCP server. - + Args: mcp: The FastMCP server instance """ - + @mcp.tool() def get_drc_history_tool(project_path: str) -> Dict[str, Any]: """Get the DRC check history for a KiCad project. - + Args: project_path: Path to the KiCad project file (.kicad_pro) - + Returns: Dictionary with DRC history entries """ print(f"Getting DRC history for project: {project_path}") - + if not os.path.exists(project_path): print(f"Project not found: {project_path}") return {"success": False, "error": f"Project not found: {project_path}"} - + # Get history entries history_entries = get_drc_history(project_path) - + # Calculate trend information trend = None if len(history_entries) >= 2: first = history_entries[-1] # Oldest entry - last = history_entries[0] # Newest entry - + last = history_entries[0] # Newest entry + first_violations = first.get("total_violations", 0) last_violations = last.get("total_violations", 0) - + if first_violations > last_violations: trend = "improving" elif first_violations < last_violations: trend = "degrading" else: trend = "stable" - + return { "success": True, "project_path": project_path, "history_entries": history_entries, "entry_count": len(history_entries), - "trend": trend + "trend": trend, } - + @mcp.tool() - async def run_drc_check(project_path: str, ctx: Context) -> Dict[str, Any]: + def run_drc_check(project_path: str) -> Dict[str, Any]: """Run a Design Rule Check on a KiCad PCB file. 
- + Args: project_path: Path to the KiCad project file (.kicad_pro) - ctx: MCP context for progress reporting - + Returns: Dictionary with DRC results and statistics """ print(f"Running DRC check for project: {project_path}") - + if not os.path.exists(project_path): print(f"Project not found: {project_path}") return {"success": False, "error": f"Project not found: {project_path}"} - + # Get PCB file from project files = get_project_files(project_path) if "pcb" not in files: print("PCB file not found in project") return {"success": False, "error": "PCB file not found in project"} - + pcb_file = files["pcb"] print(f"Found PCB file: {pcb_file}") - + # Report progress to user - await ctx.report_progress(10, 100) - ctx.info(f"Starting DRC check on {os.path.basename(pcb_file)}") - + logging.info(f"Starting DRC check on {os.path.basename(pcb_file)}") + # Run DRC using the appropriate approach drc_results = None - + print("Using kicad-cli for DRC") - ctx.info("Using KiCad CLI for DRC check...") + logging.info("Using KiCad CLI for DRC check...") # logging.info(f"[DRC] Calling run_drc_via_cli for {pcb_file}") # <-- Remove log - drc_results = await run_drc_via_cli(pcb_file, ctx) + drc_results = run_drc_via_cli(pcb_file) # logging.info(f"[DRC] run_drc_via_cli finished for {pcb_file}") # <-- Remove log - + # Process and save results if successful if drc_results and drc_results.get("success", False): # logging.info(f"[DRC] DRC check successful for {pcb_file}. Saving results.") # <-- Remove log # Save results to history save_drc_result(project_path, drc_results) - + # Add comparison with previous run comparison = compare_with_previous(project_path, drc_results) if comparison: drc_results["comparison"] = comparison - + if comparison["change"] < 0: - ctx.info(f"Great progress! You've fixed {abs(comparison['change'])} DRC violations since the last check.") + logging.info( + f"Great progress! You've fixed {abs(comparison['change'])} DRC violations since the last check." + ) elif comparison["change"] > 0: - ctx.info(f"Found {comparison['change']} new DRC violations since the last check.") + logging.info( + f"Found {comparison['change']} new DRC violations since the last check." + ) else: - ctx.info(f"No change in the number of DRC violations since the last check.") + logging.info(f"No change in the number of DRC violations since the last check.") elif drc_results: - # logging.warning(f"[DRC] DRC check reported failure for {pcb_file}: {drc_results.get('error')}") # <-- Remove log - # Pass or print a warning if needed - pass + # logging.warning(f"[DRC] DRC check reported failure for {pcb_file}: {drc_results.get('error')}") # <-- Remove log + # Pass or print a warning if needed + pass else: # logging.error(f"[DRC] DRC check returned None for {pcb_file}") # <-- Remove log # Pass or print an error if needed pass - + # Complete progress - await ctx.report_progress(100, 100) - - return drc_results or { - "success": False, - "error": "DRC check failed with an unknown error" - } + # Progress reporting removed + + return drc_results or {"success": False, "error": "DRC check failed with an unknown error"} diff --git a/kicad_mcp/tools/export_tools.py b/kicad_mcp/tools/export_tools.py index efc5962..0f0edb5 100644 --- a/kicad_mcp/tools/export_tools.py +++ b/kicad_mcp/tools/export_tools.py @@ -1,26 +1,29 @@ """ Export tools for KiCad projects. 
""" + import os import tempfile import subprocess import shutil import asyncio from typing import Dict, Any, Optional -from mcp.server.fastmcp import FastMCP, Context, Image +from mcp.server.fastmcp import FastMCP, Image +import logging from kicad_mcp.utils.file_utils import get_project_files from kicad_mcp.config import KICAD_APP_PATH, system + def register_export_tools(mcp: FastMCP) -> None: """Register export tools with the MCP server. - + Args: mcp: The FastMCP server instance """ - + @mcp.tool() - async def generate_pcb_thumbnail(project_path: str, ctx: Context): + def generate_pcb_thumbnail(project_path: str): """Generate a thumbnail image of a KiCad PCB layout using kicad-cli. Args: @@ -31,97 +34,123 @@ async def generate_pcb_thumbnail(project_path: str, ctx: Context): Thumbnail image of the PCB or None if generation failed """ try: - # Access the context - app_context = ctx.request_context.lifespan_context - # Removed check for kicad_modules_available as we now use CLI - print(f"Generating thumbnail via CLI for project: {project_path}") if not os.path.exists(project_path): print(f"Project not found: {project_path}") - await ctx.info(f"Project not found: {project_path}") + logging.info(f"Project not found: {project_path}") return None # Get PCB file from project files = get_project_files(project_path) if "pcb" not in files: print("PCB file not found in project") - await ctx.info("PCB file not found in project") + logging.info("PCB file not found in project") return None pcb_file = files["pcb"] print(f"Found PCB file: {pcb_file}") - # Check cache - cache_key = f"thumbnail_cli_{pcb_file}_{os.path.getmtime(pcb_file)}" - if hasattr(app_context, 'cache') and cache_key in app_context.cache: - print(f"Using cached CLI thumbnail for {pcb_file}") - return app_context.cache[cache_key] - - await ctx.report_progress(10, 100) - await ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)} using kicad-cli") + # Progress removed(10, 100) + logging.info(f"Generating thumbnail for {os.path.basename(pcb_file)} using kicad-cli") # Use command-line tools try: - thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx) + thumbnail = generate_thumbnail_with_cli(pcb_file) if thumbnail: - # Cache the result if possible - if hasattr(app_context, 'cache'): - app_context.cache[cache_key] = thumbnail print("Thumbnail generated successfully via CLI.") return thumbnail else: - print("generate_thumbnail_with_cli returned None") - await ctx.info("Failed to generate thumbnail using kicad-cli.") - return None + print("generate_thumbnail_with_cli returned None") + logging.info("Failed to generate thumbnail using kicad-cli.") + return None except Exception as e: print(f"Error calling generate_thumbnail_with_cli: {str(e)}", exc_info=True) - await ctx.info(f"Error generating thumbnail with kicad-cli: {str(e)}") + logging.info(f"Error generating thumbnail with kicad-cli: {str(e)}") return None - + except asyncio.CancelledError: print("Thumbnail generation cancelled") raise # Re-raise to let MCP know the task was cancelled except Exception as e: print(f"Unexpected error in thumbnail generation: {str(e)}") - await ctx.info(f"Error: {str(e)}") + logging.info(f"Error: {str(e)}") return None @mcp.tool() - async def generate_project_thumbnail(project_path: str, ctx: Context): - """Generate a thumbnail of a KiCad project's PCB layout (Alias for generate_pcb_thumbnail).""" - # This function now just calls the main CLI-based thumbnail generator - print(f"generate_project_thumbnail called, redirecting to 
generate_pcb_thumbnail for {project_path}") - return await generate_pcb_thumbnail(project_path, ctx) + def generate_project_thumbnail(project_path: str): + """Generate a thumbnail of a KiCad project's PCB layout.""" + try: + print(f"Generating thumbnail for project: {project_path}") + + if not os.path.exists(project_path): + print(f"Project not found: {project_path}") + logging.info(f"Project not found: {project_path}") + return None + + # Get PCB file from project + files = get_project_files(project_path) + if "pcb" not in files: + print("PCB file not found in project") + logging.info("PCB file not found in project") + return None + + pcb_file = files["pcb"] + print(f"Found PCB file: {pcb_file}") + + logging.info(f"Generating thumbnail for {os.path.basename(pcb_file)} using kicad-cli") + + # Use command-line tools + try: + thumbnail = generate_thumbnail_with_cli(pcb_file) + if thumbnail: + print("Thumbnail generated successfully via CLI.") + return thumbnail + else: + print("generate_thumbnail_with_cli returned None") + logging.info("Failed to generate thumbnail using kicad-cli.") + return None + except Exception as e: + print(f"Error calling generate_thumbnail_with_cli: {str(e)}", exc_info=True) + logging.info(f"Error generating thumbnail with kicad-cli: {str(e)}") + return None + + except asyncio.CancelledError: + print("Thumbnail generation cancelled") + raise # Re-raise to let MCP know the task was cancelled + except Exception as e: + print(f"Unexpected error in thumbnail generation: {str(e)}") + logging.info(f"Error: {str(e)}") + return None + # Helper functions for thumbnail generation -async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context): +def generate_thumbnail_with_cli(pcb_file: str): """Generate PCB thumbnail using command line tools. This is a fallback method when the kicad Python module is not available or fails. 
Args: pcb_file: Path to the PCB file (.kicad_pcb) - ctx: MCP context for progress reporting Returns: Image object containing the PCB thumbnail or None if generation failed """ try: print("Attempting to generate thumbnail using KiCad CLI tools") - await ctx.report_progress(20, 100) + # Progress removed(20, 100) - # --- Determine Output Path --- + # --- Determine Output Path --- project_dir = os.path.dirname(pcb_file) project_name = os.path.splitext(os.path.basename(pcb_file))[0] output_file = os.path.join(project_dir, f"{project_name}_thumbnail.svg") - # --------------------------- + # --------------------------- # Check for required command-line tools based on OS kicad_cli = None if system == "Darwin": # macOS kicad_cli_path = os.path.join(KICAD_APP_PATH, "Contents/MacOS/kicad-cli") if os.path.exists(kicad_cli_path): - kicad_cli = kicad_cli_path + kicad_cli = kicad_cli_path elif shutil.which("kicad-cli") is not None: kicad_cli = "kicad-cli" # Try to use from PATH else: @@ -130,9 +159,9 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context): elif system == "Windows": kicad_cli_path = os.path.join(KICAD_APP_PATH, "bin", "kicad-cli.exe") if os.path.exists(kicad_cli_path): - kicad_cli = kicad_cli_path + kicad_cli = kicad_cli_path elif shutil.which("kicad-cli.exe") is not None: - kicad_cli = "kicad-cli.exe" + kicad_cli = "kicad-cli.exe" elif shutil.which("kicad-cli") is not None: kicad_cli = "kicad-cli" # Try to use from PATH (without .exe) else: @@ -147,30 +176,32 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context): print(f"Unsupported operating system: {system}") return None - await ctx.report_progress(30, 100) - await ctx.info("Using KiCad command line tools for thumbnail generation") + # Progress removed(30, 100) + logging.info("Using KiCad command line tools for thumbnail generation") # Build command for generating SVG from PCB using kicad-cli (changed from PNG) cmd = [ kicad_cli, "pcb", "export", - "svg", # <-- Changed format to svg - "--output", output_file, - "--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Keep relevant layers + "svg", # <-- Changed format to svg + "--output", + output_file, + "--layers", + "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Keep relevant layers # Consider adding options like --black-and-white if needed - pcb_file + pcb_file, ] print(f"Running command: {' '.join(cmd)}") - await ctx.report_progress(50, 100) + # Progress removed(50, 100) # Run the command try: process = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=30) print(f"Command successful: {process.stdout}") - await ctx.report_progress(70, 100) + # Progress removed(70, 100) # Check if the output file was created if not os.path.exists(output_file): @@ -178,34 +209,34 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context): return None # Read the image file - with open(output_file, 'rb') as f: + with open(output_file, "rb") as f: img_data = f.read() print(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes") - await ctx.report_progress(90, 100) + # Progress removed(90, 100) # Inform user about the saved file - await ctx.info(f"Thumbnail saved to: {output_file}") - return Image(data=img_data, format="svg") # <-- Changed format to svg + logging.info(f"Thumbnail saved to: {output_file}") + return Image(data=img_data, format="svg") # <-- Changed format to svg except subprocess.CalledProcessError as e: print(f"Command '{' '.join(e.cmd)}' failed with code {e.returncode}") print(f"Stderr: 
{e.stderr}") print(f"Stdout: {e.stdout}") - await ctx.info(f"KiCad CLI command failed: {e.stderr or e.stdout}") + logging.info(f"KiCad CLI command failed: {e.stderr or e.stdout}") return None except subprocess.TimeoutExpired: print(f"Command timed out after 30 seconds: {' '.join(cmd)}") - await ctx.info("KiCad CLI command timed out") + logging.info("KiCad CLI command timed out") return None except Exception as e: print(f"Error running CLI command: {str(e)}", exc_info=True) - await ctx.info(f"Error running KiCad CLI: {str(e)}") + logging.info(f"Error running KiCad CLI: {str(e)}") return None - + except asyncio.CancelledError: print("CLI thumbnail generation cancelled") raise except Exception as e: print(f"Unexpected error in CLI thumbnail generation: {str(e)}") - await ctx.info(f"Unexpected error: {str(e)}") + logging.info(f"Unexpected error: {str(e)}") return None diff --git a/kicad_mcp/tools/netlist_tools.py b/kicad_mcp/tools/netlist_tools.py index 42e521c..6e5a860 100644 --- a/kicad_mcp/tools/netlist_tools.py +++ b/kicad_mcp/tools/netlist_tools.py @@ -1,69 +1,61 @@ """ -Netlist extraction and analysis tools for KiCad schematics. +Simplified Netlist extraction tools for KiCad schematics. """ + import os +import logging from typing import Dict, Any -from mcp.server.fastmcp import FastMCP, Context +from mcp.server.fastmcp import FastMCP from kicad_mcp.utils.file_utils import get_project_files from kicad_mcp.utils.netlist_parser import extract_netlist, analyze_netlist + def register_netlist_tools(mcp: FastMCP) -> None: """Register netlist-related tools with the MCP server. - + Args: mcp: The FastMCP server instance """ - + @mcp.tool() - async def extract_schematic_netlist(schematic_path: str, ctx: Context) -> Dict[str, Any]: + def extract_schematic_netlist(schematic_path: str) -> Dict[str, Any]: """Extract netlist information from a KiCad schematic. - + This tool parses a KiCad schematic file and extracts comprehensive netlist information including components, connections, and labels. 
- + Args: schematic_path: Path to the KiCad schematic file (.kicad_sch) - ctx: MCP context for progress reporting - + Returns: Dictionary with netlist information """ - print(f"Extracting netlist from schematic: {schematic_path}") - + logging.info(f"Extracting netlist from schematic: {schematic_path}") + if not os.path.exists(schematic_path): - print(f"Schematic file not found: {schematic_path}") - ctx.info(f"Schematic file not found: {schematic_path}") - return {"success": False, "error": f"Schematic file not found: {schematic_path}"} - - # Report progress - await ctx.report_progress(10, 100) - ctx.info(f"Loading schematic file: {os.path.basename(schematic_path)}") - - # Extract netlist information + error_msg = f"Schematic file not found: {schematic_path}" + logging.error(error_msg) + return {"success": False, "error": error_msg} + + logging.info(f"Loading schematic file: {os.path.basename(schematic_path)}") + try: - await ctx.report_progress(20, 100) - ctx.info("Parsing schematic structure...") - + logging.info("Parsing schematic structure...") netlist_data = extract_netlist(schematic_path) - + if "error" in netlist_data: - print(f"Error extracting netlist: {netlist_data['error']}") - ctx.info(f"Error extracting netlist: {netlist_data['error']}") - return {"success": False, "error": netlist_data['error']} - - await ctx.report_progress(60, 100) - ctx.info(f"Extracted {netlist_data['component_count']} components and {netlist_data['net_count']} nets") - - # Analyze the netlist - await ctx.report_progress(70, 100) - ctx.info("Analyzing netlist data...") - + error_msg = f"Error extracting netlist: {netlist_data['error']}" + logging.error(error_msg) + return {"success": False, "error": netlist_data["error"]} + + logging.info( + f"Extracted {netlist_data['component_count']} components and {netlist_data['net_count']} nets" + ) + + logging.info("Analyzing netlist data...") analysis_results = analyze_netlist(netlist_data) - - await ctx.report_progress(90, 100) - - # Build result + result = { "success": True, "schematic_path": schematic_path, @@ -71,318 +63,88 @@ async def extract_schematic_netlist(schematic_path: str, ctx: Context) -> Dict[s "net_count": netlist_data["net_count"], "components": netlist_data["components"], "nets": netlist_data["nets"], - "analysis": analysis_results + "analysis": analysis_results, } - - # Complete progress - await ctx.report_progress(100, 100) - ctx.info("Netlist extraction complete") - - return result - - except Exception as e: - print(f"Error extracting netlist: {str(e)}") - ctx.info(f"Error extracting netlist: {str(e)}") - return {"success": False, "error": str(e)} - @mcp.tool() - async def extract_project_netlist(project_path: str, ctx: Context) -> Dict[str, Any]: - """Extract netlist from a KiCad project's schematic. - - This tool finds the schematic associated with a KiCad project - and extracts its netlist information. 
- - Args: - project_path: Path to the KiCad project file (.kicad_pro) - ctx: MCP context for progress reporting - - Returns: - Dictionary with netlist information - """ - print(f"Extracting netlist for project: {project_path}") - - if not os.path.exists(project_path): - print(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") - return {"success": False, "error": f"Project not found: {project_path}"} - - # Report progress - await ctx.report_progress(10, 100) - - # Get the schematic file - try: - files = get_project_files(project_path) - - if "schematic" not in files: - print("Schematic file not found in project") - ctx.info("Schematic file not found in project") - return {"success": False, "error": "Schematic file not found in project"} - - schematic_path = files["schematic"] - print(f"Found schematic file: {schematic_path}") - ctx.info(f"Found schematic file: {os.path.basename(schematic_path)}") - - # Extract netlist - await ctx.report_progress(20, 100) - - # Call the schematic netlist extraction - result = await extract_schematic_netlist(schematic_path, ctx) - - # Add project path to result - if "success" in result and result["success"]: - result["project_path"] = project_path - + logging.info("Netlist extraction complete") return result - - except Exception as e: - print(f"Error extracting project netlist: {str(e)}") - ctx.info(f"Error extracting project netlist: {str(e)}") - return {"success": False, "error": str(e)} - @mcp.tool() - async def analyze_schematic_connections(schematic_path: str, ctx: Context) -> Dict[str, Any]: - """Analyze connections in a KiCad schematic. - - This tool provides detailed analysis of component connections, - including power nets, signal paths, and potential issues. - - Args: - schematic_path: Path to the KiCad schematic file (.kicad_sch) - ctx: MCP context for progress reporting - - Returns: - Dictionary with connection analysis - """ - print(f"Analyzing connections in schematic: {schematic_path}") - - if not os.path.exists(schematic_path): - print(f"Schematic file not found: {schematic_path}") - ctx.info(f"Schematic file not found: {schematic_path}") - return {"success": False, "error": f"Schematic file not found: {schematic_path}"} - - # Report progress - await ctx.report_progress(10, 100) - ctx.info(f"Extracting netlist from: {os.path.basename(schematic_path)}") - - # Extract netlist information - try: - netlist_data = extract_netlist(schematic_path) - - if "error" in netlist_data: - print(f"Error extracting netlist: {netlist_data['error']}") - ctx.info(f"Error extracting netlist: {netlist_data['error']}") - return {"success": False, "error": netlist_data['error']} - - await ctx.report_progress(40, 100) - - # Advanced connection analysis - ctx.info("Performing connection analysis...") - - analysis = { - "component_count": netlist_data["component_count"], - "net_count": netlist_data["net_count"], - "component_types": {}, - "power_nets": [], - "signal_nets": [], - "potential_issues": [] - } - - # Analyze component types - components = netlist_data.get("components", {}) - for ref, component in components.items(): - # Extract component type from reference (e.g., R1 -> R) - import re - comp_type_match = re.match(r'^([A-Za-z_]+)', ref) - if comp_type_match: - comp_type = comp_type_match.group(1) - if comp_type not in analysis["component_types"]: - analysis["component_types"][comp_type] = 0 - analysis["component_types"][comp_type] += 1 - - await ctx.report_progress(60, 100) - - # Identify power nets - nets = 
netlist_data.get("nets", {}) - for net_name, pins in nets.items(): - if any(net_name.startswith(prefix) for prefix in ["VCC", "VDD", "GND", "+5V", "+3V3", "+12V"]): - analysis["power_nets"].append({ - "name": net_name, - "pin_count": len(pins) - }) - else: - analysis["signal_nets"].append({ - "name": net_name, - "pin_count": len(pins) - }) - - await ctx.report_progress(80, 100) - - # Check for potential issues - # 1. Nets with only one connection (floating) - for net_name, pins in nets.items(): - if len(pins) <= 1 and not any(net_name.startswith(prefix) for prefix in ["VCC", "VDD", "GND", "+5V", "+3V3", "+12V"]): - analysis["potential_issues"].append({ - "type": "floating_net", - "net": net_name, - "description": f"Net '{net_name}' appears to be floating (only has {len(pins)} connection)" - }) - - # 2. Power pins without connections - # This would require more detailed parsing of the schematic - - await ctx.report_progress(90, 100) - - # Build result - result = { - "success": True, - "schematic_path": schematic_path, - "analysis": analysis - } - - # Complete progress - await ctx.report_progress(100, 100) - ctx.info("Connection analysis complete") - - return result - except Exception as e: - print(f"Error analyzing connections: {str(e)}") - ctx.info(f"Error analyzing connections: {str(e)}") + error_msg = f"Error extracting netlist: {str(e)}" + logging.error(error_msg) return {"success": False, "error": str(e)} @mcp.tool() - async def find_component_connections(project_path: str, component_ref: str, ctx: Context) -> Dict[str, Any]: + def find_component_connections(project_path: str, component_ref: str) -> Dict[str, Any]: """Find all connections for a specific component in a KiCad project. - - This tool extracts information about how a specific component - is connected to other components in the schematic. 
- + Args: project_path: Path to the KiCad project file (.kicad_pro) component_ref: Component reference (e.g., "R1", "U3") - ctx: MCP context for progress reporting - + Returns: Dictionary with component connection information """ - print(f"Finding connections for component {component_ref} in project: {project_path}") - + logging.info(f"Finding connections for component {component_ref} in project: {project_path}") + if not os.path.exists(project_path): - print(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") - return {"success": False, "error": f"Project not found: {project_path}"} - - # Report progress - await ctx.report_progress(10, 100) - - # Get the schematic file + error_msg = f"Project not found: {project_path}" + logging.error(error_msg) + return {"success": False, "error": error_msg} + try: files = get_project_files(project_path) - + if "schematic" not in files: - print("Schematic file not found in project") - ctx.info("Schematic file not found in project") - return {"success": False, "error": "Schematic file not found in project"} - + error_msg = "Schematic file not found in project" + logging.error(error_msg) + return {"success": False, "error": error_msg} + schematic_path = files["schematic"] - print(f"Found schematic file: {schematic_path}") - ctx.info(f"Found schematic file: {os.path.basename(schematic_path)}") - + logging.info(f"Found schematic file: {os.path.basename(schematic_path)}") + # Extract netlist - await ctx.report_progress(30, 100) - ctx.info(f"Extracting netlist to find connections for {component_ref}...") - netlist_data = extract_netlist(schematic_path) - + if "error" in netlist_data: - print(f"Failed to extract netlist: {netlist_data['error']}") - ctx.info(f"Failed to extract netlist: {netlist_data['error']}") - return {"success": False, "error": netlist_data['error']} - - # Check if component exists in the netlist + error_msg = f"Failed to extract netlist: {netlist_data['error']}" + logging.error(error_msg) + return {"success": False, "error": netlist_data["error"]} + + # Check if component exists components = netlist_data.get("components", {}) if component_ref not in components: - print(f"Component {component_ref} not found in schematic") - ctx.info(f"Component {component_ref} not found in schematic") + error_msg = f"Component {component_ref} not found in schematic" + logging.error(error_msg) return { - "success": False, - "error": f"Component {component_ref} not found in schematic", - "available_components": list(components.keys()) + "success": False, + "error": error_msg, + "available_components": list(components.keys()), } - - # Get component information + + # Find connections (simplified version) component_info = components[component_ref] - - # Find connections - await ctx.report_progress(50, 100) - ctx.info("Finding connections...") - nets = netlist_data.get("nets", {}) connections = [] connected_nets = [] - + for net_name, pins in nets.items(): - # Check if any pin belongs to our component - component_pins = [] - for pin in pins: - if pin.get('component') == component_ref: - component_pins.append(pin) - + component_pins = [pin for pin in pins if pin.get("component") == component_ref] if component_pins: - # This net has connections to our component - net_connections = [] - + connected_nets.append(net_name) for pin in component_pins: - pin_num = pin.get('pin', 'Unknown') - # Find other components connected to this pin - connected_components = [] - - for other_pin in pins: - other_comp = other_pin.get('component') - if 
other_comp and other_comp != component_ref: - connected_components.append({ - "component": other_comp, - "pin": other_pin.get('pin', 'Unknown') - }) - - net_connections.append({ + pin_num = pin.get("pin", "Unknown") + connected_components = [ + {"component": other_pin.get("component"), "pin": other_pin.get("pin", "Unknown")} + for other_pin in pins + if other_pin.get("component") and other_pin.get("component") != component_ref + ] + connections.append({ "pin": pin_num, "net": net_name, "connected_to": connected_components }) - - connections.extend(net_connections) - connected_nets.append(net_name) - - # Analyze the connections - await ctx.report_progress(70, 100) - ctx.info("Analyzing connections...") - - # Categorize connections by pin function (if possible) - pin_functions = {} - if "pins" in component_info: - for pin in component_info["pins"]: - pin_num = pin.get('num') - pin_name = pin.get('name', '') - - # Try to categorize based on pin name - pin_type = "unknown" - - if any(power_term in pin_name.upper() for power_term in ["VCC", "VDD", "VEE", "VSS", "GND", "PWR", "POWER"]): - pin_type = "power" - elif any(io_term in pin_name.upper() for io_term in ["IO", "I/O", "GPIO"]): - pin_type = "io" - elif any(input_term in pin_name.upper() for input_term in ["IN", "INPUT"]): - pin_type = "input" - elif any(output_term in pin_name.upper() for output_term in ["OUT", "OUTPUT"]): - pin_type = "output" - - pin_functions[pin_num] = { - "name": pin_name, - "type": pin_type - } - - # Build result + result = { "success": True, "project_path": project_path, @@ -391,16 +153,13 @@ async def find_component_connections(project_path: str, component_ref: str, ctx: "component_info": component_info, "connections": connections, "connected_nets": connected_nets, - "pin_functions": pin_functions, - "total_connections": len(connections) + "total_connections": len(connections), } - - await ctx.report_progress(100, 100) - ctx.info(f"Found {len(connections)} connections for component {component_ref}") - + + logging.info(f"Found {len(connections)} connections for component {component_ref}") return result - + except Exception as e: - print(f"Error finding component connections: {str(e)}", exc_info=True) - ctx.info(f"Error finding component connections: {str(e)}") - return {"success": False, "error": str(e)} + error_msg = f"Error finding component connections: {str(e)}" + logging.error(error_msg) + return {"success": False, "error": str(e)} \ No newline at end of file diff --git a/kicad_mcp/tools/pattern_tools.py b/kicad_mcp/tools/pattern_tools.py index 65734d8..7335ade 100644 --- a/kicad_mcp/tools/pattern_tools.py +++ b/kicad_mcp/tools/pattern_tools.py @@ -1,7 +1,9 @@ """ Circuit pattern recognition tools for KiCad schematics. """ + import os +import logging from typing import Dict, List, Any, Optional from mcp.server.fastmcp import FastMCP, Context @@ -14,20 +16,21 @@ identify_oscillators, identify_digital_interfaces, identify_microcontrollers, - identify_sensor_interfaces + identify_sensor_interfaces, ) + def register_pattern_tools(mcp: FastMCP) -> None: """Register circuit pattern recognition tools with the MCP server. - + Args: mcp: The FastMCP server instance """ - + @mcp.tool() - async def identify_circuit_patterns(schematic_path: str, ctx: Context) -> Dict[str, Any]: + def identify_circuit_patterns(schematic_path: str) -> Dict[str, Any]: """Identify common circuit patterns in a KiCad schematic. 
- + This tool analyzes a schematic to recognize common circuit blocks such as: - Power supply circuits (linear regulators, switching converters) - Amplifier circuits (op-amps, transistor amplifiers) @@ -35,143 +38,121 @@ async def identify_circuit_patterns(schematic_path: str, ctx: Context) -> Dict[s - Digital interfaces (I2C, SPI, UART) - Microcontroller circuits - And more - + Args: schematic_path: Path to the KiCad schematic file (.kicad_sch) - ctx: MCP context for progress reporting - + Returns: Dictionary with identified circuit patterns """ - if not os.path.exists(schematic_path): - ctx.info(f"Schematic file not found: {schematic_path}") - return {"success": False, "error": f"Schematic file not found: {schematic_path}"} - - # Report progress - await ctx.report_progress(10, 100) - ctx.info(f"Loading schematic file: {os.path.basename(schematic_path)}") - - try: - # Extract netlist information - await ctx.report_progress(20, 100) - ctx.info("Parsing schematic structure...") - - netlist_data = extract_netlist(schematic_path) - - if "error" in netlist_data: - ctx.info(f"Error extracting netlist: {netlist_data['error']}") - return {"success": False, "error": netlist_data['error']} - - # Analyze components and nets - await ctx.report_progress(30, 100) - ctx.info("Analyzing components and connections...") - - components = netlist_data.get("components", {}) - nets = netlist_data.get("nets", {}) - - # Start pattern recognition - await ctx.report_progress(50, 100) - ctx.info("Identifying circuit patterns...") - - identified_patterns = { - "power_supply_circuits": [], - "amplifier_circuits": [], - "filter_circuits": [], - "oscillator_circuits": [], - "digital_interface_circuits": [], - "microcontroller_circuits": [], - "sensor_interface_circuits": [], - "other_patterns": [] - } - - # Identify power supply circuits - await ctx.report_progress(60, 100) - identified_patterns["power_supply_circuits"] = identify_power_supplies(components, nets) - - # Identify amplifier circuits - await ctx.report_progress(70, 100) - identified_patterns["amplifier_circuits"] = identify_amplifiers(components, nets) - - # Identify filter circuits - await ctx.report_progress(75, 100) - identified_patterns["filter_circuits"] = identify_filters(components, nets) - - # Identify oscillator circuits - await ctx.report_progress(80, 100) - identified_patterns["oscillator_circuits"] = identify_oscillators(components, nets) - - # Identify digital interface circuits - await ctx.report_progress(85, 100) - identified_patterns["digital_interface_circuits"] = identify_digital_interfaces(components, nets) - - # Identify microcontroller circuits - await ctx.report_progress(90, 100) - identified_patterns["microcontroller_circuits"] = identify_microcontrollers(components) - - # Identify sensor interface circuits - await ctx.report_progress(95, 100) - identified_patterns["sensor_interface_circuits"] = identify_sensor_interfaces(components, nets) - - # Build result - result = { - "success": True, - "schematic_path": schematic_path, - "component_count": netlist_data["component_count"], - "identified_patterns": identified_patterns - } - - # Count total patterns - total_patterns = sum(len(patterns) for patterns in identified_patterns.values()) - result["total_patterns_found"] = total_patterns - - # Complete progress - await ctx.report_progress(100, 100) - ctx.info(f"Pattern recognition complete. 
Found {total_patterns} circuit patterns.") - - return result - - except Exception as e: - ctx.info(f"Error identifying circuit patterns: {str(e)}") - return {"success": False, "error": str(e)} + logging.info(f"Loading schematic file: {os.path.basename(schematic_path)}") + return _identify_patterns_in_schematic(schematic_path) @mcp.tool() - async def analyze_project_circuit_patterns(project_path: str, ctx: Context) -> Dict[str, Any]: + def analyze_project_circuit_patterns(project_path: str) -> Dict[str, Any]: """Identify circuit patterns in a KiCad project's schematic. - + Args: project_path: Path to the KiCad project file (.kicad_pro) - ctx: MCP context for progress reporting - + Returns: Dictionary with identified circuit patterns """ if not os.path.exists(project_path): - ctx.info(f"Project not found: {project_path}") + logging.info(f"Project not found: {project_path}") return {"success": False, "error": f"Project not found: {project_path}"} - + # Report progress - await ctx.report_progress(10, 100) - + # Progress removed(10, 100) + # Get the schematic file try: files = get_project_files(project_path) - + if "schematic" not in files: - ctx.info("Schematic file not found in project") + logging.info("Schematic file not found in project") return {"success": False, "error": "Schematic file not found in project"} - + schematic_path = files["schematic"] - ctx.info(f"Found schematic file: {os.path.basename(schematic_path)}") - + logging.info(f"Found schematic file: {os.path.basename(schematic_path)}") + # Identify patterns in the schematic - result = await identify_circuit_patterns(schematic_path, ctx) - + result = _identify_patterns_in_schematic(schematic_path) + # Add project path to result if "success" in result and result["success"]: result["project_path"] = project_path - + return result - + except Exception as e: - ctx.info(f"Error analyzing project circuit patterns: {str(e)}") + logging.info(f"Error analyzing project circuit patterns: {str(e)}") return {"success": False, "error": str(e)} + + +# Helper function for pattern identification +def _identify_patterns_in_schematic(schematic_path: str) -> Dict[str, Any]: + """Helper function to identify circuit patterns in a schematic file. 
+ + Args: + schematic_path: Path to the KiCad schematic file (.kicad_sch) + + Returns: + Dictionary with identified circuit patterns + """ + if not os.path.exists(schematic_path): + return {"success": False, "error": f"Schematic file not found: {schematic_path}"} + + try: + # Extract netlist information + logging.info("Parsing schematic structure...") + netlist_data = extract_netlist(schematic_path) + + if "error" in netlist_data: + return {"success": False, "error": netlist_data["error"]} + + # Analyze components and nets + logging.info("Analyzing components and connections...") + components = netlist_data.get("components", {}) + nets = netlist_data.get("nets", {}) + + # Start pattern recognition + logging.info("Identifying circuit patterns...") + identified_patterns = { + "power_supply_circuits": [], + "amplifier_circuits": [], + "filter_circuits": [], + "oscillator_circuits": [], + "digital_interface_circuits": [], + "microcontroller_circuits": [], + "sensor_interface_circuits": [], + "other_patterns": [], + } + + # Identify different circuit patterns + identified_patterns["power_supply_circuits"] = identify_power_supplies(components, nets) + identified_patterns["amplifier_circuits"] = identify_amplifiers(components, nets) + identified_patterns["filter_circuits"] = identify_filters(components, nets) + identified_patterns["oscillator_circuits"] = identify_oscillators(components, nets) + identified_patterns["digital_interface_circuits"] = identify_digital_interfaces(components, nets) + identified_patterns["microcontroller_circuits"] = identify_microcontrollers(components) + identified_patterns["sensor_interface_circuits"] = identify_sensor_interfaces(components, nets) + + # Build result + result = { + "success": True, + "schematic_path": schematic_path, + "component_count": netlist_data["component_count"], + "identified_patterns": identified_patterns, + } + + # Count total patterns + total_patterns = sum(len(patterns) for patterns in identified_patterns.values()) + result["total_patterns_found"] = total_patterns + + logging.info(f"Pattern recognition complete. Found {total_patterns} circuit patterns.") + return result + + except Exception as e: + logging.info(f"Error identifying circuit patterns: {str(e)}") + return {"success": False, "error": str(e)} From b37bc012875860265bca194c4f33fc3fdc9c50ec Mon Sep 17 00:00:00 2001 From: Neil-TC Date: Wed, 8 Oct 2025 17:22:02 +0800 Subject: [PATCH 2/5] Add files via upload --- kicad_mcp/tools/drc_impl/cli_drc.py | 84 +++++++++++++---------------- 1 file changed, 37 insertions(+), 47 deletions(-) diff --git a/kicad_mcp/tools/drc_impl/cli_drc.py b/kicad_mcp/tools/drc_impl/cli_drc.py index 2d3518e..a8a17d4 100644 --- a/kicad_mcp/tools/drc_impl/cli_drc.py +++ b/kicad_mcp/tools/drc_impl/cli_drc.py @@ -1,6 +1,7 @@ """ Design Rule Check (DRC) implementation using KiCad command-line interface. """ + import os import json import subprocess @@ -10,81 +11,70 @@ from kicad_mcp.config import system -async def run_drc_via_cli(pcb_file: str, ctx: Context) -> Dict[str, Any]: + +def run_drc_via_cli(pcb_file: str) -> Dict[str, Any]: """Run DRC using KiCad command line tools. 
- + Args: pcb_file: Path to the PCB file (.kicad_pcb) - ctx: MCP context for progress reporting - + Returns: Dictionary with DRC results """ - results = { - "success": False, - "method": "cli", - "pcb_file": pcb_file - } - + results = {"success": False, "method": "cli", "pcb_file": pcb_file} + try: # Create a temporary directory for the output with tempfile.TemporaryDirectory() as temp_dir: # Output file for DRC report output_file = os.path.join(temp_dir, "drc_report.json") - + # Find kicad-cli executable kicad_cli = find_kicad_cli() if not kicad_cli: print("kicad-cli not found in PATH or common installation locations") - results["error"] = "kicad-cli not found. Please ensure KiCad 9.0+ is installed and kicad-cli is available." + results["error"] = ( + "kicad-cli not found. Please ensure KiCad 9.0+ is installed and kicad-cli is available." + ) return results - - # Report progress - await ctx.report_progress(50, 100) - ctx.info("Running DRC using KiCad CLI...") - + + # Report progress + print("Running DRC using KiCad CLI...") + # Build the DRC command - cmd = [ - kicad_cli, - "pcb", - "drc", - "--format", "json", - "--output", output_file, - pcb_file - ] - + cmd = [kicad_cli, "pcb", "drc", "--format", "json", "--output", output_file, pcb_file] + print(f"Running command: {' '.join(cmd)}") process = subprocess.run(cmd, capture_output=True, text=True) - + # Check if the command was successful if process.returncode != 0: print(f"DRC command failed with code {process.returncode}") print(f"Error output: {process.stderr}") results["error"] = f"DRC command failed: {process.stderr}" return results - + # Check if the output file was created if not os.path.exists(output_file): print("DRC report file not created") results["error"] = "DRC report file not created" return results - + # Read the DRC report - with open(output_file, 'r') as f: + with open(output_file, "r") as f: try: drc_report = json.load(f) except json.JSONDecodeError: print("Failed to parse DRC report JSON") results["error"] = "Failed to parse DRC report JSON" return results - + # Process the DRC report violations = drc_report.get("violations", []) violation_count = len(violations) print(f"DRC completed with {violation_count} violations") - await ctx.report_progress(70, 100) - ctx.info(f"DRC completed with {violation_count} violations") - + print(f"DRC completed with {violation_count} violations") + # Categorize violations by type error_types = {} for violation in violations: @@ -92,7 +82,7 @@ async def run_drc_via_cli(pcb_file: str, ctx: Context) -> Dict[str, Any]: if error_type not in error_types: error_types[error_type] = 0 error_types[error_type] += 1 - + # Create success response results = { "success": True, @@ -100,12 +90,12 @@ async def run_drc_via_cli(pcb_file: str, ctx: Context) -> Dict[str, Any]: "pcb_file": pcb_file, "total_violations": violation_count, "violation_categories": error_types, - "violations": violations + "violations": violations, } - - await ctx.report_progress(90, 100) + + # Progress reporting removed return results - + except Exception as e: print(f"Error in CLI DRC: {str(e)}", exc_info=True) results["error"] = f"Error in CLI DRC: {str(e)}" @@ -114,7 +104,7 @@ async def run_drc_via_cli(pcb_file: str, ctx: Context) -> Dict[str, Any]: def find_kicad_cli() -> Optional[str]: """Find the kicad-cli executable in the system PATH. 
- + Returns: Path to kicad-cli if found, None otherwise """ @@ -130,36 +120,36 @@ def find_kicad_cli() -> Optional[str]: result = subprocess.run(["which", "kicad-cli"], capture_output=True, text=True) if result.returncode == 0: return result.stdout.strip() - + except Exception as e: print(f"Error finding kicad-cli: {str(e)}") - + # If we get here, kicad-cli is not in PATH # Try common installation locations if system == "Windows": # Common Windows installation path potential_paths = [ r"C:\Program Files\KiCad\bin\kicad-cli.exe", - r"C:\Program Files (x86)\KiCad\bin\kicad-cli.exe" + r"C:\Program Files (x86)\KiCad\bin\kicad-cli.exe", ] elif system == "Darwin": # macOS # Common macOS installation paths potential_paths = [ "/Applications/KiCad/KiCad.app/Contents/MacOS/kicad-cli", - "/Applications/KiCad/kicad-cli" + "/Applications/KiCad/kicad-cli", ] else: # Linux and other Unix-like systems # Common Linux installation paths potential_paths = [ "/usr/bin/kicad-cli", "/usr/local/bin/kicad-cli", - "/opt/kicad/bin/kicad-cli" + "/opt/kicad/bin/kicad-cli", ] - + # Check each potential path for path in potential_paths: if os.path.exists(path) and os.access(path, os.X_OK): return path - + # If still not found, return None return None From 62150e6595679e09eb65d35dcf93bbedb058fc14 Mon Sep 17 00:00:00 2001 From: Neil-TC Date: Wed, 8 Oct 2025 17:23:17 +0800 Subject: [PATCH 3/5] Add files via upload --- kicad_mcp/context.py | 47 ++++++++++++++++++++++--------------- kicad_mcp/server.py | 56 +++++++++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/kicad_mcp/context.py b/kicad_mcp/context.py index 693ed6b..4795661 100644 --- a/kicad_mcp/context.py +++ b/kicad_mcp/context.py @@ -1,56 +1,64 @@ """ Lifespan context management for KiCad MCP Server. """ + from contextlib import asynccontextmanager from dataclasses import dataclass from typing import AsyncIterator, Dict, Any -import logging # Import logging -import os # Added for PID +import logging # Import logging +import os # Added for PID from mcp.server.fastmcp import FastMCP # Get PID for logging # _PID = os.getpid() + @dataclass class KiCadAppContext: """Type-safe context for KiCad MCP server.""" + kicad_modules_available: bool - + # Optional cache for expensive operations cache: Dict[str, Any] + @asynccontextmanager -async def kicad_lifespan(server: FastMCP, kicad_modules_available: bool = False) -> AsyncIterator[KiCadAppContext]: +async def kicad_lifespan( + server: FastMCP, kicad_modules_available: bool = False +) -> AsyncIterator[KiCadAppContext]: """Manage KiCad MCP server lifecycle with type-safe context. - + This function handles: 1. Initializing shared resources when the server starts 2. Providing a typed context object to all request handlers 3. 
Properly cleaning up resources when the server shuts down - + Args: server: The FastMCP server instance kicad_modules_available: Flag indicating if Python modules were found (passed from create_server) - + Yields: KiCadAppContext: A typed context object shared across all handlers """ logging.info(f"Starting KiCad MCP server initialization") - + # Resources initialization - Python path setup removed # print("Setting up KiCad Python modules") # kicad_modules_available = setup_kicad_python_path() # Now passed as arg - logging.info(f"KiCad Python module availability: {kicad_modules_available} (Setup logic removed)") - + logging.info( + f"KiCad Python module availability: {kicad_modules_available} (Setup logic removed)" + ) + # Create in-memory cache for expensive operations cache: Dict[str, Any] = {} - + # Initialize any other resources that need cleanup later - created_temp_dirs = [] # Assuming this is managed elsewhere or not needed for now - + created_temp_dirs = [] # Assuming this is managed elsewhere or not needed for now + try: - # --- Removed Python module preloading section --- + # --- Removed Python module preloading section --- # if kicad_modules_available: # try: # print("Preloading KiCad Python modules") @@ -61,25 +69,26 @@ async def kicad_lifespan(server: FastMCP, kicad_modules_available: bool = False) # Yield the context to the server - server runs during this time logging.info(f"KiCad MCP server initialization complete") yield KiCadAppContext( - kicad_modules_available=kicad_modules_available, # Pass the flag through - cache=cache + kicad_modules_available=kicad_modules_available, # Pass the flag through + cache=cache, ) finally: # Clean up resources when server shuts down logging.info(f"Shutting down KiCad MCP server") - + # Clear the cache if cache: logging.info(f"Clearing cache with {len(cache)} entries") cache.clear() - + # Clean up any temporary directories import shutil + for temp_dir in created_temp_dirs: try: logging.info(f"Removing temporary directory: {temp_dir}") shutil.rmtree(temp_dir, ignore_errors=True) except Exception as e: logging.error(f"Error cleaning up temporary directory {temp_dir}: {str(e)}") - + logging.info(f"KiCad MCP server shutdown complete") diff --git a/kicad_mcp/server.py b/kicad_mcp/server.py index 5c5de67..5836da1 100644 --- a/kicad_mcp/server.py +++ b/kicad_mcp/server.py @@ -1,6 +1,7 @@ """ MCP server creation and configuration. """ + import atexit import os import signal @@ -45,27 +46,29 @@ # Store server instance for clean shutdown _server_instance = None + def add_cleanup_handler(handler: Callable) -> None: """Register a function to be called during cleanup. 
- + Args: handler: Function to call during cleanup """ cleanup_handlers.append(handler) + def run_cleanup_handlers() -> None: """Run all registered cleanup handlers.""" logging.info(f"Running cleanup handlers...") global _shutting_down - + # Prevent running cleanup handlers multiple times if _shutting_down: return _shutting_down = True logging.info(f"Running cleanup handlers...") - + for handler in cleanup_handlers: try: handler() @@ -73,10 +76,11 @@ def run_cleanup_handlers() -> None: except Exception as e: logging.error(f"Error in cleanup handler {handler.__name__}: {str(e)}", exc_info=True) + def shutdown_server(): """Properly shutdown the server if it exists.""" global _server_instance - + if _server_instance: try: logging.info(f"Shutting down KiCad MCP server") @@ -88,22 +92,23 @@ def shutdown_server(): def register_signal_handlers(server: FastMCP) -> None: """Register handlers for system signals to ensure clean shutdown. - + Args: server: The FastMCP server instance """ + def handle_exit_signal(signum, frame): logging.info(f"Received signal {signum}, initiating shutdown...") - + # Run cleanup first run_cleanup_handlers() - + # Then shutdown server shutdown_server() - + # Exit without waiting for stdio processes which might be blocking os._exit(0) - + # Register for common termination signals for sig in (signal.SIGINT, signal.SIGTERM): try: @@ -120,21 +125,25 @@ def create_server() -> FastMCP: # Try to set up KiCad Python path - Removed # kicad_modules_available = setup_kicad_python_path() - kicad_modules_available = False # Set to False as we removed the setup logic + kicad_modules_available = False # Set to False as we removed the setup logic # if kicad_modules_available: # print("KiCad Python modules successfully configured") # else: # Always print this now, as we rely on CLI - logging.info(f"KiCad Python module setup removed; relying on kicad-cli for external operations.") + logging.info( + f"KiCad Python module setup removed; relying on kicad-cli for external operations." 
+ ) # Build a lifespan callable with the kwarg baked in (FastMCP 2.x dropped lifespan_kwargs) - lifespan_factory = functools.partial(kicad_lifespan, kicad_modules_available=kicad_modules_available) + lifespan_factory = functools.partial( + kicad_lifespan, kicad_modules_available=kicad_modules_available + ) # Initialize FastMCP server mcp = FastMCP("KiCad", lifespan=lifespan_factory) logging.info(f"Created FastMCP server instance with lifespan management") - + # Register resources logging.info(f"Registering resources...") register_project_resources(mcp) @@ -143,7 +152,7 @@ def create_server() -> FastMCP: register_bom_resources(mcp) register_netlist_resources(mcp) register_pattern_resources(mcp) - + # Register tools logging.info(f"Registering tools...") register_project_tools(mcp) @@ -153,7 +162,7 @@ def create_server() -> FastMCP: register_bom_tools(mcp) register_netlist_tools(mcp) register_pattern_tools(mcp) - + # Register prompts logging.info(f"Registering prompts...") register_prompts(mcp) @@ -164,7 +173,7 @@ def create_server() -> FastMCP: # Register signal handlers and cleanup register_signal_handlers(mcp) atexit.register(run_cleanup_handlers) - + # Add specific cleanup handlers add_cleanup_handler(lambda: logging.info(f"KiCad MCP server shutdown complete")) @@ -173,10 +182,10 @@ def cleanup_temp_dirs(): """Clean up any temporary directories created by the server.""" import shutil from kicad_mcp.utils.temp_dir_manager import get_temp_dirs - + temp_dirs = get_temp_dirs() logging.info(f"Cleaning up {len(temp_dirs)} temporary directories") - + for temp_dir in temp_dirs: try: if os.path.exists(temp_dir): @@ -184,9 +193,9 @@ def cleanup_temp_dirs(): logging.info(f"Removed temporary directory: {temp_dir}") except Exception as e: logging.error(f"Error cleaning up temporary directory {temp_dir}: {str(e)}") - + add_cleanup_handler(cleanup_temp_dirs) - + logging.info(f"Server initialization complete") return mcp @@ -205,8 +214,7 @@ def cleanup_handler() -> None: def setup_logging() -> None: """Configure logging for the server.""" logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) @@ -214,9 +222,9 @@ def main() -> None: """Start the KiCad MCP server (blocking).""" setup_logging() logging.info("Starting KiCad MCP server...") - + server = create_server() - + try: server.run() # FastMCP manages its own event loop except KeyboardInterrupt: From c7681cf150abd2941be6203de7c525cbd801dca5 Mon Sep 17 00:00:00 2001 From: Neil-TC Date: Wed, 8 Oct 2025 17:24:21 +0800 Subject: [PATCH 4/5] Add files via upload --- kicad_mcp/utils/drc_history.py | 88 +- kicad_mcp/utils/kicad_utils.py | 65 +- kicad_mcp/utils/netlist_parser.py | 366 ++++---- kicad_mcp/utils/pattern_recognition.py | 1159 ++++++++++++++---------- 4 files changed, 939 insertions(+), 739 deletions(-) diff --git a/kicad_mcp/utils/drc_history.py b/kicad_mcp/utils/drc_history.py index c31296b..535ec9e 100644 --- a/kicad_mcp/utils/drc_history.py +++ b/kicad_mcp/utils/drc_history.py @@ -3,6 +3,7 @@ This will allow users to compare DRC results over time. 
""" + import os import json import platform @@ -13,11 +14,14 @@ # Directory for storing DRC history if platform.system() == "Windows": # Windows: Use APPDATA or LocalAppData - DRC_HISTORY_DIR = os.path.join(os.environ.get("APPDATA", os.path.expanduser("~")), "kicad_mcp", "drc_history") + DRC_HISTORY_DIR = os.path.join( + os.environ.get("APPDATA", os.path.expanduser("~")), "kicad_mcp", "drc_history" + ) else: # macOS/Linux: Use ~/.kicad_mcp/drc_history DRC_HISTORY_DIR = os.path.expanduser("~/.kicad_mcp/drc_history") + def ensure_history_dir() -> None: """Ensure the DRC history directory exists.""" os.makedirs(DRC_HISTORY_DIR, exist_ok=True) @@ -25,66 +29,64 @@ def ensure_history_dir() -> None: def get_project_history_path(project_path: str) -> str: """Get the path to the DRC history file for a project. - + Args: project_path: Path to the KiCad project file - + Returns: Path to the project's DRC history file """ # Create a safe filename from the project path - project_hash = hash(project_path) & 0xffffffff # Ensure positive hash + project_hash = hash(project_path) & 0xFFFFFFFF # Ensure positive hash basename = os.path.basename(project_path) history_filename = f"{basename}_{project_hash}_drc_history.json" - + return os.path.join(DRC_HISTORY_DIR, history_filename) def save_drc_result(project_path: str, drc_result: Dict[str, Any]) -> None: """Save a DRC result to the project's history. - + Args: project_path: Path to the KiCad project file drc_result: DRC result dictionary """ ensure_history_dir() history_path = get_project_history_path(project_path) - + # Create a history entry timestamp = time.time() formatted_time = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") - + history_entry = { "timestamp": timestamp, "datetime": formatted_time, "total_violations": drc_result.get("total_violations", 0), - "violation_categories": drc_result.get("violation_categories", {}) + "violation_categories": drc_result.get("violation_categories", {}), } - + # Load existing history or create new if os.path.exists(history_path): try: - with open(history_path, 'r') as f: + with open(history_path, "r") as f: history = json.load(f) except (json.JSONDecodeError, IOError) as e: print(f"Error loading DRC history: {str(e)}") history = {"project_path": project_path, "entries": []} else: history = {"project_path": project_path, "entries": []} - + # Add new entry and save history["entries"].append(history_entry) - + # Keep only the last 10 entries to avoid excessive storage if len(history["entries"]) > 10: - history["entries"] = sorted( - history["entries"], - key=lambda x: x["timestamp"], - reverse=True - )[:10] - + history["entries"] = sorted(history["entries"], key=lambda x: x["timestamp"], reverse=True)[ + :10 + ] + try: - with open(history_path, 'w') as f: + with open(history_path, "w") as f: json.dump(history, f, indent=2) print(f"Saved DRC history entry to {history_path}") except IOError as e: @@ -93,71 +95,71 @@ def save_drc_result(project_path: str, drc_result: Dict[str, Any]) -> None: def get_drc_history(project_path: str) -> List[Dict[str, Any]]: """Get the DRC history for a project. 
- + Args: project_path: Path to the KiCad project file - + Returns: List of DRC history entries, sorted by timestamp (newest first) """ history_path = get_project_history_path(project_path) - + if not os.path.exists(history_path): print(f"No DRC history found for {project_path}") return [] - + try: - with open(history_path, 'r') as f: + with open(history_path, "r") as f: history = json.load(f) - + # Sort entries by timestamp (newest first) entries = sorted( - history.get("entries", []), - key=lambda x: x.get("timestamp", 0), - reverse=True + history.get("entries", []), key=lambda x: x.get("timestamp", 0), reverse=True ) - + return entries except (json.JSONDecodeError, IOError) as e: print(f"Error reading DRC history: {str(e)}") return [] -def compare_with_previous(project_path: str, current_result: Dict[str, Any]) -> Optional[Dict[str, Any]]: +def compare_with_previous( + project_path: str, current_result: Dict[str, Any] +) -> Optional[Dict[str, Any]]: """Compare current DRC result with the previous one. - + Args: project_path: Path to the KiCad project file current_result: Current DRC result dictionary - + Returns: Comparison dictionary or None if no history exists """ history = get_drc_history(project_path) - + if not history or len(history) < 2: # Need at least one previous entry return None - + previous = history[0] # Most recent entry current_violations = current_result.get("total_violations", 0) previous_violations = previous.get("total_violations", 0) - + # Compare violation categories current_categories = current_result.get("violation_categories", {}) previous_categories = previous.get("violation_categories", {}) - + # Find new categories new_categories = {} for category, count in current_categories.items(): if category not in previous_categories: new_categories[category] = count - + # Find resolved categories resolved_categories = {} for category, count in previous_categories.items(): if category not in current_categories: resolved_categories[category] = count - + # Find changed categories changed_categories = {} for category, count in current_categories.items(): @@ -165,9 +167,9 @@ def compare_with_previous(project_path: str, current_result: Dict[str, Any]) -> changed_categories[category] = { "current": count, "previous": previous_categories[category], - "change": count - previous_categories[category] + "change": count - previous_categories[category], } - + comparison = { "current_violations": current_violations, "previous_violations": previous_violations, @@ -175,7 +177,7 @@ def compare_with_previous(project_path: str, current_result: Dict[str, Any]) -> "previous_datetime": previous.get("datetime", "unknown"), "new_categories": new_categories, "resolved_categories": resolved_categories, - "changed_categories": changed_categories + "changed_categories": changed_categories, } - + return comparison diff --git a/kicad_mcp/utils/kicad_utils.py b/kicad_mcp/utils/kicad_utils.py index 8e8e29a..56363ba 100644 --- a/kicad_mcp/utils/kicad_utils.py +++ b/kicad_mcp/utils/kicad_utils.py @@ -1,10 +1,11 @@ """ KiCad-specific utility functions. """ + import os -import logging # Import logging +import logging # Import logging import subprocess -import sys # Add sys import +import sys # Add sys import from typing import Dict, List, Any from kicad_mcp import config @@ -12,14 +13,15 @@ # Get PID for logging - Removed, handled by logging config # _PID = os.getpid() + def find_kicad_projects() -> List[Dict[str, Any]]: """Find KiCad projects in the user's directory. 
- + Returns: List of dictionaries with project information """ projects = [] - logging.info("Attempting to find KiCad projects...") # Log start + logging.info("Attempting to find KiCad projects...") # Log start # Search directories to look for KiCad projects raw_search_dirs = [config.KICAD_USER_DIR] + config.ADDITIONAL_SEARCH_PATHS logging.info(f"Raw KICAD_USER_DIR: '{config.KICAD_USER_DIR}'") @@ -28,19 +30,21 @@ def find_kicad_projects() -> List[Dict[str, Any]]: expanded_search_dirs = [] for raw_dir in raw_search_dirs: - expanded_dir = os.path.expanduser(raw_dir) # Expand ~ and ~user + expanded_dir = os.path.expanduser(raw_dir) # Expand ~ and ~user if expanded_dir not in expanded_search_dirs: expanded_search_dirs.append(expanded_dir) else: logging.info(f"Skipping duplicate expanded path: {expanded_dir}") - + logging.info(f"Expanded search directories: {expanded_search_dirs}") for search_dir in expanded_search_dirs: if not os.path.exists(search_dir): - logging.warning(f"Expanded search directory does not exist: {search_dir}") # Use warning level + logging.warning( + f"Expanded search directory does not exist: {search_dir}" + ) # Use warning level continue - + logging.info(f"Scanning expanded directory: {search_dir}") # Use followlinks=True to follow symlinks if needed for root, _, files in os.walk(search_dir, followlinks=True): @@ -51,7 +55,7 @@ def find_kicad_projects() -> List[Dict[str, Any]]: if not os.path.isfile(project_path): logging.info(f"Skipping non-file/broken symlink: {project_path}") continue - + try: # Attempt to get modification time to ensure file is accessible mod_time = os.path.getmtime(project_path) @@ -59,50 +63,55 @@ def find_kicad_projects() -> List[Dict[str, Any]]: project_name = get_project_name_from_path(project_path) logging.info(f"Found accessible KiCad project: {project_path}") - projects.append({ - "name": project_name, - "path": project_path, - "relative_path": rel_path, - "modified": mod_time - }) + projects.append( + { + "name": project_name, + "path": project_path, + "relative_path": rel_path, + "modified": mod_time, + } + ) except OSError as e: - logging.error(f"Error accessing project file {project_path}: {e}") # Use error level - continue # Skip if we can't access it - + logging.error( + f"Error accessing project file {project_path}: {e}" + ) # Use error level + continue # Skip if we can't access it + logging.info(f"Found {len(projects)} KiCad projects after scanning.") return projects + def get_project_name_from_path(project_path: str) -> str: """Extract the project name from a .kicad_pro file path. - + Args: project_path: Path to the .kicad_pro file - + Returns: Project name without extension """ basename = os.path.basename(project_path) - return basename[:-len(config.KICAD_EXTENSIONS["project"])] + return basename[: -len(config.KICAD_EXTENSIONS["project"])] def open_kicad_project(project_path: str) -> Dict[str, Any]: """Open a KiCad project using the KiCad application. 
- + Args: project_path: Path to the .kicad_pro file - + Returns: Dictionary with result information """ if not os.path.exists(project_path): return {"success": False, "error": f"Project not found: {project_path}"} - + try: cmd = [] if sys.platform == "darwin": # macOS # On MacOS, use the 'open' command to open the project in KiCad cmd = ["open", "-a", config.KICAD_APP_PATH, project_path] - elif sys.platform == "linux": # Linux + elif sys.platform == "linux": # Linux # On Linux, use 'xdg-open' cmd = ["xdg-open", project_path] else: @@ -110,13 +119,13 @@ def open_kicad_project(project_path: str) -> Dict[str, Any]: return {"success": False, "error": f"Unsupported operating system: {sys.platform}"} result = subprocess.run(cmd, capture_output=True, text=True) - + return { "success": result.returncode == 0, "command": " ".join(cmd), "output": result.stdout, - "error": result.stderr if result.returncode != 0 else None + "error": result.stderr if result.returncode != 0 else None, } - + except Exception as e: return {"success": False, "error": str(e)} diff --git a/kicad_mcp/utils/netlist_parser.py b/kicad_mcp/utils/netlist_parser.py index 894eb3b..5202652 100644 --- a/kicad_mcp/utils/netlist_parser.py +++ b/kicad_mcp/utils/netlist_parser.py @@ -1,17 +1,19 @@ """ KiCad schematic netlist extraction utilities. """ + import os import re from typing import Any, Dict, List from collections import defaultdict + class SchematicParser: """Parser for KiCad schematic files to extract netlist information.""" - + def __init__(self, schematic_path: str): """Initialize the schematic parser. - + Args: schematic_path: Path to the KiCad schematic file (.kicad_sch) """ @@ -25,14 +27,14 @@ def __init__(self, schematic_path: str): self.power_symbols = [] self.hierarchical_labels = [] self.global_labels = [] - + # Netlist information self.nets = defaultdict(list) # Net name -> connected pins self.component_pins = {} # (component_ref, pin_num) -> net_name - + # Component information self.component_info = {} # component_ref -> component details - + # Load the file self._load_schematic() @@ -41,9 +43,9 @@ def _load_schematic(self) -> None: if not os.path.exists(self.schematic_path): print(f"Schematic file not found: {self.schematic_path}") raise FileNotFoundError(f"Schematic file not found: {self.schematic_path}") - + try: - with open(self.schematic_path, 'r') as f: + with open(self.schematic_path, "r") as f: self.content = f.read() print(f"Successfully loaded schematic: {self.schematic_path}") except Exception as e: @@ -52,33 +54,33 @@ def _load_schematic(self) -> None: def parse(self) -> Dict[str, Any]: """Parse the schematic to extract netlist information. 
- + Returns: Dictionary with parsed netlist information """ print("Starting schematic parsing") - + # Extract symbols (components) self._extract_components() - + # Extract wires self._extract_wires() - + # Extract junctions self._extract_junctions() - + # Extract labels self._extract_labels() - + # Extract power symbols self._extract_power_symbols() - + # Extract no-connects self._extract_no_connects() - + # Build netlist self._build_netlist() - + # Create result result = { "components": self.component_info, @@ -88,303 +90,325 @@ def parse(self) -> Dict[str, Any]: "junctions": self.junctions, "power_symbols": self.power_symbols, "component_count": len(self.component_info), - "net_count": len(self.nets) + "net_count": len(self.nets), } - - print(f"Schematic parsing complete: found {len(self.component_info)} components and {len(self.nets)} nets") + + print( + f"Schematic parsing complete: found {len(self.component_info)} components and {len(self.nets)} nets" + ) return result def _extract_s_expressions(self, pattern: str) -> List[str]: """Extract all matching S-expressions from the schematic content. - + Args: pattern: Regex pattern to match the start of S-expressions - + Returns: List of matching S-expressions """ matches = [] positions = [] - + # Find all starting positions of matches for match in re.finditer(pattern, self.content): positions.append(match.start()) - + # Extract full S-expressions for each match for pos in positions: # Start from the matching position current_pos = pos depth = 0 s_exp = "" - + # Extract the full S-expression by tracking parentheses while current_pos < len(self.content): char = self.content[current_pos] s_exp += char - - if char == '(': + + if char == "(": depth += 1 - elif char == ')': + elif char == ")": depth -= 1 if depth == 0: # Found the end of the S-expression break - + current_pos += 1 - + matches.append(s_exp) - + return matches def _extract_components(self) -> None: """Extract component information from schematic.""" print("Extracting components") - + # Extract all symbol expressions (components) - symbols = self._extract_s_expressions(r'\(symbol\s+') - + symbols = self._extract_s_expressions(r"\(symbol\s+") + for symbol in symbols: component = self._parse_component(symbol) if component: self.components.append(component) - + # Add to component info dictionary - ref = component.get('reference', 'Unknown') + ref = component.get("reference", "Unknown") self.component_info[ref] = component - + print(f"Extracted {len(self.components)} components") def _parse_component(self, symbol_expr: str) -> Dict[str, Any]: """Parse a component from a symbol S-expression. 
- + Args: symbol_expr: Symbol S-expression - + Returns: Component information dictionary """ component = {} - + # Extract library component ID lib_id_match = re.search(r'\(lib_id\s+"([^"]+)"\)', symbol_expr) if lib_id_match: - component['lib_id'] = lib_id_match.group(1) - + component["lib_id"] = lib_id_match.group(1) + # Extract reference (e.g., R1, C2) property_matches = re.finditer(r'\(property\s+"([^"]+)"\s+"([^"]+)"', symbol_expr) for match in property_matches: prop_name = match.group(1) prop_value = match.group(2) - + if prop_name == "Reference": - component['reference'] = prop_value + component["reference"] = prop_value elif prop_name == "Value": - component['value'] = prop_value + component["value"] = prop_value elif prop_name == "Footprint": - component['footprint'] = prop_value + component["footprint"] = prop_value else: # Store other properties - if 'properties' not in component: - component['properties'] = {} - component['properties'][prop_name] = prop_value - + if "properties" not in component: + component["properties"] = {} + component["properties"][prop_name] = prop_value + # Extract position - pos_match = re.search(r'\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', symbol_expr) + pos_match = re.search(r"\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)", symbol_expr) if pos_match: - component['position'] = { - 'x': float(pos_match.group(1)), - 'y': float(pos_match.group(2)), - 'angle': float(pos_match.group(3).strip() if pos_match.group(3) else 0) + component["position"] = { + "x": float(pos_match.group(1)), + "y": float(pos_match.group(2)), + "angle": float(pos_match.group(3).strip() if pos_match.group(3) else 0), } - + # Extract pins pins = [] - pin_matches = re.finditer(r'\(pin\s+\(num\s+"([^"]+)"\)\s+\(name\s+"([^"]+)"\)', symbol_expr) + pin_matches = re.finditer( + r'\(pin\s+\(num\s+"([^"]+)"\)\s+\(name\s+"([^"]+)"\)', symbol_expr + ) for match in pin_matches: pin_num = match.group(1) pin_name = match.group(2) - pins.append({ - 'num': pin_num, - 'name': pin_name - }) - + pins.append({"num": pin_num, "name": pin_name}) + if pins: - component['pins'] = pins - + component["pins"] = pins + return component def _extract_wires(self) -> None: """Extract wire information from schematic.""" print("Extracting wires") - + # Extract all wire expressions - wires = self._extract_s_expressions(r'\(wire\s+') - + wires = self._extract_s_expressions(r"\(wire\s+") + for wire in wires: # Extract the wire coordinates - pts_match = re.search(r'\(pts\s+\(xy\s+([\d\.-]+)\s+([\d\.-]+)\)\s+\(xy\s+([\d\.-]+)\s+([\d\.-]+)\)\)', wire) + pts_match = re.search( + r"\(pts\s+\(xy\s+([\d\.-]+)\s+([\d\.-]+)\)\s+\(xy\s+([\d\.-]+)\s+([\d\.-]+)\)\)", + wire, + ) if pts_match: - self.wires.append({ - 'start': { - 'x': float(pts_match.group(1)), - 'y': float(pts_match.group(2)) - }, - 'end': { - 'x': float(pts_match.group(3)), - 'y': float(pts_match.group(4)) + self.wires.append( + { + "start": {"x": float(pts_match.group(1)), "y": float(pts_match.group(2))}, + "end": {"x": float(pts_match.group(3)), "y": float(pts_match.group(4))}, } - }) - + ) + print(f"Extracted {len(self.wires)} wires") def _extract_junctions(self) -> None: """Extract junction information from schematic.""" print("Extracting junctions") - + # Extract all junction expressions - junctions = self._extract_s_expressions(r'\(junction\s+') - + junctions = self._extract_s_expressions(r"\(junction\s+") + for junction in junctions: # Extract the junction coordinates - xy_match = re.search(r'\(junction\s+\(xy\s+([\d\.-]+)\s+([\d\.-]+)\)\)', junction) 
+ xy_match = re.search(r"\(junction\s+\(xy\s+([\d\.-]+)\s+([\d\.-]+)\)\)", junction) if xy_match: - self.junctions.append({ - 'x': float(xy_match.group(1)), - 'y': float(xy_match.group(2)) - }) - + self.junctions.append( + {"x": float(xy_match.group(1)), "y": float(xy_match.group(2))} + ) + print(f"Extracted {len(self.junctions)} junctions") def _extract_labels(self) -> None: """Extract label information from schematic.""" print("Extracting labels") - + # Extract local labels - local_labels = self._extract_s_expressions(r'\(label\s+') - + local_labels = self._extract_s_expressions(r"\(label\s+") + for label in local_labels: # Extract label text and position - label_match = re.search(r'\(label\s+"([^"]+)"\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', label) + label_match = re.search( + r'\(label\s+"([^"]+)"\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', label + ) if label_match: - self.labels.append({ - 'type': 'local', - 'text': label_match.group(1), - 'position': { - 'x': float(label_match.group(2)), - 'y': float(label_match.group(3)), - 'angle': float(label_match.group(4).strip() if label_match.group(4) else 0) + self.labels.append( + { + "type": "local", + "text": label_match.group(1), + "position": { + "x": float(label_match.group(2)), + "y": float(label_match.group(3)), + "angle": float( + label_match.group(4).strip() if label_match.group(4) else 0 + ), + }, } - }) - + ) + # Extract global labels - global_labels = self._extract_s_expressions(r'\(global_label\s+') - + global_labels = self._extract_s_expressions(r"\(global_label\s+") + for label in global_labels: # Extract global label text and position - label_match = re.search(r'\(global_label\s+"([^"]+)"\s+\(shape\s+([^\s\)]+)\)\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', label) + label_match = re.search( + r'\(global_label\s+"([^"]+)"\s+\(shape\s+([^\s\)]+)\)\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', + label, + ) if label_match: - self.global_labels.append({ - 'type': 'global', - 'text': label_match.group(1), - 'shape': label_match.group(2), - 'position': { - 'x': float(label_match.group(3)), - 'y': float(label_match.group(4)), - 'angle': float(label_match.group(5).strip() if label_match.group(5) else 0) + self.global_labels.append( + { + "type": "global", + "text": label_match.group(1), + "shape": label_match.group(2), + "position": { + "x": float(label_match.group(3)), + "y": float(label_match.group(4)), + "angle": float( + label_match.group(5).strip() if label_match.group(5) else 0 + ), + }, } - }) - + ) + # Extract hierarchical labels - hierarchical_labels = self._extract_s_expressions(r'\(hierarchical_label\s+') - + hierarchical_labels = self._extract_s_expressions(r"\(hierarchical_label\s+") + for label in hierarchical_labels: # Extract hierarchical label text and position - label_match = re.search(r'\(hierarchical_label\s+"([^"]+)"\s+\(shape\s+([^\s\)]+)\)\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', label) + label_match = re.search( + r'\(hierarchical_label\s+"([^"]+)"\s+\(shape\s+([^\s\)]+)\)\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', + label, + ) if label_match: - self.hierarchical_labels.append({ - 'type': 'hierarchical', - 'text': label_match.group(1), - 'shape': label_match.group(2), - 'position': { - 'x': float(label_match.group(3)), - 'y': float(label_match.group(4)), - 'angle': float(label_match.group(5).strip() if label_match.group(5) else 0) + self.hierarchical_labels.append( + { + "type": "hierarchical", + "text": label_match.group(1), + "shape": label_match.group(2), + 
"position": { + "x": float(label_match.group(3)), + "y": float(label_match.group(4)), + "angle": float( + label_match.group(5).strip() if label_match.group(5) else 0 + ), + }, } - }) - - print(f"Extracted {len(self.labels)} local labels, {len(self.global_labels)} global labels, and {len(self.hierarchical_labels)} hierarchical labels") + ) + + print( + f"Extracted {len(self.labels)} local labels, {len(self.global_labels)} global labels, and {len(self.hierarchical_labels)} hierarchical labels" + ) def _extract_power_symbols(self) -> None: """Extract power symbol information from schematic.""" print("Extracting power symbols") - + # Extract all power symbol expressions power_symbols = self._extract_s_expressions(r'\(symbol\s+\(lib_id\s+"power:') - + for symbol in power_symbols: # Extract power symbol type and position type_match = re.search(r'\(lib_id\s+"power:([^"]+)"\)', symbol) - pos_match = re.search(r'\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)', symbol) - + pos_match = re.search(r"\(at\s+([\d\.-]+)\s+([\d\.-]+)(\s+[\d\.-]+)?\)", symbol) + if type_match and pos_match: - self.power_symbols.append({ - 'type': type_match.group(1), - 'position': { - 'x': float(pos_match.group(1)), - 'y': float(pos_match.group(2)), - 'angle': float(pos_match.group(3).strip() if pos_match.group(3) else 0) + self.power_symbols.append( + { + "type": type_match.group(1), + "position": { + "x": float(pos_match.group(1)), + "y": float(pos_match.group(2)), + "angle": float(pos_match.group(3).strip() if pos_match.group(3) else 0), + }, } - }) - + ) + print(f"Extracted {len(self.power_symbols)} power symbols") def _extract_no_connects(self) -> None: """Extract no-connect information from schematic.""" print("Extracting no-connects") - + # Extract all no-connect expressions - no_connects = self._extract_s_expressions(r'\(no_connect\s+') - + no_connects = self._extract_s_expressions(r"\(no_connect\s+") + for no_connect in no_connects: # Extract the no-connect coordinates - xy_match = re.search(r'\(no_connect\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)\)', no_connect) + xy_match = re.search(r"\(no_connect\s+\(at\s+([\d\.-]+)\s+([\d\.-]+)\)", no_connect) if xy_match: - self.no_connects.append({ - 'x': float(xy_match.group(1)), - 'y': float(xy_match.group(2)) - }) - + self.no_connects.append( + {"x": float(xy_match.group(1)), "y": float(xy_match.group(2))} + ) + print(f"Extracted {len(self.no_connects)} no-connects") def _build_netlist(self) -> None: """Build the netlist from extracted components and connections.""" print("Building netlist from schematic data") - + # TODO: Implement netlist building algorithm # This is a complex task that involves: # 1. Tracking connections between components via wires # 2. Handling labels (local, global, hierarchical) # 3. Processing power symbols # 4. 
Resolving junctions - + # For now, we'll implement a basic version that creates a list of nets # based on component references and pin numbers - + # Process global labels as nets for label in self.global_labels: - net_name = label['text'] + net_name = label["text"] self.nets[net_name] = [] # Initialize empty list for this net - + # Process power symbols as nets for power in self.power_symbols: - net_name = power['type'] + net_name = power["type"] if net_name not in self.nets: self.nets[net_name] = [] - + # In a full implementation, we would now trace connections between # components, but that requires a more complex algorithm to follow wires # and detect connected pins - + # For demonstration, we'll add a placeholder note print("Note: Full netlist building requires complex connectivity tracing") print(f"Found {len(self.nets)} potential nets from labels and power symbols") @@ -392,10 +416,10 @@ def _build_netlist(self) -> None: def extract_netlist(schematic_path: str) -> Dict[str, Any]: """Extract netlist information from a KiCad schematic file. - + Args: schematic_path: Path to the KiCad schematic file (.kicad_sch) - + Returns: Dictionary with netlist information """ @@ -404,21 +428,15 @@ def extract_netlist(schematic_path: str) -> Dict[str, Any]: return parser.parse() except Exception as e: print(f"Error extracting netlist: {str(e)}") - return { - "error": str(e), - "components": {}, - "nets": {}, - "component_count": 0, - "net_count": 0 - } + return {"error": str(e), "components": {}, "nets": {}, "component_count": 0, "net_count": 0} def analyze_netlist(netlist_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze netlist data to provide insights. - + Args: netlist_data: Dictionary with netlist information - + Returns: Dictionary with analysis results """ @@ -426,23 +444,25 @@ def analyze_netlist(netlist_data: Dict[str, Any]) -> Dict[str, Any]: "component_count": netlist_data.get("component_count", 0), "net_count": netlist_data.get("net_count", 0), "component_types": defaultdict(int), - "power_nets": [] + "power_nets": [], } - + # Analyze component types for ref, component in netlist_data.get("components", {}).items(): # Extract component type from reference (e.g., R1 -> R) - comp_type = re.match(r'^([A-Za-z_]+)', ref) + comp_type = re.match(r"^([A-Za-z_]+)", ref) if comp_type: results["component_types"][comp_type.group(1)] += 1 - + # Identify power nets for net_name in netlist_data.get("nets", {}): - if any(net_name.startswith(prefix) for prefix in ["VCC", "VDD", "GND", "+5V", "+3V3", "+12V"]): + if any( + net_name.startswith(prefix) for prefix in ["VCC", "VDD", "GND", "+5V", "+3V3", "+12V"] + ): results["power_nets"].append(net_name) - + # Count pin connections total_pins = sum(len(pins) for pins in netlist_data.get("nets", {}).values()) results["total_pin_connections"] = total_pins - + return results diff --git a/kicad_mcp/utils/pattern_recognition.py b/kicad_mcp/utils/pattern_recognition.py index 958f1c6..d84a568 100644 --- a/kicad_mcp/utils/pattern_recognition.py +++ b/kicad_mcp/utils/pattern_recognition.py @@ -4,491 +4,592 @@ import re from typing import Dict, List, Any -from kicad_mcp.utils.component_utils import extract_voltage_from_regulator, extract_frequency_from_value +from kicad_mcp.utils.component_utils import ( + extract_voltage_from_regulator, + extract_frequency_from_value, +) -def identify_power_supplies(components: Dict[str, Any], nets: Dict[str, Any]) -> List[Dict[str, Any]]: + +def identify_power_supplies( + components: Dict[str, Any], nets: Dict[str, Any] +) -> 
List[Dict[str, Any]]: """Identify power supply circuits in the schematic. - + Args: components: Dictionary of components from netlist nets: Dictionary of nets from netlist - + Returns: List of identified power supply circuits """ power_supplies = [] - + # Look for voltage regulators (Linear) regulator_patterns = { "78xx": r"78\d\d|LM78\d\d|MC78\d\d", # 7805, 7812, etc. "79xx": r"79\d\d|LM79\d\d|MC79\d\d", # 7905, 7912, etc. - "LDO": r"LM\d{3}|LD\d{3}|AMS\d{4}|LT\d{4}|TLV\d{3}|AP\d{4}|MIC\d{4}|NCP\d{3}|LP\d{4}|L\d{2}|TPS\d{5}" + "LDO": r"LM\d{3}|LD\d{3}|AMS\d{4}|LT\d{4}|TLV\d{3}|AP\d{4}|MIC\d{4}|NCP\d{3}|LP\d{4}|L\d{2}|TPS\d{5}", } - + for ref, component in components.items(): # Check for voltage regulators by part value or lib_id - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + for reg_type, pattern in regulator_patterns.items(): - if re.search(pattern, component_value, re.IGNORECASE) or re.search(pattern, component_lib, re.IGNORECASE): + if re.search(pattern, component_value, re.IGNORECASE) or re.search( + pattern, component_lib, re.IGNORECASE + ): # Found a regulator, look for associated components - power_supplies.append({ - "type": "linear_regulator", - "subtype": reg_type, - "main_component": ref, - "value": component_value, - "input_voltage": "unknown", # Would need more analysis to determine - "output_voltage": extract_voltage_from_regulator(component_value), - "associated_components": [] # Would need connection analysis to find these - }) - + power_supplies.append( + { + "type": "linear_regulator", + "subtype": reg_type, + "main_component": ref, + "value": component_value, + "input_voltage": "unknown", # Would need more analysis to determine + "output_voltage": extract_voltage_from_regulator(component_value), + "associated_components": [], # Would need connection analysis to find these + } + ) + # Look for switching regulators switching_patterns = { "buck": r"LM\d{4}|TPS\d{4}|MP\d{4}|RT\d{4}|LT\d{4}|MC\d{4}|NCP\d{4}|TL\d{4}|LTC\d{4}", "boost": r"MC\d{4}|LT\d{4}|TPS\d{4}|MAX\d{4}|NCP\d{4}|LTC\d{4}", - "buck_boost": r"LTC\d{4}|LM\d{4}|TPS\d{4}|MAX\d{4}" + "buck_boost": r"LTC\d{4}|LM\d{4}|TPS\d{4}|MAX\d{4}", } - + for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + # Check for inductor (key component in switching supplies) - if ref.startswith('L') or 'Inductor' in component_lib: + if ref.startswith("L") or "Inductor" in component_lib: # Look for nearby ICs that might be switching controllers for ic_ref, ic_component in components.items(): - if ic_ref.startswith('U') or ic_ref.startswith('IC'): - ic_value = ic_component.get('value', '').upper() - ic_lib = ic_component.get('lib_id', '').upper() - + if ic_ref.startswith("U") or ic_ref.startswith("IC"): + ic_value = ic_component.get("value", "").upper() + ic_lib = ic_component.get("lib_id", "").upper() + for converter_type, pattern in switching_patterns.items(): - if re.search(pattern, ic_value, re.IGNORECASE) or re.search(pattern, ic_lib, re.IGNORECASE): - power_supplies.append({ - "type": "switching_regulator", - "subtype": converter_type, - "main_component": ic_ref, - "inductor": ref, - "value": ic_value - }) - + if re.search(pattern, ic_value, 
re.IGNORECASE) or re.search( + pattern, ic_lib, re.IGNORECASE + ): + power_supplies.append( + { + "type": "switching_regulator", + "subtype": converter_type, + "main_component": ic_ref, + "inductor": ref, + "value": ic_value, + } + ) + return power_supplies def identify_amplifiers(components: Dict[str, Any], nets: Dict[str, Any]) -> List[Dict[str, Any]]: """Identify amplifier circuits in the schematic. - + Args: components: Dictionary of components from netlist nets: Dictionary of nets from netlist - + Returns: List of identified amplifier circuits """ amplifiers = [] - + # Look for op-amps opamp_patterns = [ r"LM\d{3}|TL\d{3}|NE\d{3}|LF\d{3}|OP\d{2}|MCP\d{3}|AD\d{3}|LT\d{4}|OPA\d{3}", - r"Opamp|Op-Amp|OpAmp|Operational Amplifier" + r"Opamp|Op-Amp|OpAmp|Operational Amplifier", ] - + for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + # Check for op-amps for pattern in opamp_patterns: - if re.search(pattern, component_value, re.IGNORECASE) or re.search(pattern, component_lib, re.IGNORECASE): + if re.search(pattern, component_value, re.IGNORECASE) or re.search( + pattern, component_lib, re.IGNORECASE + ): # Common op-amps - if re.search(r"LM358|LM324|TL072|TL082|NE5532|LF353|MCP6002|AD8620|OPA2134", component_value, re.IGNORECASE): - amplifiers.append({ - "type": "operational_amplifier", - "subtype": "general_purpose", - "component": ref, - "value": component_value - }) + if re.search( + r"LM358|LM324|TL072|TL082|NE5532|LF353|MCP6002|AD8620|OPA2134", + component_value, + re.IGNORECASE, + ): + amplifiers.append( + { + "type": "operational_amplifier", + "subtype": "general_purpose", + "component": ref, + "value": component_value, + } + ) # Audio op-amps - elif re.search(r"NE5534|OPA134|OPA1612|OPA1652|LM4562|LME49720|LME49860|TL071|TL072", component_value, re.IGNORECASE): - amplifiers.append({ - "type": "operational_amplifier", - "subtype": "audio", - "component": ref, - "value": component_value - }) + elif re.search( + r"NE5534|OPA134|OPA1612|OPA1652|LM4562|LME49720|LME49860|TL071|TL072", + component_value, + re.IGNORECASE, + ): + amplifiers.append( + { + "type": "operational_amplifier", + "subtype": "audio", + "component": ref, + "value": component_value, + } + ) # Instrumentation amplifiers - elif re.search(r"INA\d{3}|AD620|AD8221|AD8429|LT1167", component_value, re.IGNORECASE): - amplifiers.append({ - "type": "operational_amplifier", - "subtype": "instrumentation", - "component": ref, - "value": component_value - }) + elif re.search( + r"INA\d{3}|AD620|AD8221|AD8429|LT1167", component_value, re.IGNORECASE + ): + amplifiers.append( + { + "type": "operational_amplifier", + "subtype": "instrumentation", + "component": ref, + "value": component_value, + } + ) else: - amplifiers.append({ - "type": "operational_amplifier", - "subtype": "unknown", - "component": ref, - "value": component_value - }) - + amplifiers.append( + { + "type": "operational_amplifier", + "subtype": "unknown", + "component": ref, + "value": component_value, + } + ) + # Look for transistor amplifiers - transistor_refs = [ref for ref in components.keys() if ref.startswith('Q')] - + transistor_refs = [ref for ref in components.keys() if ref.startswith("Q")] + for ref in transistor_refs: component = components[ref] - component_lib = component.get('lib_id', '').upper() - + component_lib = component.get("lib_id", 
"").upper() + # Check if it's a BJT or FET - if 'BJT' in component_lib or 'NPN' in component_lib or 'PNP' in component_lib: + if "BJT" in component_lib or "NPN" in component_lib or "PNP" in component_lib: # Look for resistors connected to transistor (biasing network) has_biasing = False for net_name, pins in nets.items(): # Check if this net connects to our transistor - if any(pin.get('component') == ref for pin in pins): + if any(pin.get("component") == ref for pin in pins): # Check if the net also connects to resistors - if any(pin.get('component', '').startswith('R') for pin in pins): + if any(pin.get("component", "").startswith("R") for pin in pins): has_biasing = True break - + if has_biasing: - amplifiers.append({ - "type": "transistor_amplifier", - "subtype": "BJT", - "component": ref, - "value": component.get('value', '') - }) - - elif 'FET' in component_lib or 'MOSFET' in component_lib or 'JFET' in component_lib: + amplifiers.append( + { + "type": "transistor_amplifier", + "subtype": "BJT", + "component": ref, + "value": component.get("value", ""), + } + ) + + elif "FET" in component_lib or "MOSFET" in component_lib or "JFET" in component_lib: # Similar check for FET amplifiers has_biasing = False for net_name, pins in nets.items(): - if any(pin.get('component') == ref for pin in pins): - if any(pin.get('component', '').startswith('R') for pin in pins): + if any(pin.get("component") == ref for pin in pins): + if any(pin.get("component", "").startswith("R") for pin in pins): has_biasing = True break - + if has_biasing: - amplifiers.append({ - "type": "transistor_amplifier", - "subtype": "FET", - "component": ref, - "value": component.get('value', '') - }) - + amplifiers.append( + { + "type": "transistor_amplifier", + "subtype": "FET", + "component": ref, + "value": component.get("value", ""), + } + ) + # Look for audio amplifier ICs audio_amp_patterns = [ r"LM386|LM383|LM380|LM1875|LM3886|TDA\d{4}|TPA\d{4}|SSM\d{4}|PAM\d{4}|TAS\d{4}" ] - + for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + for pattern in audio_amp_patterns: - if re.search(pattern, component_value, re.IGNORECASE) or re.search(pattern, component_lib, re.IGNORECASE): - amplifiers.append({ - "type": "audio_amplifier_ic", - "component": ref, - "value": component_value - }) - + if re.search(pattern, component_value, re.IGNORECASE) or re.search( + pattern, component_lib, re.IGNORECASE + ): + amplifiers.append( + {"type": "audio_amplifier_ic", "component": ref, "value": component_value} + ) + return amplifiers def identify_filters(components: Dict[str, Any], nets: Dict[str, Any]) -> List[Dict[str, Any]]: """Identify filter circuits in the schematic. 
- + Args: components: Dictionary of components from netlist nets: Dictionary of nets from netlist - + Returns: List of identified filter circuits """ filters = [] - + # Look for RC low-pass filters # These typically have a resistor followed by a capacitor to ground - resistor_refs = [ref for ref in components.keys() if ref.startswith('R')] - capacitor_refs = [ref for ref in components.keys() if ref.startswith('C')] - + resistor_refs = [ref for ref in components.keys() if ref.startswith("R")] + capacitor_refs = [ref for ref in components.keys() if ref.startswith("C")] + for r_ref in resistor_refs: r_nets = [] # Find which nets this resistor is connected to for net_name, pins in nets.items(): - if any(pin.get('component') == r_ref for pin in pins): + if any(pin.get("component") == r_ref for pin in pins): r_nets.append(net_name) - + # For each net, check if there's a capacitor connected to it for net_name in r_nets: # Find capacitors connected to this net connected_caps = [] for pin in nets.get(net_name, []): - comp = pin.get('component') - if comp and comp.startswith('C'): + comp = pin.get("component") + if comp and comp.startswith("C"): connected_caps.append(comp) - + if connected_caps: # Check if the other side of the capacitor goes to ground for c_ref in connected_caps: c_is_to_ground = False - for gnd_name in ['GND', 'AGND', 'DGND', 'VSS']: + for gnd_name in ["GND", "AGND", "DGND", "VSS"]: for pin in nets.get(gnd_name, []): - if pin.get('component') == c_ref: + if pin.get("component") == c_ref: c_is_to_ground = True break if c_is_to_ground: break - + if c_is_to_ground: - filters.append({ - "type": "passive_filter", - "subtype": "rc_low_pass", - "components": [r_ref, c_ref] - }) - + filters.append( + { + "type": "passive_filter", + "subtype": "rc_low_pass", + "components": [r_ref, c_ref], + } + ) + # Look for active filters (op-amp with feedback RC components) opamp_refs = [] - + for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - - if re.search(r"LM\d{3}|TL\d{3}|NE\d{3}|LF\d{3}|OP\d{2}|MCP\d{3}|AD\d{3}|LT\d{4}|OPA\d{3}", - component_value, re.IGNORECASE) or "OP_AMP" in component_lib: + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + + if ( + re.search( + r"LM\d{3}|TL\d{3}|NE\d{3}|LF\d{3}|OP\d{2}|MCP\d{3}|AD\d{3}|LT\d{4}|OPA\d{3}", + component_value, + re.IGNORECASE, + ) + or "OP_AMP" in component_lib + ): opamp_refs.append(ref) - + for op_ref in opamp_refs: # Find op-amp output # In a full implementation, we'd know which pin is the output # For simplicity, we'll look for feedback components has_feedback_r = False has_feedback_c = False - + for net_name, pins in nets.items(): # If this net connects to our op-amp - if any(pin.get('component') == op_ref for pin in pins): + if any(pin.get("component") == op_ref for pin in pins): # Check if it also connects to resistors and capacitors - connects_to_r = any(pin.get('component', '').startswith('R') for pin in pins) - connects_to_c = any(pin.get('component', '').startswith('C') for pin in pins) - + connects_to_r = any(pin.get("component", "").startswith("R") for pin in pins) + connects_to_c = any(pin.get("component", "").startswith("C") for pin in pins) + if connects_to_r: has_feedback_r = True if connects_to_c: has_feedback_c = True - + if has_feedback_r and has_feedback_c: - filters.append({ - "type": "active_filter", - "main_component": op_ref, - "value": 
components[op_ref].get('value', '') - }) - + filters.append( + { + "type": "active_filter", + "main_component": op_ref, + "value": components[op_ref].get("value", ""), + } + ) + # Look for crystal filters or ceramic filters for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - - if ref.startswith('Y') or ref.startswith('X') or "CRYSTAL" in component_lib or "XTAL" in component_lib: - filters.append({ - "type": "crystal_filter", - "component": ref, - "value": component_value - }) - - if "FILTER" in component_lib or "MURATA" in component_lib or "CERAMIC_FILTER" in component_lib: - filters.append({ - "type": "ceramic_filter", - "component": ref, - "value": component_value - }) - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + + if ( + ref.startswith("Y") + or ref.startswith("X") + or "CRYSTAL" in component_lib + or "XTAL" in component_lib + ): + filters.append({"type": "crystal_filter", "component": ref, "value": component_value}) + + if ( + "FILTER" in component_lib + or "MURATA" in component_lib + or "CERAMIC_FILTER" in component_lib + ): + filters.append({"type": "ceramic_filter", "component": ref, "value": component_value}) + return filters def identify_oscillators(components: Dict[str, Any], nets: Dict[str, Any]) -> List[Dict[str, Any]]: """Identify oscillator circuits in the schematic. - + Args: components: Dictionary of components from netlist nets: Dictionary of nets from netlist - + Returns: List of identified oscillator circuits """ oscillators = [] - + # Look for crystal oscillators for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + # Crystals - if ref.startswith('Y') or ref.startswith('X') or "CRYSTAL" in component_lib or "XTAL" in component_lib: + if ( + ref.startswith("Y") + or ref.startswith("X") + or "CRYSTAL" in component_lib + or "XTAL" in component_lib + ): # Check if the crystal has load capacitors has_load_caps = False crystal_nets = [] - + for net_name, pins in nets.items(): - if any(pin.get('component') == ref for pin in pins): + if any(pin.get("component") == ref for pin in pins): crystal_nets.append(net_name) - + # Look for capacitors connected to the crystal nets for net_name in crystal_nets: for pin in nets.get(net_name, []): - comp = pin.get('component') - if comp and comp.startswith('C'): + comp = pin.get("component") + if comp and comp.startswith("C"): has_load_caps = True break if has_load_caps: break - - oscillators.append({ - "type": "crystal_oscillator", - "component": ref, - "value": component_value, - "frequency": extract_frequency_from_value(component_value), - "has_load_capacitors": has_load_caps - }) - + + oscillators.append( + { + "type": "crystal_oscillator", + "component": ref, + "value": component_value, + "frequency": extract_frequency_from_value(component_value), + "has_load_capacitors": has_load_caps, + } + ) + # Oscillator ICs - if "OSC" in component_lib or "OSCILLATOR" in component_lib or re.search(r"OSC|OSCILLATOR", component_value, re.IGNORECASE): - oscillators.append({ - "type": "oscillator_ic", - "component": ref, - "value": component_value, - "frequency": extract_frequency_from_value(component_value) - }) - + if ( + "OSC" in component_lib + or "OSCILLATOR" in component_lib 
+ or re.search(r"OSC|OSCILLATOR", component_value, re.IGNORECASE) + ): + oscillators.append( + { + "type": "oscillator_ic", + "component": ref, + "value": component_value, + "frequency": extract_frequency_from_value(component_value), + } + ) + # RC oscillators (555 timer, etc) - if re.search(r"NE555|LM555|ICM7555|TLC555", component_value, re.IGNORECASE) or "555" in component_lib: - oscillators.append({ - "type": "rc_oscillator", - "subtype": "555_timer", - "component": ref, - "value": component_value - }) - + if ( + re.search(r"NE555|LM555|ICM7555|TLC555", component_value, re.IGNORECASE) + or "555" in component_lib + ): + oscillators.append( + { + "type": "rc_oscillator", + "subtype": "555_timer", + "component": ref, + "value": component_value, + } + ) + return oscillators -def identify_digital_interfaces(components: Dict[str, Any], nets: Dict[str, Any]) -> List[Dict[str, Any]]: +def identify_digital_interfaces( + components: Dict[str, Any], nets: Dict[str, Any] +) -> List[Dict[str, Any]]: """Identify digital interface circuits in the schematic. - + Args: components: Dictionary of components from netlist nets: Dictionary of nets from netlist - + Returns: List of identified digital interface circuits """ interfaces = [] - + # I2C interface detection i2c_signals = {"SCL", "SDA", "I2C_SCL", "I2C_SDA"} has_i2c = False - + for net_name in nets.keys(): if any(signal in net_name.upper() for signal in i2c_signals): has_i2c = True break - + if has_i2c: - interfaces.append({ - "type": "i2c_interface", - "signals_found": [net for net in nets.keys() if any(signal in net.upper() for signal in i2c_signals)] - }) - + interfaces.append( + { + "type": "i2c_interface", + "signals_found": [ + net + for net in nets.keys() + if any(signal in net.upper() for signal in i2c_signals) + ], + } + ) + # SPI interface detection spi_signals = {"MOSI", "MISO", "SCK", "SS", "SPI_MOSI", "SPI_MISO", "SPI_SCK", "SPI_CS"} has_spi = False - + for net_name in nets.keys(): if any(signal in net_name.upper() for signal in spi_signals): has_spi = True break - + if has_spi: - interfaces.append({ - "type": "spi_interface", - "signals_found": [net for net in nets.keys() if any(signal in net.upper() for signal in spi_signals)] - }) - + interfaces.append( + { + "type": "spi_interface", + "signals_found": [ + net + for net in nets.keys() + if any(signal in net.upper() for signal in spi_signals) + ], + } + ) + # UART interface detection uart_signals = {"TX", "RX", "TXD", "RXD", "UART_TX", "UART_RX"} has_uart = False - + for net_name in nets.keys(): if any(signal in net_name.upper() for signal in uart_signals): has_uart = True break - + if has_uart: - interfaces.append({ - "type": "uart_interface", - "signals_found": [net for net in nets.keys() if any(signal in net.upper() for signal in uart_signals)] - }) - + interfaces.append( + { + "type": "uart_interface", + "signals_found": [ + net + for net in nets.keys() + if any(signal in net.upper() for signal in uart_signals) + ], + } + ) + # USB interface detection usb_signals = {"USB_D+", "USB_D-", "USB_DP", "USB_DM", "D+", "D-", "DP", "DM", "VBUS"} has_usb = False - + for net_name in nets.keys(): if any(signal in net_name.upper() for signal in usb_signals): has_usb = True break - + # Also check for USB interface ICs for ref, component in components.items(): - component_value = component.get('value', '').upper() + component_value = component.get("value", "").upper() if re.search(r"FT232|CH340|CP210|MCP2200|TUSB|FT231|FT201", component_value, re.IGNORECASE): has_usb = True break - + if has_usb: 
- interfaces.append({ - "type": "usb_interface", - "signals_found": [net for net in nets.keys() if any(signal in net.upper() for signal in usb_signals)] - }) - + interfaces.append( + { + "type": "usb_interface", + "signals_found": [ + net + for net in nets.keys() + if any(signal in net.upper() for signal in usb_signals) + ], + } + ) + # Ethernet interface detection ethernet_signals = {"TX+", "TX-", "RX+", "RX-", "MDI", "MDIO", "ETH"} has_ethernet = False - + for net_name in nets.keys(): if any(signal in net_name.upper() for signal in ethernet_signals): has_ethernet = True break - + # Also check for Ethernet PHY ICs for ref, component in components.items(): - component_value = component.get('value', '').upper() + component_value = component.get("value", "").upper() if re.search(r"W5500|ENC28J60|LAN87|KSZ80|DP83|RTL8|AX88", component_value, re.IGNORECASE): has_ethernet = True break - + if has_ethernet: - interfaces.append({ - "type": "ethernet_interface", - "signals_found": [net for net in nets.keys() if any(signal in net.upper() for signal in ethernet_signals)] - }) - + interfaces.append( + { + "type": "ethernet_interface", + "signals_found": [ + net + for net in nets.keys() + if any(signal in net.upper() for signal in ethernet_signals) + ], + } + ) + return interfaces -def identify_sensor_interfaces(components: Dict[str, Any], nets: Dict[str, Any]) -> List[Dict[str, Any]]: +def identify_sensor_interfaces( + components: Dict[str, Any], nets: Dict[str, Any] +) -> List[Dict[str, Any]]: """Identify sensor interface circuits in the schematic. - + Args: components: Dictionary of components from netlist nets: Dictionary of nets from netlist - + Returns: List of identified sensor interface circuits """ sensor_interfaces = [] - + # Common sensor IC patterns sensor_patterns = { "temperature": r"LM35|DS18B20|DHT11|DHT22|BME280|BMP280|TMP\d+|MCP9808|MAX31855|MAX6675|SI7021|HTU21|SHT[0123]\d|PCT2075", @@ -503,208 +604,252 @@ def identify_sensor_interfaces(components: Dict[str, Any], nets: Dict[str, Any]) "current": r"ACS\d+|INA\d+|MAX\d+|ZXCT\d+", "voltage": r"INA\d+|MCP\d+|ADS\d+", "ADC": r"ADS\d+|MCP33\d+|MCP32\d+|LTC\d+|NAU7802|HX711", - "GPS": r"NEO-[67]M|L80|MTK\d+|SIM\d+|SAM-M8Q|MAX-M8" + "GPS": r"NEO-[67]M|L80|MTK\d+|SIM\d+|SAM-M8Q|MAX-M8", } - + for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + for sensor_type, pattern in sensor_patterns.items(): - if re.search(pattern, component_value, re.IGNORECASE) or re.search(pattern, component_lib, re.IGNORECASE): + if re.search(pattern, component_value, re.IGNORECASE) or re.search( + pattern, component_lib, re.IGNORECASE + ): # Identify specific sensors - + # Temperature sensors if sensor_type == "temperature": if re.search(r"DS18B20", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "temperature_sensor", - "model": "DS18B20", - "component": ref, - "interface": "1-Wire", - "range": "-55°C to +125°C" - }) + sensor_interfaces.append( + { + "type": "temperature_sensor", + "model": "DS18B20", + "component": ref, + "interface": "1-Wire", + "range": "-55°C to +125°C", + } + ) elif re.search(r"BME280|BMP280", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "multi_sensor", - "model": component_value, - "component": ref, - "measures": ["temperature", "pressure", "humidity" if "BME" in component_value else 
"pressure"], - "interface": "I2C/SPI" - }) + sensor_interfaces.append( + { + "type": "multi_sensor", + "model": component_value, + "component": ref, + "measures": [ + "temperature", + "pressure", + "humidity" if "BME" in component_value else "pressure", + ], + "interface": "I2C/SPI", + } + ) elif re.search(r"LM35", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "temperature_sensor", - "model": "LM35", - "component": ref, - "interface": "Analog", - "range": "0°C to +100°C" - }) + sensor_interfaces.append( + { + "type": "temperature_sensor", + "model": "LM35", + "component": ref, + "interface": "Analog", + "range": "0°C to +100°C", + } + ) else: - sensor_interfaces.append({ - "type": "temperature_sensor", - "model": component_value, - "component": ref - }) - + sensor_interfaces.append( + { + "type": "temperature_sensor", + "model": component_value, + "component": ref, + } + ) + # Motion sensors (accelerometer, gyroscope, etc.) elif sensor_type in ["accelerometer", "gyroscope"]: if re.search(r"MPU6050", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "motion_sensor", - "model": "MPU6050", - "component": ref, - "measures": ["accelerometer", "gyroscope"], - "interface": "I2C" - }) + sensor_interfaces.append( + { + "type": "motion_sensor", + "model": "MPU6050", + "component": ref, + "measures": ["accelerometer", "gyroscope"], + "interface": "I2C", + } + ) elif re.search(r"MPU9250", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "motion_sensor", - "model": "MPU9250", - "component": ref, - "measures": ["accelerometer", "gyroscope", "magnetometer"], - "interface": "I2C/SPI" - }) + sensor_interfaces.append( + { + "type": "motion_sensor", + "model": "MPU9250", + "component": ref, + "measures": ["accelerometer", "gyroscope", "magnetometer"], + "interface": "I2C/SPI", + } + ) elif re.search(r"LSM6DS3", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "motion_sensor", - "model": "LSM6DS3", - "component": ref, - "measures": ["accelerometer", "gyroscope"], - "interface": "I2C/SPI" - }) + sensor_interfaces.append( + { + "type": "motion_sensor", + "model": "LSM6DS3", + "component": ref, + "measures": ["accelerometer", "gyroscope"], + "interface": "I2C/SPI", + } + ) else: - sensor_interfaces.append({ - "type": "motion_sensor", - "model": component_value, - "component": ref, - "measures": [sensor_type] - }) - + sensor_interfaces.append( + { + "type": "motion_sensor", + "model": component_value, + "component": ref, + "measures": [sensor_type], + } + ) + # Light and proximity sensors elif sensor_type in ["light", "proximity"]: if re.search(r"APDS9960", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "optical_sensor", - "model": "APDS9960", - "component": ref, - "measures": ["proximity", "light", "gesture", "color"], - "interface": "I2C" - }) + sensor_interfaces.append( + { + "type": "optical_sensor", + "model": "APDS9960", + "component": ref, + "measures": ["proximity", "light", "gesture", "color"], + "interface": "I2C", + } + ) elif re.search(r"VL53L0X", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "optical_sensor", - "model": "VL53L0X", - "component": ref, - "measures": ["time-of-flight distance"], - "interface": "I2C", - "range": "Up to 2m" - }) + sensor_interfaces.append( + { + "type": "optical_sensor", + "model": "VL53L0X", + "component": ref, + "measures": ["time-of-flight distance"], + "interface": "I2C", + "range": "Up to 2m", + } + ) elif 
re.search(r"BH1750", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "optical_sensor", - "model": "BH1750", - "component": ref, - "measures": ["ambient light"], - "interface": "I2C" - }) + sensor_interfaces.append( + { + "type": "optical_sensor", + "model": "BH1750", + "component": ref, + "measures": ["ambient light"], + "interface": "I2C", + } + ) else: - sensor_interfaces.append({ - "type": "optical_sensor", - "model": component_value, - "component": ref, - "measures": [sensor_type] - }) - + sensor_interfaces.append( + { + "type": "optical_sensor", + "model": component_value, + "component": ref, + "measures": [sensor_type], + } + ) + # ADCs (often used for sensor interfaces) elif sensor_type == "ADC": if re.search(r"ADS1115", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "analog_interface", - "model": "ADS1115", - "component": ref, - "resolution": "16-bit", - "channels": 4, - "interface": "I2C" - }) + sensor_interfaces.append( + { + "type": "analog_interface", + "model": "ADS1115", + "component": ref, + "resolution": "16-bit", + "channels": 4, + "interface": "I2C", + } + ) elif re.search(r"HX711", component_value, re.IGNORECASE): - sensor_interfaces.append({ - "type": "analog_interface", - "model": "HX711", - "component": ref, - "resolution": "24-bit", - "common_usage": "Load cell/strain gauge", - "interface": "Digital" - }) + sensor_interfaces.append( + { + "type": "analog_interface", + "model": "HX711", + "component": ref, + "resolution": "24-bit", + "common_usage": "Load cell/strain gauge", + "interface": "Digital", + } + ) else: - sensor_interfaces.append({ - "type": "analog_interface", - "model": component_value, - "component": ref - }) - + sensor_interfaces.append( + {"type": "analog_interface", "model": component_value, "component": ref} + ) + # Other types of sensors else: - sensor_interfaces.append({ - "type": f"{sensor_type}_sensor", - "model": component_value, - "component": ref - }) - + sensor_interfaces.append( + { + "type": f"{sensor_type}_sensor", + "model": component_value, + "component": ref, + } + ) + # Once identified a component as a specific sensor, no need to check other types break - + # Look for common analog sensors # These often don't have specific ICs but have designators like "RT" for thermistors - thermistor_refs = [ref for ref in components.keys() if ref.startswith('RT') or ref.startswith('TH')] + thermistor_refs = [ + ref for ref in components.keys() if ref.startswith("RT") or ref.startswith("TH") + ] for ref in thermistor_refs: component = components[ref] - sensor_interfaces.append({ - "type": "temperature_sensor", - "subtype": "thermistor", - "component": ref, - "value": component.get('value', ''), - "interface": "Analog" - }) - + sensor_interfaces.append( + { + "type": "temperature_sensor", + "subtype": "thermistor", + "component": ref, + "value": component.get("value", ""), + "interface": "Analog", + } + ) + # Look for photodiodes, photoresistors (LDRs) - photosensor_refs = [ref for ref in components.keys() if ref.startswith('PD') or ref.startswith('LDR')] + photosensor_refs = [ + ref for ref in components.keys() if ref.startswith("PD") or ref.startswith("LDR") + ] for ref in photosensor_refs: component = components[ref] - sensor_interfaces.append({ - "type": "optical_sensor", - "subtype": "photosensor", - "component": ref, - "value": component.get('value', ''), - "interface": "Analog" - }) - + sensor_interfaces.append( + { + "type": "optical_sensor", + "subtype": "photosensor", + "component": ref, + 
"value": component.get("value", ""), + "interface": "Analog", + } + ) + # Look for potentiometers (often used for manual sensing/control) - pot_refs = [ref for ref in components.keys() if ref.startswith('RV') or ref.startswith('POT')] + pot_refs = [ref for ref in components.keys() if ref.startswith("RV") or ref.startswith("POT")] for ref in pot_refs: component = components[ref] - sensor_interfaces.append({ - "type": "position_sensor", - "subtype": "potentiometer", - "component": ref, - "value": component.get('value', ''), - "interface": "Analog" - }) - + sensor_interfaces.append( + { + "type": "position_sensor", + "subtype": "potentiometer", + "component": ref, + "value": component.get("value", ""), + "interface": "Analog", + } + ) + return sensor_interfaces def identify_microcontrollers(components: Dict[str, Any]) -> List[Dict[str, Any]]: """Identify microcontroller circuits in the schematic. - + Args: components: Dictionary of components from netlist - + Returns: List of identified microcontroller circuits """ microcontrollers = [] - + # Common microcontroller families mcu_patterns = { "AVR": r"ATMEGA\d+|ATTINY\d+|AT90\w+", @@ -717,143 +862,167 @@ def identify_microcontrollers(components: Dict[str, Any]) -> List[Dict[str, Any] "NXP": r"LPC\d+|IMXRT\d+|MK\d+", "SAM": r"SAMD\d+|SAM\w+", "ARM Cortex": r"CORTEX|ARM", - "8051": r"8051|AT89" + "8051": r"8051|AT89", } - + for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + for family, pattern in mcu_patterns.items(): - if re.search(pattern, component_value, re.IGNORECASE) or re.search(pattern, component_lib, re.IGNORECASE): + if re.search(pattern, component_value, re.IGNORECASE) or re.search( + pattern, component_lib, re.IGNORECASE + ): # Identify specific models identified = False - + # ATmega328P (Arduino Uno/Nano) if re.search(r"ATMEGA328P|ATMEGA328", component_value, re.IGNORECASE): - microcontrollers.append({ - "type": "microcontroller", - "family": "AVR", - "model": "ATmega328P", - "component": ref, - "common_usage": "Arduino Uno/Nano compatible" - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "AVR", + "model": "ATmega328P", + "component": ref, + "common_usage": "Arduino Uno/Nano compatible", + } + ) identified = True - + # ATmega32U4 (Arduino Leonardo/Micro) elif re.search(r"ATMEGA32U4", component_value, re.IGNORECASE): - microcontrollers.append({ - "type": "microcontroller", - "family": "AVR", - "model": "ATmega32U4", - "component": ref, - "common_usage": "Arduino Leonardo/Micro compatible" - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "AVR", + "model": "ATmega32U4", + "component": ref, + "common_usage": "Arduino Leonardo/Micro compatible", + } + ) identified = True - + # ESP32 elif re.search(r"ESP32", component_value, re.IGNORECASE): - microcontrollers.append({ - "type": "microcontroller", - "family": "ESP", - "model": "ESP32", - "component": ref, - "features": "Wi-Fi & Bluetooth" - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "ESP", + "model": "ESP32", + "component": ref, + "features": "Wi-Fi & Bluetooth", + } + ) identified = True - + # ESP8266 elif re.search(r"ESP8266", component_value, re.IGNORECASE): - microcontrollers.append({ - "type": "microcontroller", - "family": "ESP", - "model": "ESP8266", - "component": ref, - "features": 
"Wi-Fi" - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "ESP", + "model": "ESP8266", + "component": ref, + "features": "Wi-Fi", + } + ) identified = True - + # STM32 series elif re.search(r"STM32F\d+", component_value, re.IGNORECASE): model = re.search(r"(STM32F\d+)", component_value, re.IGNORECASE).group(1) - microcontrollers.append({ - "type": "microcontroller", - "family": "STM32", - "model": model.upper(), - "component": ref, - "features": "ARM Cortex-M" - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "STM32", + "model": model.upper(), + "component": ref, + "features": "ARM Cortex-M", + } + ) identified = True - + # Raspberry Pi Pico (RP2040) elif re.search(r"RP2040|PICO", component_value, re.IGNORECASE): - microcontrollers.append({ - "type": "microcontroller", - "family": "RP2040", - "model": "RP2040", - "component": ref, - "common_usage": "Raspberry Pi Pico" - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "RP2040", + "model": "RP2040", + "component": ref, + "common_usage": "Raspberry Pi Pico", + } + ) identified = True - + # PIC microcontrollers elif re.search(r"PIC\d+", component_value, re.IGNORECASE): model = re.search(r"(PIC\d+\w+)", component_value, re.IGNORECASE) if model: - microcontrollers.append({ - "type": "microcontroller", - "family": "PIC", - "model": model.group(1).upper(), - "component": ref - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "PIC", + "model": model.group(1).upper(), + "component": ref, + } + ) identified = True - + # MSP430 series elif re.search(r"MSP430\w+", component_value, re.IGNORECASE): model = re.search(r"(MSP430\w+)", component_value, re.IGNORECASE) if model: - microcontrollers.append({ - "type": "microcontroller", - "family": "MSP430", - "model": model.group(1).upper(), - "component": ref, - "features": "Ultra-low power" - }) + microcontrollers.append( + { + "type": "microcontroller", + "family": "MSP430", + "model": model.group(1).upper(), + "component": ref, + "features": "Ultra-low power", + } + ) identified = True - + # If not identified specifically but matches a family if not identified: - microcontrollers.append({ - "type": "microcontroller", - "family": family, - "component": ref, - "value": component_value - }) - + microcontrollers.append( + { + "type": "microcontroller", + "family": family, + "component": ref, + "value": component_value, + } + ) + # Once identified a component as a microcontroller, no need to check other families break - + # Look for microcontroller development boards dev_board_patterns = { "Arduino": r"ARDUINO|UNO|NANO|MEGA|LEONARDO|DUE", "ESP32 Dev Board": r"ESP32-DEVKIT|NODEMCU-32S|ESP-WROOM-32", "ESP8266 Dev Board": r"NODEMCU|WEMOS|D1_MINI|ESP-01", "STM32 Dev Board": r"NUCLEO|DISCOVERY|BLUEPILL", - "Raspberry Pi": r"RASPBERRY|RPI|RPICO|PICO" + "Raspberry Pi": r"RASPBERRY|RPI|RPICO|PICO", } - + for ref, component in components.items(): - component_value = component.get('value', '').upper() - component_lib = component.get('lib_id', '').upper() - + component_value = component.get("value", "").upper() + component_lib = component.get("lib_id", "").upper() + for board_type, pattern in dev_board_patterns.items(): - if re.search(pattern, component_value, re.IGNORECASE) or re.search(pattern, component_lib, re.IGNORECASE): - microcontrollers.append({ - "type": "development_board", - "board_type": board_type, - "component": ref, - "value": component_value - }) + if re.search(pattern, component_value, re.IGNORECASE) 
or re.search( pattern, component_lib, re.IGNORECASE ): microcontrollers.append( { "type": "development_board", "board_type": board_type, "component": ref, "value": component_value, } ) break - + return microcontrollers

From 25139d2060ecf0efa0d024f491c8201913eebe52 Mon Sep 17 00:00:00 2001
From: Neil-TC
Date: Wed, 8 Oct 2025 17:24:48 +0800
Subject: [PATCH 5/5] Add files via upload
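
The pattern_tools.py diff above reworks the module-level identify_* helpers to operate on plain dictionaries parsed from a netlist. A minimal, hypothetical usage sketch follows; it is not part of either patch. The components/nets shapes are inferred from the .get("value"), .get("lib_id"), and pin.get("component") accesses in the diff, and the reference designators, symbol lib_ids, and net names below are invented purely for illustration.

# Hypothetical driver for the patched pattern helpers; data shapes inferred
# from the diff (components: ref -> {"value", "lib_id"};
# nets: net name -> list of {"component": ref, ...}, extra keys are illustrative).
from kicad_mcp.tools.pattern_tools import (
    identify_digital_interfaces,
    identify_microcontrollers,
    identify_oscillators,
)

components = {
    "U1": {"value": "ATmega328P", "lib_id": "MCU_Microchip_ATmega:ATmega328P-PU"},
    "Y1": {"value": "16MHz", "lib_id": "Device:Crystal"},
    "C1": {"value": "22pF", "lib_id": "Device:C"},
    "C2": {"value": "22pF", "lib_id": "Device:C"},
}
nets = {
    "XTAL1": [{"component": "Y1", "pin": "1"}, {"component": "C1", "pin": "1"}],
    "XTAL2": [{"component": "Y1", "pin": "2"}, {"component": "C2", "pin": "1"}],
    "I2C_SDA": [{"component": "U1", "pin": "27"}],
    "I2C_SCL": [{"component": "U1", "pin": "28"}],
}

# U1 matches the AVR family pattern and the specific ATmega328P branch
# (the broad dev-board regex also fires on the "MEGA" substring).
print(identify_microcontrollers(components))
# Y1 is reported as a crystal oscillator, with load capacitors found on its nets.
print(identify_oscillators(components, nets))
# The I2C_* net names trigger the I2C interface heuristic.
print(identify_digital_interfaces(components, nets))

Running this sketch assumes the patched kicad_mcp.tools.pattern_tools module is importable; with data in the shape shown, each helper returns a list of dictionaries describing the circuits it recognized.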