diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index fa1cfdb7e9b..722415073f1 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -331,3 +331,5 @@ /src/carbon/ @itiinani /src/amlfs/ @Aman-Jain-14 @amajai @mawhite @brpanask @tibanyas + +/src/azext_mcp/ @ReaNAiveD diff --git a/src/mcp-server/HISTORY.rst b/src/mcp-server/HISTORY.rst new file mode 100644 index 00000000000..abbff5a61a7 --- /dev/null +++ b/src/mcp-server/HISTORY.rst @@ -0,0 +1,8 @@ +.. :changelog: + +Release History +=============== + +1.0.0b1 +++++++ +* Initial release. \ No newline at end of file diff --git a/src/mcp-server/README.rst b/src/mcp-server/README.rst new file mode 100644 index 00000000000..088b34ea380 --- /dev/null +++ b/src/mcp-server/README.rst @@ -0,0 +1,5 @@ +Microsoft Azure CLI 'mcp' Extension +========================================== + +This package is for the 'mcp' extension. +i.e. 'az mcp' \ No newline at end of file diff --git a/src/mcp-server/azext_mcp/__init__.py b/src/mcp-server/azext_mcp/__init__.py new file mode 100644 index 00000000000..2cf748941ac --- /dev/null +++ b/src/mcp-server/azext_mcp/__init__.py @@ -0,0 +1,28 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from azure.cli.core import AzCommandsLoader + +from azext_mcp._help import helps # pylint: disable=unused-import + + +class McpCommandsLoader(AzCommandsLoader): + + def __init__(self, cli_ctx=None): + from azure.cli.core.commands import CliCommandType + mcp_custom = CliCommandType(operations_tmpl='azext_mcp.custom#{}') + super(McpCommandsLoader, self).__init__(cli_ctx=cli_ctx, custom_command_type=mcp_custom) + + def load_command_table(self, args): + from azext_mcp.commands import load_command_table + load_command_table(self, args) + return self.command_table + + def load_arguments(self, command): + from azext_mcp._params import load_arguments + load_arguments(self, command) + + +COMMAND_LOADER_CLS = McpCommandsLoader diff --git a/src/mcp-server/azext_mcp/_help.py b/src/mcp-server/azext_mcp/_help.py new file mode 100644 index 00000000000..5eea349ad57 --- /dev/null +++ b/src/mcp-server/azext_mcp/_help.py @@ -0,0 +1,18 @@ +# coding=utf-8 +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from knack.help_files import helps # pylint: disable=unused-import + + +helps['mcp'] = """ + type: group + short-summary: CLI as local MCP servers. +""" + +helps['mcp up'] = """ + type: command + short-summary: local MCP server up. +""" diff --git a/src/mcp-server/azext_mcp/_params.py b/src/mcp-server/azext_mcp/_params.py new file mode 100644 index 00000000000..30eba268d65 --- /dev/null +++ b/src/mcp-server/azext_mcp/_params.py @@ -0,0 +1,20 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +# pylint: disable=line-too-long + +from knack.arguments import CLIArgumentType + + +def load_arguments(self, _): + + with self.argument_context('mcp') as c: + pass + + with self.argument_context('mcp up') as c: + # c.argument('port', required=False, default=8080, type=int, help='MCP server port.') + c.argument('disable_elicit', action='store_true', + help='Disable elicit confirmation for destructive commands. ' + 'Use with caution as it may lead to unintended actions.') + pass diff --git a/src/mcp-server/azext_mcp/azext_metadata.json b/src/mcp-server/azext_mcp/azext_metadata.json new file mode 100644 index 00000000000..916deb3b5c2 --- /dev/null +++ b/src/mcp-server/azext_mcp/azext_metadata.json @@ -0,0 +1,4 @@ +{ + "azext.isPreview": true, + "azext.minCliCoreVersion": "2.57.0" +} \ No newline at end of file diff --git a/src/mcp-server/azext_mcp/command_introspection.py b/src/mcp-server/azext_mcp/command_introspection.py new file mode 100644 index 00000000000..2c765e3f939 --- /dev/null +++ b/src/mcp-server/azext_mcp/command_introspection.py @@ -0,0 +1,583 @@ +import ast +from importlib import import_module +import inspect +from pathlib import Path +import re +import textwrap + +from azure.cli.core.aaz import AAZArgumentsSchema, AAZObjectType, AAZDictType, AAZListType, AAZIntType, AAZStrType, AAZBoolType, AAZFloatType, AAZAnyType, AAZIntArg, AAZStrArg, AAZBoolArg, AAZFloatArg, AAZBaseArg +from azure.cli.core.aaz._base import AAZBaseType, _AAZUndefinedType +from knack import log +import yaml + +logger = log.get_logger(__name__) + +# Constants moved from AzCLIBridge class +STORED_DEPRECATION_KEY = ["expiration", "target", "redirect", "hide"] +IMPORT_AAZ_EXPRESS = re.compile(r'^\s*from (.*\.)?aaz(\..*)? .*$') +COMMAND_ARGS_EXPRESS = re.compile(r'^.*[\s\(]command_args=.*$') + + +def get_command_help_info(name: str, command=None): + """Get help information for a command. + + Args: + name: Command name + command: Command object (optional) + + Returns: + dict: Help information with at least 'short-summary' key + """ + if command and hasattr(command, 'AZ_HELP') and command.AZ_HELP: + return command.AZ_HELP + elif command and hasattr(command, 'help') and command.help: + return command.help + from knack.help_files import helps + if name in helps: + return yaml.safe_load(helps[name]) + else: + return {"short-summary": "No help available for this command"} + + +def get_command_codegen_info(command): + """Get code generation information for a command. + + Args: + command: Command object to analyze + + Returns: + dict or None: Code generation info with 'version' and 'type' keys, or None + """ + from azure.cli.core.commands import AzCliCommand + from azure.cli.core.aaz import AAZCommand + if isinstance(command, AAZCommand): + return { + "version": "v2", + "type": "Atomic" + } + + if isinstance(command, AzCliCommand): + if 'command_operation' not in command.command_kwargs: + return None + + command_operation = command.command_kwargs['command_operation'] + is_v2_convenience = False + is_generated = False + if getattr(command_operation, 'op_path', None): + operation_path = command_operation.op_path + operation_module_path = operation_path.split("#")[0] + op = command_operation.get_op_handler(operation_path) + func_map = get_module_functions(operation_module_path) + op_source = expand_all_functions(op, func_map) + for line in op_source.splitlines(): + if IMPORT_AAZ_EXPRESS.match(line): + is_v2_convenience = True + break + if COMMAND_ARGS_EXPRESS.match(line): + is_v2_convenience = True + + path_parts = list(Path(inspect.getfile(op)).parts) + if "generated" in path_parts: + is_generated = True + + if not is_v2_convenience and getattr(command_operation, 'getter_op_path', None): + op = command_operation.get_op_handler(command_operation.getter_op_path) + op_source = inspect.getsource(op) + for line in op_source.splitlines(): + if IMPORT_AAZ_EXPRESS.match(line): + is_v2_convenience = True + break + if COMMAND_ARGS_EXPRESS.match(line): + is_v2_convenience = True + + path_parts = list(Path(inspect.getfile(op)).parts) + if "generated" in path_parts: + is_generated = True + + if not is_v2_convenience and getattr(command_operation, 'setter_op_path', None): + op = command_operation.get_op_handler(command_operation.setter_op_path) + op_source = inspect.getsource(op) + for line in op_source.splitlines(): + if IMPORT_AAZ_EXPRESS.match(line): + is_v2_convenience = True + break + if COMMAND_ARGS_EXPRESS.match(line): + is_v2_convenience = True + + path_parts = list(Path(inspect.getfile(op)).parts) + if "generated" in path_parts: + is_generated = True + + if not is_v2_convenience and getattr(command_operation, 'custom_function_op_path', None): + op = command_operation.get_op_handler(command_operation.custom_function_op_path) + op_source = inspect.getsource(op) + for line in op_source.splitlines(): + if IMPORT_AAZ_EXPRESS.match(line): + is_v2_convenience = True + break + if COMMAND_ARGS_EXPRESS.match(line): + is_v2_convenience = True + + path_parts = list(Path(inspect.getfile(op)).parts) + if "generated" in path_parts: + is_generated = True + if is_v2_convenience: + return { + "version": "v2", + "type": "Convenience" + } + elif is_generated: + return { + "version": "v1", + "type": "SDK" + } + return None + + +def extract_argument_deprecation_info(argument_settings): + """Extract deprecation information from argument settings. + + Returns: + dict or None: Deprecation info if present, None otherwise + """ + if argument_settings.get("deprecate_info", None) is None: + return None + + deprecate_info = {} + for info_key in STORED_DEPRECATION_KEY: + if hasattr(argument_settings["deprecate_info"], info_key) and \ + getattr(argument_settings["deprecate_info"], info_key): + deprecate_info[info_key] = getattr(argument_settings["deprecate_info"], info_key) + + return deprecate_info if deprecate_info else None + + +def extract_argument_options(argument_settings): + """Extract options list from argument settings. + + Returns: + list: Sorted list of argument options + """ + if not argument_settings.get("options_list", None): + return [] + + raw_options_list = argument_settings["options_list"] + option_list = set() + for opt in raw_options_list: + opt_type = opt.__class__.__name__ + if opt_type == "str": + option_list.add(opt) + elif opt_type == "Deprecated": + if hasattr(opt, "hide") and opt.hide: + continue + if hasattr(opt, "target"): + option_list.add(opt.target) + else: + logger.warning("Unsupported option type: %s", opt_type) + + return sorted(option_list) + + +def extract_argument_options_deprecation(argument_settings): + """Extract deprecation information for argument options. + + Returns: + list: List of option deprecation info dictionaries + """ + if not argument_settings.get("options_list", None): + return [] + + raw_options_list = argument_settings["options_list"] + option_deprecation_list = [] + for opt in raw_options_list: + opt_type = opt.__class__.__name__ + if opt_type != "Deprecated": + continue + opt_deprecation = {} + for info_key in STORED_DEPRECATION_KEY: + if hasattr(opt, info_key) and getattr(opt, info_key): + opt_deprecation[info_key] = getattr(opt, info_key) + if opt_deprecation: + option_deprecation_list.append(opt_deprecation) + + return option_deprecation_list + + +def extract_argument_type(argument): + """Extract type information from argument settings. + + Returns: + str or None: Type name if present, None otherwise + """ + argument_settings = argument.type.settings + if not argument_settings.get("type", None): + return None + + configured_type = argument_settings["type"] + raw_type = None + if hasattr(configured_type, "__name__"): + raw_type = configured_type.__name__ + elif hasattr(configured_type, "__class__"): + raw_type = configured_type.__class__.__name__ + else: + logger.warning("Unsupported type: %s", configured_type) + return None + + return raw_type if raw_type in ["str", "int", "float", "bool", "file_type"] else "custom_type" + + +def get_module_functions(path): + """Get all functions from a module by import path. + + Args: + path: Module import path (e.g., 'azure.cli.command.module') + + Returns: + dict or None: Dictionary mapping function names to function objects, or None + """ + try: + module = import_module(path) + functions = inspect.getmembers(module, predicate=inspect.isfunction) + return dict(functions) + + except ModuleNotFoundError: + return None # bypass functions in sdk + + +def expand_all_functions(func, func_map): + """Expand function source code by including all called functions. + + Args: + func: The function to expand + func_map: Dictionary mapping function names to function objects + + Returns: + str: Expanded source code + """ + source = "" + try: + source = textwrap.dedent(inspect.getsource(func)) + except (OSError, TypeError) as e: + # https://docs.python.org/3/library/inspect.html#inspect.getsource + logger.warning("Cannot retrieve the source code of %s: %s", func, e) + + if func_map is None: + return source + + tree = ast.parse(source) + for node in ast.walk(tree): + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): + function_name = node.func.id + function = func_map.get(function_name, None) + # skip recursion and `locals()` + if function_name == func.__name__ or function is None: + continue + + source += expand_all_functions(function, func_map) + + return source + + +def build_argument_info(argument): + """Build complete argument information from argument settings. + + Args: + argument: Raw argument object with type and settings + + Returns: + dict: Complete argument information dictionary + """ + argument_settings = argument.type.settings + arg_info = {} + + # Extract options + options = extract_argument_options(argument_settings) + if options: + arg_info["options"] = options + + # Extract type + arg_type = extract_argument_type(argument) + if arg_type: + arg_info["type"] = arg_type + + # Extract deprecation info + deprecation_info = extract_argument_deprecation_info(argument_settings) + if deprecation_info: + arg_info["deprecate_info"] = deprecation_info + + # Extract options deprecation + options_deprecation = extract_argument_options_deprecation(argument_settings) + if options_deprecation: + arg_info["options_deprecate_info"] = options_deprecation + + return arg_info + + +def build_command_help_info(name: str, command=None): + """Build comprehensive help information for a command. + + Args: + name: Command name + command: Command object (optional) + command_table: Command table for resolving subcommands (optional) + + Returns: + dict: Complete command help information including arguments + """ + if command is None: + return {"short-summary": "Command not found"} + + help_info = get_command_help_info(name, command) + help_info['arguments'] = {} + + # Add code generation info + codegen_info = get_command_codegen_info(command) + if codegen_info: + help_info['codegen_info'] = codegen_info + + # # Add arguments schema if available + # if hasattr(command, '_args_schema'): + # help_info['arguments_schema'] = command._args_schema + + # Process command arguments + for arg_name, argument in command.arguments.items(): + if argument.type is None: + continue + settings = argument.type.settings + # Skip ignore actions + if settings.get("action", None): + action = settings["action"] + if hasattr(action, "__name__") and action.__name__ == "IgnoreAction": + continue + + # Build argument info using new extract functions + arg_info = build_argument_info(argument) + arg_info["name"] = settings["dest"] + + # Add additional argument properties + if settings.get("required", False): + arg_info["required"] = True + if settings.get("choices", None): + arg_info["choices"] = sorted(list(settings["choices"])) + if settings.get("id_part", None): + arg_info["id_part"] = settings["id_part"] + if settings.get("nargs", None): + arg_info["nargs"] = settings["nargs"] + if settings.get("completer", None): + arg_info["has_completer"] = True + if settings.get("default", None) is not None: + if not isinstance(settings["default"], (float, int, str, list, bool)): + arg_info["default"] = str(settings["default"]) + else: + arg_info["default"] = settings["default"] + + arg_info["desc"] = settings.get("help", "") + help_info['arguments'][arg_name] = arg_info + + return help_info + + +def build_command_group_help_info(name: str, command_group=None, command_table=None, group_table=None): + """Build comprehensive help information for a command group. + + Args: + name: Command group name + command_group: Command group object (optional) + command_table: Command table for resolving subcommands (optional) + group_table: Group table for resolving subgroups (optional) + + Returns: + dict: Complete command group help information including subcommands and subgroups + """ + if command_group is None: + return {"short-summary": "Command group not found"} + + help_info = get_command_help_info(name, command_group) + + # Add subcommands if command_table is provided + if command_table: + help_info['subcommands'] = { + cmd_name: get_command_help_info(cmd_name, cmd) + for cmd_name, cmd in command_table.items() + if cmd_name.startswith(name + ' ') + } + + # Add subgroups if group_table is provided + if group_table: + help_info['subgroups'] = { + group_name: get_command_help_info(group_name, group) + for group_name, group in group_table.items() + if group_name.startswith(name + ' ') + } + + return help_info + + +def handle_aaz_type(aaz_type: AAZBaseType): + """Convert AAZ type to JSON schema representation. + + Args: + aaz_type: AAZ type to convert + + Returns: + dict: JSON schema representation of the type + """ + # Get the base schema first + schema = _get_base_schema_for_type(aaz_type) + + # Handle nullable types by adding "null" to the type array + if hasattr(aaz_type, '_nullable') and aaz_type._nullable: + if "type" in schema: + # Convert single type to array if needed + if isinstance(schema["type"], str): + schema["type"] = [schema["type"], "null"] + elif isinstance(schema["type"], list) and "null" not in schema["type"]: + schema["type"].append("null") + else: + # For complex schemas without a simple type (like empty schema for any type) + # we fall back to oneOf approach + return { + "oneOf": [ + schema, + {"type": "null"} + ] + } + + return schema + + +def _get_base_schema_for_type(aaz_type: AAZBaseType): + """Get the base JSON schema for an AAZ type without nullable handling. + + Args: + aaz_type: AAZ type to convert + + Returns: + dict: Base JSON schema representation + """ + if isinstance(aaz_type, AAZObjectType): + schema = { + "type": "object", + "properties": {} + } + # Process each field in the object + for field_name, field_type in aaz_type._fields.items(): + field_schema = handle_aaz_type(field_type) + + # Add description from various possible sources + description = None + if hasattr(field_type, '_help') and field_type._help: + if isinstance(field_type._help, dict): + description = field_type._help.get('short-summary') or field_type._help.get('description') + else: + description = str(field_type._help) + if description: + field_schema["description"] = description + # Add serialized name if different from field name + if hasattr(field_type, '_serialized_name') and field_type._serialized_name and field_type._serialized_name != field_name: + if "description" in field_schema: + field_schema["description"] += f" (serialized as: {field_type._serialized_name})" + else: + field_schema["description"] = f"Serialized as: {field_type._serialized_name}" + + schema["properties"][field_name] = field_schema + # For simplicity, we don't mark any fields as required by default + # This can be enhanced based on specific requirements + return schema + elif isinstance(aaz_type, AAZDictType): + element_schema = handle_aaz_type(aaz_type.Element) + schema = { + "type": "object", + "additionalProperties": element_schema + } + return schema + elif isinstance(aaz_type, AAZListType): + element_schema = handle_aaz_type(aaz_type.Element) + schema = { + "type": "array", + "items": element_schema + } + return schema + elif isinstance(aaz_type, AAZIntType): + schema = {"type": "integer"} + if isinstance(aaz_type, AAZIntArg): + if aaz_type._default is not None and not isinstance(aaz_type._default, _AAZUndefinedType): + schema["default"] = aaz_type._default + if aaz_type._help: + schema["description"] = aaz_type._help + return schema + elif isinstance(aaz_type, AAZBoolType): + schema = {"type": "boolean"} + if isinstance(aaz_type, AAZBoolArg): + if aaz_type._default is not None and not isinstance(aaz_type._default, _AAZUndefinedType): + schema["default"] = aaz_type._default + if aaz_type._help: + schema["description"] = aaz_type._help + return schema + elif isinstance(aaz_type, AAZStrType): + schema = {"type": "string"} + if isinstance(aaz_type, AAZStrArg): + if aaz_type._default is not None and not isinstance(aaz_type._default, _AAZUndefinedType): + schema["default"] = aaz_type._default + if aaz_type._help: + schema["description"] = aaz_type._help + return schema + elif isinstance(aaz_type, AAZFloatType): + schema = {"type": "number"} + if isinstance(aaz_type, AAZFloatArg): + if aaz_type._default is not None and not isinstance(aaz_type._default, _AAZUndefinedType): + schema["default"] = aaz_type._default + return schema + elif isinstance(aaz_type, AAZAnyType): + # Any type can be any JSON value - use empty schema which allows anything + if isinstance(aaz_type, AAZBaseArg): + schema = {} + if aaz_type._default is not None and not isinstance(aaz_type._default, _AAZUndefinedType): + schema["default"] = aaz_type._default + if aaz_type._help: + schema["description"] = aaz_type._help + return schema + else: + # Handle unknown types as any type + logger.warning("Unknown AAZ type encountered: %s, treating as any type", type(aaz_type).__name__) + return {} + +def handle_arg_schema(arg_schema: AAZArgumentsSchema): + """Convert AAZ arguments schema to JSON schema representation. + + Args: + arg_schema: AAZ arguments schema to convert + + Returns: + dict: JSON schema representation of the arguments schema + """ + # AAZArgumentsSchema inherits from AAZObjectType, so we can reuse the logic + return handle_aaz_type(arg_schema) + + +def handle_help_schema(help_info): + argument_info = help_info['arguments'] + properties = {} + required = [] + for _, arg_info in argument_info.items(): + arg_dict = { + "name": arg_info["name"], + } + if arg_info.get("type"): + arg_dict["type"] = arg_info["type"] + if arg_info.get("required", False): + required.append(arg_info["name"]) + if arg_info.get("desc"): + arg_dict["description"] = arg_info["desc"] + if arg_info.get("default") is not None: + arg_dict["default"] = arg_info["default"] + if arg_info.get("options"): + arg_dict["options"] = [str(option) for option in arg_info["options"]] if arg_info["options"] else [] + if arg_info.get("choices"): + arg_dict["enum"] = arg_info["choices"] + properties[arg_info["name"]] = arg_dict + return { + "type": "object", + "properties": properties, + "required": required + } diff --git a/src/mcp-server/azext_mcp/commands.py b/src/mcp-server/azext_mcp/commands.py new file mode 100644 index 00000000000..c115fb8f00b --- /dev/null +++ b/src/mcp-server/azext_mcp/commands.py @@ -0,0 +1,11 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +# pylint: disable=line-too-long + +def load_command_table(self, _): + + with self.command_group('mcp') as g: + g.custom_command('up', 'mcp_up') diff --git a/src/mcp-server/azext_mcp/custom.py b/src/mcp-server/azext_mcp/custom.py new file mode 100644 index 00000000000..ed282d1e79e --- /dev/null +++ b/src/mcp-server/azext_mcp/custom.py @@ -0,0 +1,15 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from azext_mcp.server import AzMCP + + +def mcp_up(cmd, disable_elicit=False): + az_mcp = AzMCP( + cli_ctx=cmd.cli_ctx, + name='AzMCP', + enable_elicit=not disable_elicit + ) + az_mcp.run() diff --git a/src/mcp-server/azext_mcp/prompt_elicitation.py b/src/mcp-server/azext_mcp/prompt_elicitation.py new file mode 100644 index 00000000000..a5f3fa9bbf9 --- /dev/null +++ b/src/mcp-server/azext_mcp/prompt_elicitation.py @@ -0,0 +1,332 @@ +import asyncio +import builtins +from contextlib import contextmanager +from typing import Callable + +from knack.log import get_logger + +from mcp.server.fastmcp import Context + +logger = get_logger(__name__) + + +class PromptElicitHandler: + """Handler that intercepts knack prompts and built-in input() and uses MCP elicit for user interaction.""" + + def __init__(self, ctx: Context): + """Initialize with MCP context for elicit operations. + + Args: + ctx: MCP Context object that provides elicit functionality + """ + self.ctx = ctx + self.original_functions = {} + self._loop = None + + def _get_or_create_loop(self): + """Get the current event loop or create one if needed.""" + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop, create one + if self._loop is None: + self._loop = asyncio.new_event_loop() + loop = self._loop + return loop + + def _run_async(self, coro): + """Run an async coroutine from sync context.""" + loop = self._get_or_create_loop() + if asyncio.iscoroutinefunction(coro) or asyncio.iscoroutine(coro): + # Check if loop is already running + if loop.is_running(): + import threading + + result_holder = {'result': None, 'exception': None} + done_event = threading.Event() + + def run_in_thread(): + """Run coroutine in a new event loop in this thread.""" + try: + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + result_holder['result'] = new_loop.run_until_complete(coro) + finally: + new_loop.close() + asyncio.set_event_loop(None) + except Exception as e: + result_holder['exception'] = e + finally: + done_event.set() + + # Start the thread and wait for completion + thread = threading.Thread(target=run_in_thread, daemon=True) + thread.start() + + # Wait for the thread to complete with timeout + if done_event.wait(timeout=300): # 5 minute timeout + if result_holder['exception']: + raise result_holder['exception'] + return result_holder['result'] + else: + from knack.prompting import NoTTYException + raise NoTTYException("Prompt timeout") + else: + return loop.run_until_complete(coro) + return coro + + def _create_prompt_wrapper(self) -> Callable: + """Create a wrapper for the basic prompt function.""" + from .prompt_models import TextPrompt + + def prompt_wrapper(msg, help_string=None): + prompt_msg = msg + if help_string: + prompt_msg += f"\n(Help: {help_string})" + + logger.debug("Intercepting prompt: %s", msg) + + async def async_elicit(): + result = await self.ctx.elicit(prompt_msg, TextPrompt) + if result.action == "accept": + return result.data.value + # User cancelled - return empty string or raise based on requirements + from knack.prompting import NoTTYException + raise NoTTYException("User cancelled prompt") + + return self._run_async(async_elicit()) + + return prompt_wrapper + + def _create_prompt_int_wrapper(self) -> Callable: + """Create a wrapper for the integer prompt function.""" + from .prompt_models import IntegerPrompt + + def prompt_int_wrapper(msg, help_string=None): + prompt_msg = msg + if help_string: + prompt_msg += f"\n(Help: {help_string})" + + logger.debug("Intercepting integer prompt: %s", msg) + + async def async_elicit(): + result = await self.ctx.elicit(prompt_msg, IntegerPrompt) + if result.action == "accept": + return result.data.value + from knack.prompting import NoTTYException + raise NoTTYException("User cancelled prompt") + + return self._run_async(async_elicit()) + + return prompt_int_wrapper + + def _create_prompt_pass_wrapper(self) -> Callable: + """Create a wrapper for the password prompt function.""" + from .prompt_models import PasswordPrompt + + def prompt_pass_wrapper(msg='Password: ', confirm=False, help_string=None): + prompt_msg = msg + if help_string: + prompt_msg += f"\n(Help: {help_string})" + if confirm: + prompt_msg += "\nYou will need to confirm the password." + + logger.debug("Intercepting password prompt: %s", msg) + + async def async_elicit(): + result = await self.ctx.elicit(prompt_msg, PasswordPrompt) + if result.action == "accept": + return result.data.password + from knack.prompting import NoTTYException + raise NoTTYException("User cancelled prompt") + + return self._run_async(async_elicit()) + + return prompt_pass_wrapper + + def _create_prompt_y_n_wrapper(self) -> Callable: + """Create a wrapper for the yes/no prompt function.""" + from .prompt_models import YesNoPrompt + + def prompt_y_n_wrapper(msg, default=None, help_string=None): + y = 'Y' if default == 'y' else 'y' + n = 'N' if default == 'n' else 'n' + prompt_msg = f"{msg} ({y}/{n})" + if help_string: + prompt_msg += f"\n(Help: {help_string})" + + logger.debug("Intercepting y/n prompt: %s (default: %s)", msg, default) + + async def async_elicit(): + result = await self.ctx.elicit(prompt_msg, YesNoPrompt) + if result.action == "accept": + return result.data.answer + # Use default if cancelled and default exists + if default: + return default == 'y' + from knack.prompting import NoTTYException + raise NoTTYException("User cancelled prompt") + + return self._run_async(async_elicit()) + + return prompt_y_n_wrapper + + def _create_prompt_t_f_wrapper(self) -> Callable: + """Create a wrapper for the true/false prompt function.""" + from .prompt_models import TrueFalsePrompt + + def prompt_t_f_wrapper(msg, default=None, help_string=None): + t = 'T' if default == 't' else 't' + f = 'F' if default == 'f' else 'f' + prompt_msg = f"{msg} ({t}/{f})" + if help_string: + prompt_msg += f"\n(Help: {help_string})" + + logger.debug("Intercepting t/f prompt: %s (default: %s)", msg, default) + + async def async_elicit(): + result = await self.ctx.elicit(prompt_msg, TrueFalsePrompt) + if result.action == "accept": + return result.data.answer + # Use default if cancelled and default exists + if default: + return default == 't' + from knack.prompting import NoTTYException + raise NoTTYException("User cancelled prompt") + + return self._run_async(async_elicit()) + + return prompt_t_f_wrapper + + def _create_prompt_choice_list_wrapper(self) -> Callable: + """Create a wrapper for the choice list prompt function.""" + from pydantic import BaseModel, Field + from typing_extensions import Annotated + + def prompt_choice_list_wrapper(msg, a_list, default=1, help_string=None): + # Format choices for display + options = [] + for i, x in enumerate(a_list): + if isinstance(x, dict) and 'name' in x: + option_text = f"[{i+1}] {x['name']}" + if 'desc' in x: + option_text += f" - {x['desc']}" + else: + option_text = f"[{i+1}] {x}" + options.append(option_text) + + choices_text = '\n'.join(options) + prompt_msg = f"{msg}\n{choices_text}\nPlease select (default: {default})" + if help_string: + prompt_msg += f"\n(Help: {help_string})" + + logger.debug("Intercepting choice prompt: %s (default: %s)", msg, default) + + # Create a dynamic model with the right validation + class DynamicChoicePrompt(BaseModel): + choice_index: Annotated[int, Field( + description=f"The index of the selected choice (1-{len(a_list)})", + ge=1, + le=len(a_list) + )] + + async def async_elicit(): + result = await self.ctx.elicit(prompt_msg, DynamicChoicePrompt) + if result.action == "accept": + return result.data.choice_index - 1 # Convert to 0-based index + # Use default if cancelled + return default - 1 + + return self._run_async(async_elicit()) + + return prompt_choice_list_wrapper + + def _create_input_wrapper(self) -> Callable: + """Create a wrapper for the built-in input() function.""" + from .prompt_models import TextPrompt + + def input_wrapper(prompt=''): + prompt_msg = prompt if prompt else "Enter input:" + + logger.debug("Intercepting built-in input: %s", prompt) + + async def async_elicit(): + result = await self.ctx.elicit(prompt_msg, TextPrompt) + if result.action == "accept": + return result.data.value + # User cancelled - return empty string (matches default input behavior on EOF) + return "" + + return self._run_async(async_elicit()) + + return input_wrapper + + def __enter__(self): + """Enter the context and replace prompting functions with elicit wrappers.""" + import knack.prompting + + # Store original functions + self.original_functions = { + 'prompt': knack.prompting.prompt, + 'prompt_int': knack.prompting.prompt_int, + 'prompt_pass': knack.prompting.prompt_pass, + 'prompt_y_n': knack.prompting.prompt_y_n, + 'prompt_t_f': knack.prompting.prompt_t_f, + 'prompt_choice_list': knack.prompting.prompt_choice_list, + 'input': builtins.input, # Store built-in input function + } + + # Replace with our elicit-based wrappers + knack.prompting.prompt = self._create_prompt_wrapper() + knack.prompting.prompt_int = self._create_prompt_int_wrapper() + knack.prompting.prompt_pass = self._create_prompt_pass_wrapper() + knack.prompting.prompt_y_n = self._create_prompt_y_n_wrapper() + knack.prompting.prompt_t_f = self._create_prompt_t_f_wrapper() + knack.prompting.prompt_choice_list = self._create_prompt_choice_list_wrapper() + builtins.input = self._create_input_wrapper() # Replace built-in input function + + logger.debug("Prompt elicit handler activated (including built-in input)") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit the context and restore original prompting functions.""" + import knack.prompting + + # Restore original functions + for name, func in self.original_functions.items(): + if name == 'input': + builtins.input = func # Restore built-in input function + else: + setattr(knack.prompting, name, func) + + # Clear the stored references + self.original_functions.clear() + + # Clean up event loop if we created one + if self._loop: + self._loop.close() + self._loop = None + + logger.debug("Prompt elicit handler deactivated (built-in input restored)") + return False + + +@contextmanager +def elicit_prompts(ctx: Context): + """Context manager for handling CLI prompts through MCP elicit. + + This intercepts all knack prompt functions AND the built-in input() function + and routes them through the MCP elicit mechanism for proper user interaction. + + Args: + ctx: MCP Context object that provides elicit functionality + + Usage: + with elicit_prompts(ctx): + # Run CLI commands - any prompts (including input()) will use MCP elicit + result = cli_bridge.invoke_command_by_json(...) + """ + handler = PromptElicitHandler(ctx) + with handler: + yield handler diff --git a/src/mcp-server/azext_mcp/prompt_models.py b/src/mcp-server/azext_mcp/prompt_models.py new file mode 100644 index 00000000000..eea36e86fda --- /dev/null +++ b/src/mcp-server/azext_mcp/prompt_models.py @@ -0,0 +1,47 @@ +"""Pydantic models for MCP elicit prompts.""" + +from pydantic import BaseModel, Field +from typing_extensions import Annotated + + +class TextPrompt(BaseModel): + """Model for basic text input prompt.""" + value: Annotated[str, Field( + description="The text value to input" + )] + + +class IntegerPrompt(BaseModel): + """Model for integer input prompt.""" + value: Annotated[int, Field( + description="The integer value to input" + )] + + +class PasswordPrompt(BaseModel): + """Model for password input prompt.""" + password: Annotated[str, Field( + description="The password value (will be masked in UI)" + )] + + +class YesNoPrompt(BaseModel): + """Model for yes/no prompt.""" + answer: Annotated[bool, Field( + description="Yes (true) or No (false)" + )] + + +class TrueFalsePrompt(BaseModel): + """Model for true/false prompt.""" + answer: Annotated[bool, Field( + description="True or False" + )] + + +class ChoicePrompt(BaseModel): + """Model for choice selection prompt.""" + choice_index: Annotated[int, Field( + description="The index of the selected choice (1-based)", + ge=1 + )] diff --git a/src/mcp-server/azext_mcp/server.py b/src/mcp-server/azext_mcp/server.py new file mode 100644 index 00000000000..f131b0ff9f1 --- /dev/null +++ b/src/mcp-server/azext_mcp/server.py @@ -0,0 +1,326 @@ +import contextlib +from datetime import datetime +import io +from types import SimpleNamespace + +from mcp.types import ToolAnnotations +from mcp.server.fastmcp import FastMCP, Context +from mcp.server.fastmcp.tools import Tool +from mcp.server.fastmcp.resources import FunctionResource +from mcp.server.streamable_http import EventStore +from azure.cli.core import AzCli +from knack import log +from pydantic import BaseModel, Field +from typing_extensions import Annotated + +from .command_introspection import ( + build_command_help_info, + build_command_group_help_info, + handle_arg_schema, + handle_help_schema, +) + +logger = log.get_logger(__name__) + + +class MCPConfirmation(BaseModel): + confirmation: Annotated[str, Field( + description="Whether to confirm the command execution. " + "If Y/y/YES/yes, the command will be executed without confirmation.", + default="false" + )] + + +class AzCLIBridge: + """Bridge layer between AzCLI and AzMCP that provides command introspection and execution.""" + + def __init__(self, cli_ctx: AzCli): + self.cli_ctx = cli_ctx + self._ensure_commands_loaded() + self.command_table = self.cli_ctx.invocation.commands_loader.command_table + self.group_table = self.cli_ctx.invocation.commands_loader.command_group_table + + def _ensure_commands_loaded(self): + """Ensure CLI commands are loaded.""" + start_at = datetime.now() + self.cli_ctx.invocation.commands_loader.load_command_table(None) + try: + self.cli_ctx.invocation.commands_loader.load_arguments() + except ImportError: + logger.warning("Failed to load command arguments") + logger.debug("Commands loaded in %s seconds", (datetime.now() - start_at).total_seconds()) + + def get_command_help(self, command_name: str): + """Get help content for a specific command.""" + # Check if it's a command + command = self.command_table.get(command_name) + if command: + return build_command_help_info(command_name, command) + + # Check if it's a command group + command_group = self.group_table.get(command_name) + if command_group is not None: + return build_command_group_help_info(command_name, command_group, self.command_table, self.group_table) + + return 'Command or group not found' + + def get_command_arguments_schema(self, command_name: str): + """Get argument help and schema for a specific command.""" + # Implementation here + command = self.command_table.get(command_name) + if not command: + return None + if hasattr(command, '_args_schema'): + return handle_arg_schema(command._args_schema) + help_info = build_command_help_info(command_name, command) + if help_info and 'arguments' in help_info: + return handle_help_schema(help_info) + return None + + def get_default_arguments(self, command_name: str): + """Get default arguments for a specific command.""" + schema = self.get_command_arguments_schema(command_name) + if not schema: + return None + default_args = {} + for arg_name, arg_info in schema['properties'].items(): + if 'default' in arg_info and arg_info['default'] is not None: + default_args[arg_name] = arg_info['default'] + return default_args + + def validate_invocation(self, command, arguments: dict): + namespace = SimpleNamespace(**arguments) + for argument in command.arguments.values(): + settings = argument.type.settings + # Skip ignore actions + if settings.get("action", None): + action = settings["action"] + if hasattr(action, "__name__") and action.__name__ == "IgnoreAction": + continue + name = settings["dest"] + if not hasattr(namespace, name): + setattr(namespace, name, None) + for arg_name, argument in command.arguments.items(): + if argument.type is None: + continue + settings = argument.type.settings + validator = settings.get('validator') + if not validator: + continue + import inspect + sig = inspect.signature(validator) + validator_parameters = sig.parameters + kwargs = {} + if 'cmd' in validator_parameters: + kwargs['cmd'] = command + if 'namespace' in validator_parameters: + kwargs['namespace'] = namespace + if 'ns' in validator_parameters: + kwargs['ns'] = namespace + validator(**kwargs) + if getattr(command, 'validator', None): + import inspect + sig = inspect.signature(command.validator) + validator_parameters = sig.parameters + kwargs = {} + if 'cmd' in validator_parameters: + kwargs['cmd'] = command + if 'namespace' in validator_parameters: + kwargs['namespace'] = namespace + if 'ns' in validator_parameters: + kwargs['ns'] = namespace + command.validator(**kwargs) + return {key: value + for key, value in namespace.__dict__.items() + if not key.startswith('_')} + + def invoke_command_by_json(self, command_name: str, arguments: dict | None = None): + """Invoke a command with JSON-described arguments.""" + from azure.cli.core.commands import LongRunningOperation, _is_poller, _is_paged, AzCliCommandInvoker + + command = self.command_table.get(command_name) + if not command: + return None + if arguments is None: + arguments = {} + default_args = self.get_default_arguments(command_name) + if default_args: + arguments = {**default_args, **arguments} + arguments = self.validate_invocation(command, arguments) + arguments = {"cmd": command, **arguments} # Ensure 'cmd' is passed to the command + logger.debug("Invoking command '%s' with arguments: %s", command_name, arguments) + try: + stderr_output = None + with contextlib.redirect_stderr(io.StringIO()) as stderr_capture: + result = command(arguments) + stderr_output = stderr_capture.getvalue() + transform_op = command.command_kwargs.get('transform', None) + if transform_op: + result = transform_op(result) + + if _is_poller(result): + result = LongRunningOperation(command.cli_ctx, 'Starting {}'.format(command.name))(result) + elif _is_paged(result): + result = list(result) + + from azure.cli.core.util import todict + result = todict(result, AzCliCommandInvoker.remove_additional_prop_layer) + except Exception as e: + logger.error("Error invoking command '%s': %s", command_name, e, exc_info=e) + return {'error': f"Exception: {e}\nCaptured stderr output: {stderr_output}"} + except SystemExit as e: + logger.error("SystemExit raised while invoking command '%s': %s", command_name, e, exc_info=e) + logger.error("Captured stderr output: %s", stderr_output) + return {'error': f"SystemExit: {e}\nCaptured stderr output: {stderr_output}"} + return {'result': result} + + def invoke_command_by_arguments(self, arguments: list[str]) -> dict | None: + """Invoke a command with a list of arguments.""" + from azure.cli.core.commands import AzCliCommandInvoker + arguments = arguments or [] + invocation = AzCliCommandInvoker( + cli_ctx=self.cli_ctx, + parser_cls=self.cli_ctx.parser_cls, + commands_loader_cls=self.cli_ctx.commands_loader_cls, + help_cls=self.cli_ctx.help_cls) + try: + stderr_output = None + with contextlib.redirect_stderr(io.StringIO()) as stderr_capture: + result = invocation.execute(arguments) + stderr_output = stderr_capture.getvalue() + return {'result': result} + except Exception as e: + logger.error("Error invoking command with arguments '%s': %s", arguments, e, exc_info=e) + return {'error': f"Exception: {e}\nCaptured stderr output: {stderr_output}"} + except SystemExit as e: + # Handle SystemExit raised by the CLI + logger.error("SystemExit raised while invoking command with arguments '%s': %s", arguments, e, exc_info=e) + logger.error("Captured stderr output: %s", stderr_output) + return {'error': f"SystemExit: {e}\nCaptured stderr output: {stderr_output}"} + + +class AzMCP(FastMCP): + def __init__( + self, + cli_ctx: AzCli, + name: str | None = None, + instructions: str | None = None, + event_store: EventStore | None = None, + *, + tools: list[Tool] | None = None, + enable_elicit: bool = True, + ): + """ + Initialize the AzMCP server with the given CLI context and optional parameters. + """ + super().__init__( + name or "AZ MCP", + instructions, + event_store, + tools=tools) + self.cli_ctx = cli_ctx + self.enable_elicit = enable_elicit + self.az_cli_bridge = AzCLIBridge(self.cli_ctx) + self._register_primitives() + # self._register_resources() + + def _register_primitives(self): + super().tool( + "get_az_cli_command_schema", + title="Get Azure CLI Command Schema", + description="Retrieve the detailed argument schema and parameter specifications for any Azure CLI command. " + "Provides comprehensive information about required parameters, optional flags, data types, " + "validation rules, and parameter descriptions. Input should be the command name without the 'az' " + "prefix (e.g., 'vm create', 'storage account list', 'network vnet show').", + annotations=ToolAnnotations( + title="Get Azure CLI Command Schema", + readOnlyHint=True, + ), + structured_output=True, + )(self.get_command_schema) + super().tool( + "invoke_az_cli_command", + title="Invoke Azure CLI Command", + description="Execute an Azure CLI command with specified arguments in JSON format. " + "This tool allows you to run any Azure CLI command programmatically, passing arguments as a JSON object." + "The key in arguments should match the command's argument names, instead of the options. " + "This tool must be called after the command schema tool to ensure the command is valid.", + annotations=ToolAnnotations( + title="Invoke Azure CLI Command", + destructiveHint=True, + ), + structured_output=True, + )(self.invoke_command_by_json) + # super().tool( + # "invoke_az_cli_command", + # title="Invoke Azure CLI Command", + # description="Execute an Azure CLI command with specified arguments in a list format. " + # "The arguments should be provided as a list of strings, where each string is a separate argument part." + # "For example, to run 'az vm create --name MyVM --resource-group MyGroup', you would provide the arguments as follows: " + # "['vm', 'create', '--name', 'MyVM', '--resource-group', 'MyGroup']" + # "This tool must be called after the command schema tool to ensure the command is valid." + # "You should refer to the options in the command schema tool to ensure the parameters are correct.", + # annotations=ToolAnnotations( + # title="Invoke Azure CLI Command", + # destructiveHint=True, + # ), + # structured_output=True, + # )(self.invoke_command_by_arguments) + + def _register_resources(self): + for group_name in self.az_cli_bridge.group_table.keys(): + async def resource_fn(captured_name=group_name) -> str: + return self.az_cli_bridge.get_command_help(captured_name) + + self.add_resource(FunctionResource.from_function( + fn=resource_fn, + uri=f'az://{"/".join(group_name.split())}', + name=f'Azure CLI Group Help for `az {group_name}`', + description=f'Retrieve help documentation for the Azure CLI group `az {group_name}`.', + mime_type="application/json", + )) + for command_name in self.az_cli_bridge.command_table.keys(): + async def resource_fn(captured_name=command_name) -> str: + return self.az_cli_bridge.get_command_help(captured_name) + + self.add_resource(FunctionResource.from_function( + fn=resource_fn, + uri=f'az://{"/".join(command_name.split())}', + name=f'Azure CLI Command Help for `az {command_name}`', + description=f'Retrieve help documentation for the Azure CLI command `az {command_name}`.', + mime_type="application/json", + )) + + def command_help_resource(self, command_name_path: str) -> str: + return self.az_cli_bridge.get_command_help(command_name_path.split('/')) + + def get_command_schema(self, command_name: str) -> dict | None: + """Get the argument schema for a specific command.""" + if command_name.startswith('az '): + command_name = command_name[3:] # Remove 'az ' prefix if present + return self.az_cli_bridge.get_command_arguments_schema(command_name) + + async def invoke_command_by_json(self, command_name: str, arguments: dict, ctx: Context) -> dict | None: + """Invoke a command with JSON-described arguments.""" + if command_name.startswith('az '): + command_name = command_name[3:] + verb = command_name.split()[-1] + if self.enable_elicit and verb not in ['list', 'show']: + result = await ctx.elicit("This is a destructive command. Do you want to continue? (y/N)", MCPConfirmation) + if not (result.action == "accept" and result.data.confirmation.lower() in ["y", "yes"]): + return None + # The prompt elicitation has unsolved bug due to async-sync-async call. Use it after solving this issue. + # with elicit_prompts(ctx): + return self.az_cli_bridge.invoke_command_by_json(command_name, arguments) + + async def invoke_command_by_arguments(self, arguments: list[str], ctx: Context) -> dict | None: + """Invoke a command with a list of arguments.""" + if arguments and arguments[0] == "az": + arguments = arguments[1:] # Remove 'az' prefix if present + if self.enable_elicit and "list" not in arguments and "show" not in arguments: + result = await ctx.elicit("This is a destructive command. Do you want to continue? (y/N)", MCPConfirmation) + if not (result.action == "accept" and result.data.confirmation.lower() in ["y", "yes"]): + return None + # The prompt elicitation has unsolved bug due to async-sync-async call. Use it after solving this issue. + # with elicit_prompts(ctx): + return self.az_cli_bridge.invoke_command_by_arguments(arguments) diff --git a/src/mcp-server/azext_mcp/tests/__init__.py b/src/mcp-server/azext_mcp/tests/__init__.py new file mode 100644 index 00000000000..2dcf9bb68b3 --- /dev/null +++ b/src/mcp-server/azext_mcp/tests/__init__.py @@ -0,0 +1,5 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ----------------------------------------------------------------------------- \ No newline at end of file diff --git a/src/mcp-server/azext_mcp/tests/latest/__init__.py b/src/mcp-server/azext_mcp/tests/latest/__init__.py new file mode 100644 index 00000000000..2dcf9bb68b3 --- /dev/null +++ b/src/mcp-server/azext_mcp/tests/latest/__init__.py @@ -0,0 +1,5 @@ +# ----------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ----------------------------------------------------------------------------- \ No newline at end of file diff --git a/src/mcp-server/azext_mcp/tests/latest/test_mcp_scenario.py b/src/mcp-server/azext_mcp/tests/latest/test_mcp_scenario.py new file mode 100644 index 00000000000..2eda0bc487c --- /dev/null +++ b/src/mcp-server/azext_mcp/tests/latest/test_mcp_scenario.py @@ -0,0 +1,40 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import os +import unittest + +from azure_devtools.scenario_tests import AllowLargeResponse +from azure.cli.testsdk import (ScenarioTest, ResourceGroupPreparer) + + +TEST_DIR = os.path.abspath(os.path.join(os.path.abspath(__file__), '..')) + + +class McpScenarioTest(ScenarioTest): + + @ResourceGroupPreparer(name_prefix='cli_test_mcp') + def test_mcp(self, resource_group): + + self.kwargs.update({ + 'name': 'test1' + }) + + self.cmd('mcp create -g {rg} -n {name} --tags foo=doo', checks=[ + self.check('tags.foo', 'doo'), + self.check('name', '{name}') + ]) + self.cmd('mcp update -g {rg} -n {name} --tags foo=boo', checks=[ + self.check('tags.foo', 'boo') + ]) + count = len(self.cmd('mcp list').get_output_in_json()) + self.cmd('mcp show - {rg} -n {name}', checks=[ + self.check('name', '{name}'), + self.check('resourceGroup', '{rg}'), + self.check('tags.foo', 'boo') + ]) + self.cmd('mcp delete -g {rg} -n {name}') + final_count = len(self.cmd('mcp list').get_output_in_json()) + self.assertTrue(final_count, count - 1) \ No newline at end of file diff --git a/src/mcp-server/setup.cfg b/src/mcp-server/setup.cfg new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/mcp-server/setup.py b/src/mcp-server/setup.py new file mode 100644 index 00000000000..0f3f4840574 --- /dev/null +++ b/src/mcp-server/setup.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + + +from codecs import open +from setuptools import setup, find_packages +try: + from azure_bdist_wheel import cmdclass +except ImportError: + from distutils import log as logger + logger.warn("Wheel is not available, disabling bdist_wheel hook") + +# HISTORY.rst entry. +VERSION = '1.0.0b1' + +# The full list of classifiers is available at +# https://pypi.python.org/pypi?%3Aaction=list_classifiers +CLASSIFIERS = [ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'Intended Audience :: System Administrators', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', + 'License :: OSI Approved :: MIT License', +] + +DEPENDENCIES = [ + "mcp>=1.12.0,<2.0.0", +] + +with open('README.rst', 'r', encoding='utf-8') as f: + README = f.read() +with open('HISTORY.rst', 'r', encoding='utf-8') as f: + HISTORY = f.read() + +setup( + name='mcp-server', + version=VERSION, + description='Microsoft Azure Command-Line Tools CLI as local MCP server Extension', + author='Microsoft Corporation', + author_email='azpycli@microsoft.com', + url='https://github.com/Azure/azure-cli-extensions/tree/master/src/mcp', + long_description=README + '\n\n' + HISTORY, + license='MIT', + classifiers=CLASSIFIERS, + packages=find_packages(), + install_requires=DEPENDENCIES, + package_data={'azext_mcp': ['azext_metadata.json']}, +) \ No newline at end of file