diff --git a/src/oci-monitoring-mcp-server/README.md b/src/oci-monitoring-mcp-server/README.md index 0e4057c..d30ad4c 100644 --- a/src/oci-monitoring-mcp-server/README.md +++ b/src/oci-monitoring-mcp-server/README.md @@ -12,25 +12,29 @@ uv run oracle.oci-monitoring-mcp-server ## Tools -| Tool Name | Description | -| --- | --- | -| list_metrics | List metrics in the tenancy | -| get_metric | Get metric by name | +| Tool Name | Description | +|-----------------------|------------------------------------------------------------------| +| list_alarms | List Alarms in the tenancy | +| get_metrics_data | Gets aggregated metric data | +| get_available_metrics | Lists the available metrics a user can query on in their tenancy | - -⚠️ **NOTE**: All actions are performed with the permissions of the configured OCI CLI profile. We advise least-privilege IAM setup, secure credential management, safe network practices, secure logging, and warn against exposing secrets. +⚠️ **NOTE**: All actions are performed with the permissions of the configured OCI CLI profile. We advise least-privilege +IAM setup, secure credential management, safe network practices, secure logging, and warn against exposing secrets. ## Third-Party APIs -Developers choosing to distribute a binary implementation of this project are responsible for obtaining and providing all required licenses and copyright notices for the third-party code used in order to ensure compliance with their respective open source licenses. +Developers choosing to distribute a binary implementation of this project are responsible for obtaining and providing +all required licenses and copyright notices for the third-party code used in order to ensure compliance with their +respective open source licenses. ## Disclaimer -Users are responsible for their local environment and credential safety. Different language model selections may yield different results and performance. +Users are responsible for their local environment and credential safety. Different language model selections may yield +different results and performance. ## License Copyright (c) 2025 Oracle and/or its affiliates. - + Released under the Universal Permissive License v1.0 as shown at . diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/alarms/models.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/alarms/models.py new file mode 100644 index 0000000..ad3eade --- /dev/null +++ b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/alarms/models.py @@ -0,0 +1,236 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from datetime import datetime +from typing import Any, Dict, List, Literal, Optional + +import oci +from pydantic import BaseModel, Field + + +def _oci_to_dict(obj): + """Best-effort conversion of OCI SDK model objects to plain dicts.""" + if obj is None: + return None + try: + from oci.util import to_dict as oci_to_dict + + return oci_to_dict(obj) + except Exception: + pass + if isinstance(obj, dict): + return obj + if hasattr(obj, "__dict__"): + return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")} + return None + + +SeverityType = Literal["CRITICAL", "ERROR", "WARNING", "INFO", "UNKNOWN_ENUM_VALUE"] + + +class Suppression(BaseModel): + """ + Pydantic model mirroring oci.monitoring.models.Suppression. + """ + + description: Optional[str] = Field( + None, description="Human-readable description of the suppression." + ) + time_suppress_from: Optional[datetime] = Field( + None, description="The start time for the suppression (RFC3339)." + ) + time_suppress_until: Optional[datetime] = Field( + None, description="The end time for the suppression (RFC3339)." + ) + + +def map_suppression(s: oci.monitoring.models.Suppression | None) -> Suppression | None: + if not s: + return None + return Suppression( + description=getattr(s, "description", None), + time_suppress_from=getattr(s, "time_suppress_from", None) + or getattr(s, "timeSuppressFrom", None), + time_suppress_until=getattr(s, "time_suppress_until", None) + or getattr(s, "timeSuppressUntil", None), + ) + + +class AlarmOverride(BaseModel): + """ + Pydantic model mirroring (a subset of) oci.monitoring.models.AlarmOverride. + Each override can specify values for query, severity, body, and pending duration. + """ + + rule_name: Optional[str] = Field( + None, + description="Identifier of the alarm's base/override values. Default is 'BASE'.", + ) + query: Optional[str] = Field( + None, description="MQL expression override for this rule." + ) + severity: Optional[SeverityType] = Field( + None, description="Severity override for this rule." + ) + body: Optional[str] = Field(None, description="Message body override (alarm body).") + pending_duration: Optional[str] = Field( + None, + description="Override for pending duration as ISO 8601 duration (e.g., 'PT5M').", + ) + + +def map_alarm_override( + o: oci.monitoring.models.AlarmOverride | None, +) -> AlarmOverride | None: + if not o: + return None + return AlarmOverride( + rule_name=getattr(o, "rule_name", None) or getattr(o, "ruleName", None), + query=getattr(o, "query", None), + severity=getattr(o, "severity", None), + body=getattr(o, "body", None), + pending_duration=getattr(o, "pending_duration", None) + or getattr(o, "pendingDuration", None), + ) + + +def map_alarm_overrides(items) -> list[AlarmOverride] | None: + if not items: + return None + result: list[AlarmOverride] = [] + for it in items: + mapped = map_alarm_override(it) + if mapped is not None: + result.append(mapped) + return result if result else None + + +class AlarmSummary(BaseModel): + """ + Pydantic model mirroring (a subset of) oci.monitoring.models.AlarmSummary. + """ + + id: Optional[str] = Field(None, description="The OCID of the alarm.") + display_name: Optional[str] = Field( + None, + description="A user-friendly name for the alarm; used as title in notifications.", + ) + compartment_id: Optional[str] = Field( + None, description="The OCID of the compartment containing the alarm." + ) + metric_compartment_id: Optional[str] = Field( + None, + description="The OCID of the compartment containing the metric evaluated by the alarm.", + ) + namespace: Optional[str] = Field( + None, description="The source service/application emitting the metric." + ) + query: Optional[str] = Field( + None, + description="The Monitoring Query Language (MQL) expression to evaluate for the alarm.", + ) + severity: Optional[SeverityType] = Field( + None, + description="The perceived type of response required when the alarm is FIRING.", + ) + destinations: Optional[List[str]] = Field( + None, + description="List of destination OCIDs for alarm notifications (e.g., NotificationTopic).", + ) + suppression: Optional[Suppression] = Field( + None, description="Configuration details for suppressing an alarm." + ) + is_enabled: Optional[bool] = Field( + None, description="Whether the alarm is enabled." + ) + is_notifications_per_metric_dimension_enabled: Optional[bool] = Field( + None, + description="Whether the alarm sends a separate message for each metric stream.", + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, description="Simple key/value pair tags applied without predefined names." + ) + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, description="Defined tags for this resource, scoped to namespaces." + ) + lifecycle_state: Optional[str] = Field( + None, description="The current lifecycle state of the alarm." + ) + overrides: Optional[List[AlarmOverride]] = Field( + None, + description="Overrides controlling alarm evaluations (query, severity, body, pending duration).", + ) + rule_name: Optional[str] = Field( + None, + description="Identifier of the alarm’s base values when overrides are present; default 'BASE'.", + ) + notification_version: Optional[str] = Field( + None, + description="Version of the alarm notification to be delivered (e.g., '1.X').", + ) + notification_title: Optional[str] = Field( + None, + description="Customizable notification title used as subject/title in messages.", + ) + evaluation_slack_duration: Optional[str] = Field( + None, + description="Slack period for metric ingestion before evaluating the alarm, ISO 8601 (e.g., 'PT3M').", + ) + alarm_summary: Optional[str] = Field( + None, + description="Customizable alarm summary (message body) with optional dynamic variables.", + ) + resource_group: Optional[str] = Field( + None, + description="Resource group to match for metrics used by this alarm.", + ) + + +def map_alarm_summary( + alarm: oci.monitoring.models.AlarmSummary, +) -> AlarmSummary: + """ + Convert an oci.monitoring.models.AlarmSummary to + oracle.oci_monitoring_mcp_server.alarms.models.AlarmSummary, including nested types. + """ + return AlarmSummary( + id=getattr(alarm, "id", None), + display_name=getattr(alarm, "display_name", None) + or getattr(alarm, "displayName", None), + compartment_id=getattr(alarm, "compartment_id", None) + or getattr(alarm, "compartmentId", None), + metric_compartment_id=getattr(alarm, "metric_compartment_id", None) + or getattr(alarm, "metricCompartmentId", None), + namespace=getattr(alarm, "namespace", None), + query=getattr(alarm, "query", None), + severity=getattr(alarm, "severity", None), + destinations=getattr(alarm, "destinations", None), + suppression=map_suppression(getattr(alarm, "suppression", None)), + is_enabled=getattr(alarm, "is_enabled", None) + or getattr(alarm, "isEnabled", None), + is_notifications_per_metric_dimension_enabled=getattr( + alarm, "is_notifications_per_metric_dimension_enabled", None + ) + or getattr(alarm, "isNotificationsPerMetricDimensionEnabled", None), + freeform_tags=getattr(alarm, "freeform_tags", None) + or getattr(alarm, "freeformTags", None), + defined_tags=getattr(alarm, "defined_tags", None) + or getattr(alarm, "definedTags", None), + lifecycle_state=getattr(alarm, "lifecycle_state", None) + or getattr(alarm, "lifecycleState", None), + overrides=map_alarm_overrides(getattr(alarm, "overrides", None)), + rule_name=getattr(alarm, "rule_name", None) or getattr(alarm, "ruleName", None), + notification_version=getattr(alarm, "notification_version", None) + or getattr(alarm, "notificationVersion", None), + notification_title=getattr(alarm, "notification_title", None) + or getattr(alarm, "notificationTitle", None), + evaluation_slack_duration=getattr(alarm, "evaluation_slack_duration", None) + or getattr(alarm, "evaluationSlackDuration", None), + alarm_summary=getattr(alarm, "alarm_summary", None) + or getattr(alarm, "alarmSummary", None), + resource_group=getattr(alarm, "resource_group", None) + or getattr(alarm, "resourceGroup", None), + ) diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/alarms/tools.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/alarms/tools.py new file mode 100644 index 0000000..03c8966 --- /dev/null +++ b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/alarms/tools.py @@ -0,0 +1,69 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +import os +from logging import Logger +from typing import Annotated, List + +import oci +from fastmcp import FastMCP +from oci import Response +from oracle.oci_monitoring_mcp_server import __project__, __version__ +from oracle.oci_monitoring_mcp_server.alarms.models import ( + AlarmSummary, + map_alarm_summary, +) + +logger = Logger(__name__, level="INFO") + +mcp = FastMCP(name=__project__) + + +class MonitoringAlarmTools: + def __init__(self): + logger.info("Loaded alarm class") + + def register(self, mcp): + """Register all alarm tools with the MCP server.""" + # Register list_alarms tool + mcp.tool( + name="list_alarms", description="Lists all alarms in a given compartment" + )(self.list_alarms) + + def get_monitoring_client(self): + logger.info("entering get_monitoring_client") + config = oci.config.from_file( + profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) + ) + user_agent_name = __project__.split("oracle.", 1)[1].split("-server", 1)[0] + config["additional_user_agent"] = f"{user_agent_name}/{__version__}" + + private_key = oci.signer.load_private_key_from_file(config["key_file"]) + token_file = config["security_token_file"] + token = None + with open(token_file, "r") as f: + token = f.read() + signer = oci.auth.signers.SecurityTokenSigner(token, private_key) + return oci.monitoring.MonitoringClient(config, signer=signer) + + def list_alarms( + self, + compartment_id: Annotated[ + str, + "The ID of the compartment containing the resources" + "monitored by the metric that you are searching for.", + ], + ) -> list[AlarmSummary] | str: + monitoring_client = self.get_monitoring_client() + response: Response | None = monitoring_client.list_alarms( + compartment_id=compartment_id + ) + if response is None: + logger.error("Received None response from list_metrics") + return "There was no response returned from the Monitoring API" + + alarms: List[oci.monitoring.models.AlarmSummary] = response.data + return [map_alarm_summary(alarm) for alarm in alarms] diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/metrics/models.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/metrics/models.py new file mode 100644 index 0000000..862a38d --- /dev/null +++ b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/metrics/models.py @@ -0,0 +1,397 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from datetime import datetime +from typing import Dict, List, Literal, Optional + +import oci +from pydantic import BaseModel, Field + + +def _oci_to_dict(obj): + """Best-effort conversion of OCI SDK model objects to plain dicts.""" + if obj is None: + return None + try: + from oci.util import to_dict as oci_to_dict + + return oci_to_dict(obj) + except Exception: + pass + if isinstance(obj, dict): + return obj + if hasattr(obj, "__dict__"): + return {k: v for k, v in obj.__dict__.items() if not k.startswith("_")} + return None + + +# @TODO Percentile should be converted to either common or custom percentages +StatisticType = Literal[ + "absent", + "avg", + "count", + "first", + "increment", + "last", + "max", + "mean", + "min", + "percentile", + "rate", + "sum", +] +PredicateType = Literal[ + "greater_than", + "greater_than_or_equal", + "less_than", + "less_than_or_equal", + "not_equal_to", + "between", + "outside", + "absent", +] + +# Reusable Fields across tools + +CompartmentField = Field( + ..., + description="The OCID of the compartment", +) + +CompartmentIdInSubtreeField = Field( + False, + description="Whether to include metrics from all subcompartments of the specified compartment", +) + +NamespaceField = Field( + "oci_compute", + description="The source service or application to use when searching for metric data points" + "to aggregate.", + examples=["oci_compute"], +) + + +class Metric(BaseModel): + """ + Pydantic model mirroring (a subset of) oci.monitoring.models.Metric. + The fields below represent the commonly used attributes found on the OCI SDK Metric model. + """ + + namespace: Optional[str] = Field( + None, description="The source service or application emitting the metric." + ) + resource_group: Optional[str] = Field( + None, + description="Resource group specified for the metric. A metric can be part of a resource group.", + ) + compartment_id: Optional[str] = Field( + None, + description="The OCID of the compartment containing the resource emitting the metric.", + ) + name: Optional[str] = Field( + None, description="The metric name (for example, CpuUtilization)." + ) + dimensions: Optional[Dict[str, str]] = Field( + None, + description="Dimensions (key/value pairs) that qualify the metric (for example, resourceId).", + ) + metadata: Optional[Dict[str, str]] = Field( + None, + description="Metric metadata (for example, unit). " + "Keys and values are defined by the emitting service.", + ) + resolution: Optional[str] = Field( + None, + description="The publication resolution of the metric, such as '1m'.", + ) + + +def map_metric(metric_data: oci.monitoring.models.Metric) -> Metric: + """ + Convert an oci.monitoring.models.Metric to oracle.oci_monitoring_mcp_server.models.Metric. + """ + return Metric( + namespace=getattr(metric_data, "namespace", None), + resource_group=getattr(metric_data, "resource_group", None), + compartment_id=getattr(metric_data, "compartment_id", None), + name=getattr(metric_data, "name", None), + dimensions=getattr(metric_data, "dimensions", None), + metadata=getattr(metric_data, "metadata", None), + resolution=getattr(metric_data, "resolution", None), + ) + + +class AggregatedDatapoint(BaseModel): + """ + Pydantic model mirroring oci.monitoring.models.AggregatedDatapoint. + """ + + timestamp: Optional[datetime] = Field( + None, + description="The date and time associated with the aggregated value (RFC3339).", + ) + value: Optional[float] = Field( + None, description="The aggregated metric value for the time window." + ) + + +class MetricData(BaseModel): + """ + Pydantic model mirroring (a subset of) oci.monitoring.models.MetricData. + """ + + namespace: Optional[str] = Field( + None, description="The source service or application emitting the metric." + ) + resource_group: Optional[str] = Field( + None, description="Resource group specified for the metric." + ) + compartment_id: Optional[str] = Field( + None, description="The OCID of the compartment containing the resource." + ) + name: Optional[str] = Field( + None, description="The metric name (for example, CpuUtilization)." + ) + dimensions: Optional[Dict[str, str]] = Field( + None, + description="Dimensions that qualify the metric (for example, resourceId).", + ) + metadata: Optional[Dict[str, str]] = Field( + None, description="Metric metadata such as unit." + ) + resolution: Optional[str] = Field( + None, + description="The publication resolution of the metric (for example, '1m').", + ) + aggregated_datapoints: Optional[List[AggregatedDatapoint]] = Field( + None, + description="Time series datapoints aggregated at the requested resolution.", + ) + + +def map_aggregated_datapoint( + p: oci.monitoring.models.AggregatedDatapoint | None, +) -> AggregatedDatapoint | None: + if not p: + return None + return AggregatedDatapoint( + timestamp=getattr(p, "timestamp", None), + value=getattr(p, "value", None), + ) + + +def map_aggregated_datapoints(items) -> list[AggregatedDatapoint] | None: + if not items: + return None + result: list[AggregatedDatapoint] = [] + for it in items: + mapped_datapoint = map_aggregated_datapoint(it) + if mapped_datapoint is not None: + result.append(mapped_datapoint) + return result if result else None + + +def map_metric_data(metric_data: oci.monitoring.models.MetricData) -> MetricData: + """ + Convert an oci.monitoring.models.MetricData to oracle.oci_monitoring_mcp_server.models.MetricData. + """ + return MetricData( + namespace=getattr(metric_data, "namespace", None), + resource_group=getattr(metric_data, "resource_group", None), + compartment_id=getattr(metric_data, "compartment_id", None), + name=getattr(metric_data, "name", None), + dimensions=getattr(metric_data, "dimensions", None), + metadata=getattr(metric_data, "metadata", None), + resolution=getattr(metric_data, "resolution", None), + aggregated_datapoints=map_aggregated_datapoints( + getattr(metric_data, "aggregated_datapoints", None) + ), + ) + + +class Datapoint(BaseModel): + """ + Pydantic model mirroring oci.monitoring.models.Datapoint + used when posting metric data (not aggregated/summarized). + """ + + timestamp: Optional[datetime] = Field( + None, description="The time the metric value was recorded (RFC3339)." + ) + value: Optional[float] = Field( + None, description="Metric value at the given timestamp." + ) + count: Optional[int] = Field( + None, + description="Optional number of samples represented by this value (if provided).", + ) + + +def map_datapoint(p: oci.monitoring.models.Datapoint) -> Datapoint | None: + if not p: + return None + return Datapoint( + timestamp=getattr(p, "timestamp", None), + value=getattr(p, "value", None), + count=getattr(p, "count", None), + ) + + +def map_datapoints(items) -> list[Datapoint] | None: + if not items: + return None + result: list[Datapoint] = [] + for it in items: + mapped_datapoint = map_datapoint(it) + if mapped_datapoint is not None: + result.append(mapped_datapoint) + return result if result else None + + +class MetricDataDetails(BaseModel): + """ + Pydantic model mirroring (a subset of) oci.monitoring.models.MetricDataDetails. + Represents a single time series being posted to the Monitoring service. + """ + + namespace: Optional[str] = Field( + None, description="The source service or application emitting the metric." + ) + resource_group: Optional[str] = Field( + None, description="Resource group specified for the metric." + ) + compartment_id: Optional[str] = Field( + None, description="The OCID of the compartment containing the resource." + ) + name: Optional[str] = Field( + None, description="The metric name (for example, CpuUtilization)." + ) + dimensions: Optional[Dict[str, str]] = Field( + None, + description="Dimensions that qualify the metric (for example, resourceId).", + ) + metadata: Optional[Dict[str, str]] = Field( + None, description="Metric metadata such as unit." + ) + datapoints: Optional[List[Datapoint]] = Field( + None, description="Raw datapoints to post for this metric." + ) + + +def map_metric_data_details( + mdd: oci.monitoring.models.MetricDataDetails, +) -> MetricDataDetails: + """ + Convert an oci.monitoring.models.MetricDataDetails to + oracle.oci_monitoring_mcp_server.models.MetricDataDetails. + """ + return MetricDataDetails( + namespace=getattr(mdd, "namespace", None), + resource_group=getattr(mdd, "resource_group", None), + compartment_id=getattr(mdd, "compartment_id", None), + name=getattr(mdd, "name", None), + dimensions=getattr(mdd, "dimensions", None), + metadata=getattr(mdd, "metadata", None), + datapoints=map_datapoints(getattr(mdd, "datapoints", None)), + ) + + +# region List Metrics + + +class ListMetricsDetails(BaseModel): + """ + Pydantic model mirroring (a subset of) oci.monitoring.models.ListMetricsDetails. + Used to filter and group results when listing available metrics. + """ + + namespace: Optional[str] = Field( + None, description="The source service or application emitting the metric." + ) + resource_group: Optional[str] = Field( + None, description="Resource group specified for the metric." + ) + name: Optional[str] = Field( + None, description="Optional metric name to filter by (e.g., CpuUtilization)." + ) + dimension_filters: Optional[Dict[str, str]] = Field( + None, + description="Filter to only include metrics that match these dimension key/value pairs.", + ) + group_by: Optional[List[str]] = Field( + None, + description="Optional list of fields to group by in the response (e.g., ['namespace', 'name']).", + ) + + +def map_list_metrics_details( + lmd: oci.monitoring.models.ListMetricsDetails, +) -> ListMetricsDetails | None: + """ + Convert an oci.monitoring.models.ListMetricsDetails to + oracle.oci_monitoring_mcp_server.models.ListMetricsDetails. + """ + if not lmd: + return None + return ListMetricsDetails( + namespace=getattr(lmd, "namespace", None), + resource_group=getattr(lmd, "resource_group", None), + name=getattr(lmd, "name", None), + # OCI SDK may expose snake_case or camelCase depending on version + dimension_filters=getattr(lmd, "dimension_filters", None) + or getattr(lmd, "dimensionFilters", None), + group_by=getattr(lmd, "group_by", None) or getattr(lmd, "groupBy", None), + ) + + +class SummarizeMetricsDataDetails(BaseModel): + """ + Pydantic model mirroring (a subset of) oci.monitoring.models.SummarizeMetricsDataDetails. + Used to request aggregated time series from the Monitoring service. + """ + + namespace: Optional[str] = Field( + None, description="The source service or application emitting the metric." + ) + resource_group: Optional[str] = Field( + None, description="Resource group specified for the metric." + ) + query: Optional[str] = Field( + None, + description=( + "The Monitoring Query Language (MQL) expression, e.g. " + "'CpuUtilization[1m]{resourceId=\"ocid1.instance...\"}.mean()'" + ), + ) + start_time: Optional[datetime] = Field( + None, description="The beginning of the time window for the metrics (RFC3339)." + ) + end_time: Optional[datetime] = Field( + None, description="The end of the time window for the metrics (RFC3339)." + ) + resolution: Optional[str] = Field( + None, + description="The time window used to aggregate metrics, e.g., '1m', '5m', '1h'.", + ) + + +def map_summarize_metrics_data_details( + smd: oci.monitoring.models.SummarizeMetricsDataDetails, +) -> SummarizeMetricsDataDetails | None: + """ + Convert an oci.monitoring.models.SummarizeMetricsDataDetails to + oracle.oci_monitoring_mcp_server.models.SummarizeMetricsDataDetails. + """ + if not smd: + return None + return SummarizeMetricsDataDetails( + namespace=getattr(smd, "namespace", None), + resource_group=getattr(smd, "resource_group", None), + query=getattr(smd, "query", None), + start_time=getattr(smd, "start_time", None), + end_time=getattr(smd, "end_time", None), + resolution=getattr(smd, "resolution", None), + ) diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/metrics/tools.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/metrics/tools.py new file mode 100644 index 0000000..1dce3f3 --- /dev/null +++ b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/metrics/tools.py @@ -0,0 +1,337 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +import logging +import os +from datetime import datetime, timezone +from typing import Annotated, List, Tuple + +import oci +from fastmcp import Context +from oci import Response +from oci.monitoring.models import ( + ListMetricsDetails, + SummarizeMetricsDataDetails, +) +from oracle.oci_monitoring_mcp_server import __project__, __version__ +from oracle.oci_monitoring_mcp_server.metrics.models import ( + CompartmentField, + CompartmentIdInSubtreeField, + Metric, + MetricData, + NamespaceField, + StatisticType, + map_metric, + map_metric_data, +) +from pydantic import Field + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class MonitoringMetricsTools: + def __init__(self): + logger.info("Loaded metric initialization") + + def get_monitoring_client(self): + logger.info("entering get_monitoring_client") + config = oci.config.from_file( + profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) + ) + user_agent_name = __project__.split("oracle.", 1)[1].split("-server", 1)[0] + config["additional_user_agent"] = f"{user_agent_name}/{__version__}" + + private_key = oci.signer.load_private_key_from_file(config["key_file"]) + token_file = config["security_token_file"] + token = None + with open(token_file, "r") as f: + token = f.read() + signer = oci.auth.signers.SecurityTokenSigner(token, private_key) + return oci.monitoring.MonitoringClient(config, signer=signer) + + def register(self, mcp): + """Register all Metrics tools with the MCP server.""" + # Register get_compute_metrics tool + # @TODO Do we even need this tool anymore + # mcp.tool(name="get_compute_metrics")(self.get_compute_metrics) + mcp.tool( + name="get_metrics_data", + description="This tool retrieves aggregated metric data points in the OCI monitoring service." + "Use the query and optional properties to filter the returned results. " + "If there are no aggregated data points returned, " + "suggest using another query or expanding the time range.", + )(self.get_metrics_data) + + mcp.tool( + name="get_available_metrics", + description="This tool returns the available metric definitions " + "that match the criteria specified in the request.", + )(self.get_available_metrics) + + def _prepare_time_parameters( + self, start_time, end_time + ) -> Tuple[datetime, datetime]: + """Process time parameters and calculate the period.""" + # Convert string times to datetime objects + if isinstance(start_time, str): + start_time = datetime.fromisoformat(start_time.replace("Z", "+00:00")) + + if end_time is None: + end_time = datetime.now(timezone.utc) + elif isinstance(end_time, str): + end_time = datetime.fromisoformat(end_time.replace("Z", "+00:00")) + + return start_time, end_time + + def _build_metric_query( + self, metric: str, resolution: str, resource_id: str, statistic: str + ) -> str: + """Build the query string for the metric query.""" + # @TODO -> Convert user question into defined namespace or custom + # @TODO -> Convert metric, statistic, and interval into query for API call + # Need to group by + # custom dimensions? + + filter_clause = f'{{resourceId = "{resource_id}"}}' if resource_id else "" + query = f"{metric}[{resolution}]{filter_clause}.{statistic}()" + + logger.info(f"The metric query: {query}") + return query + + async def get_metrics_data( + self, + context: Context, + compartment_id: str = CompartmentField, + resource_id: str = Field( + ..., + description="The OCID of the resource", + examples=[ + "ocid1.instance.oc1.phx.anyhqljt6vnpo4icbvfcrphq2vkdmivmlbhupdw46nth7scky6rbigwjc7ea" + ], + ), + metric: str = Field( + "InstanceMetadataRequests", + description="The metric that the user wants to fetch.", + ), + start_time: str | None = Field( + "2025-11-04T18:17:00.000Z", + description="The beginning of the time range to use when searching for metric data points. " + "Format is defined by RFC3339. " + "The response is inclusive of metric data points for the startTime. " + "If no value is provided, this value will be the timestamp 3 hours before the call was sent.", + examples=["2023-02-01T01:02:29.600Z", "2023-03-10T22:44:26.789Z"], + ), + end_time: str | None = Field( + None, + description="The end of the time range to use when searching for metric data points. " + "Format is defined by RFC3339. " + "The response is exclusive metric data points for the endTime. " + "If no value is provided, this value will be the timestamp representing when the call was sent.", + examples=["2023-02-01T01:02:29.600Z", "2023-03-10T22:44:26.789Z"], + ), + namespace: str = NamespaceField, + statistic: StatisticType = Field( + "mean", description="The statistic to use for the metric" + ), + resolution: str = Field( + "1m", + description="The time between calculated aggregation windows. " + "Use with the query interval to vary the frequency for returning aggregated data points. " + "For example, use a query interval of 5 minutes with a resolution of " + "1 minute to retrieve five-minute aggregations at a one-minute frequency. " + "The resolution must be equal or less than the interval in the query. " + "The default resolution is 1m (one minute).", + examples=["1m", "5m", "1h", "1d"], + ), + resource_group: str | None = Field( + None, + description="Resource group that you want to match. " + "A null value returns only metric data that has no resource groups. " + "The specified resource group must exist in the definition of the posted metric. " + "Only one resource group can be applied per metric. " + "A valid resourceGroup value starts with an alphabetical character " + "and includes only alphanumeric characters," + " periods (.), underscores (_), hyphens (-), and dollar signs ($).", + examples=["frontend-fleet"], + ), + compartment_id_in_subtree: bool = CompartmentIdInSubtreeField, + ) -> List[Metric] | str: + try: + # Process time parameters and calculate period + start_time_obj, end_time_obj = self._prepare_time_parameters( + start_time, end_time + ) + start_time = start_time_obj.isoformat().replace("+00:00", "Z") + end_time = end_time_obj.isoformat().replace("+00:00", "Z") + + # Preprocess the query from the given params + query = self._build_metric_query(metric, resolution, resource_id, statistic) + + logger.info( + f"Calling get metrics data with these parameters: {query} namespace {namespace}" + ) + + # Create client + monitoring_client = self.get_monitoring_client() + + # Call Summarize metrics data api and process the results + summarize_metrics_data_details = SummarizeMetricsDataDetails( + namespace=namespace, + query=query, + start_time=start_time, + end_time=end_time, + ) + response: Response | None = monitoring_client.summarize_metrics_data( + compartment_id, + summarize_metrics_data_details=summarize_metrics_data_details, + compartment_id_in_subtree=compartment_id_in_subtree, + ) + + if response is None: + logger.error("Received None response from summarize_metrics_data") + await context.error( + "Received None response from summarize_metrics_data" + ) + return "There was no response returned from the Monitoring API" + + data: List[oci.monitoring.models.Metric] = response.data + return [map_metric(metric) for metric in data] + except Exception as e: + logger.error(f"Error in get_metric_data: {str(e)}") + await context.error(f"Error getting metric data: {str(e)}") + raise + + async def get_available_metrics( + self, + context: Context, + compartment_id: str = CompartmentField, + group_by: List[str] | None = Field( + None, + description="Group metrics by these fields in the response. " + "For example, to list all metric namespaces available in a compartment, " + 'groupBy the "namespace" field. ' + "Supported fields: namespace, name, resourceGroup. " + "If groupBy is used, then dimensionFilters is ignored.", + examples=[["namespace"]], + ), + metric_name: str | None = Field( + None, + description="The metric name to use when searching for metric definitions.", + ), + namespace: str | None = Field( + None, + description="The source service or application to use when searching for metric definitions.", + examples=["oci_computeagent"], + ), + resource_group: str | None = Field( + None, + description="Resource group that you want to match. " + "A null value returns only metric data that has no resource groups. " + "The specified resource group must exist in the definition of the posted metric. " + "Only one resource group can be applied per metric. " + "A valid resourceGroup value starts with an alphabetical character " + "and includes only alphanumeric characters," + " periods (.), underscores (_), hyphens (-), and dollar signs ($).", + examples=["frontend-fleet"], + ), + compartment_id_in_subtree: bool = CompartmentIdInSubtreeField, + ) -> List[MetricData] | str: + try: + # Create client + monitoring_client = self.get_monitoring_client() + + list_metrics_details = ListMetricsDetails( + name=metric_name, + namespace=namespace, + resource_group=resource_group, + group_by=None, # Add this line to avoid the error + ) + response: Response | None = monitoring_client.list_metrics( + compartment_id, + list_metrics_details=list_metrics_details, + compartment_id_in_subtree=compartment_id_in_subtree, + ) + + if response is None: + logger.error("Received None response from list_metrics") + await context.error("Received None response from list_metrics") + return "There was no response returned from the Monitoring API" + + data: List[oci.monitoring.models.MetricData] = response.data + return [map_metric_data(metric) for metric in data] + except Exception as e: + logger.error(f"Error in get_available_metrics: {str(e)}") + await context.error(f"Error getting metric data: {str(e)}") + raise + + def get_compute_metrics( + self, + compartment_id: str, + start_time: str, + end_time: str, + metric_name: Annotated[ + str, + "The metric that the user wants to fetch. Currently we only support:" + "CpuUtilization, MemoryUtilization, DiskIopsRead, DiskIopsWritten," + "DiskBytesRead, DiskBytesWritten, NetworksBytesIn," + "NetworksBytesOut, LoadAverage, MemoryAllocationStalls", + ], + resolution: Annotated[ + str, + "The granularity of the metric. Currently we only support: 1m, 5m, 1h, 1d. Default: 1m.", + ] = "1m", + aggregation: Annotated[ + str, + "The aggregation for the metric. Currently we only support: " + "mean, sum, max, min, count. Default: mean", + ] = "mean", + instance_id: Annotated[ + str | None, + "Optional compute instance OCID to filter by " + "(maps to resourceId dimension)", + ] = None, + compartment_id_in_subtree: Annotated[ + bool, + "Whether to include metrics from all subcompartments of the specified compartment", + ] = False, + ) -> list[dict]: + # @TODO Remove this - its too specific to a use case + monitoring_client = self.get_monitoring_client() + namespace = "oci_computeagent" + filter_clause = f'{{resourceId="{instance_id}"}}' if instance_id else "" + query = f"{metric_name}[{resolution}]{filter_clause}.{aggregation}()" + + series_list = monitoring_client.summarize_metrics_data( + compartment_id=compartment_id, + summarize_metrics_data_details=SummarizeMetricsDataDetails( + namespace=namespace, + query=query, + start_time=start_time, + end_time=end_time, + resolution=resolution, + ), + compartment_id_in_subtree=compartment_id_in_subtree, + ).data + + result: list[dict] = [] + for series in series_list: + dims = getattr(series, "dimensions", None) + points = [] + for p in getattr(series, "aggregated_datapoints", []): + points.append( + { + "timestamp": getattr(p, "timestamp", None), + "value": getattr(p, "value", None), + } + ) + result.append( + { + "dimensions": dims, + "datapoints": points, + } + ) + return result diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/server.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/server.py index cf97423..0c8cfef 100644 --- a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/server.py +++ b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/server.py @@ -4,129 +4,32 @@ https://oss.oracle.com/licenses/upl. """ -import os from logging import Logger -from typing import Annotated -import oci from fastmcp import FastMCP -from oci.monitoring.models import SummarizeMetricsDataDetails +from oracle.oci_monitoring_mcp_server.alarms.tools import MonitoringAlarmTools +from oracle.oci_monitoring_mcp_server.metrics.tools import MonitoringMetricsTools -from . import __project__, __version__ +from . import __project__ logger = Logger(__name__, level="INFO") -mcp = FastMCP(name=__project__) - - -def get_monitoring_client(): - logger.info("entering get_monitoring_client") - config = oci.config.from_file( - profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) - ) - user_agent_name = __project__.split("oracle.", 1)[1].split("-server", 1)[0] - config["additional_user_agent"] = f"{user_agent_name}/{__version__}" - - private_key = oci.signer.load_private_key_from_file(config["key_file"]) - token_file = config["security_token_file"] - token = None - with open(token_file, "r") as f: - token = f.read() - signer = oci.auth.signers.SecurityTokenSigner(token, private_key) - return oci.monitoring.MonitoringClient(config, signer=signer) - - -@mcp.tool -def get_compute_metrics( - compartment_id: str, - start_time: str, - end_time: str, - metricName: Annotated[ - str, - "The metric that the user wants to fetch. Currently we only support:" - "CpuUtilization, MemoryUtilization, DiskIopsRead, DiskIopsWritten," - "DiskBytesRead, DiskBytesWritten, NetworksBytesIn," - "NetworksBytesOut, LoadAverage, MemoryAllocationStalls", - ], - resolution: Annotated[ - str, - "The granularity of the metric. Currently we only support: 1m, 5m, 1h, 1d. Default: 1m.", - ] = "1m", - aggregation: Annotated[ - str, - "The aggregation for the metric. Currently we only support: " - "mean, sum, max, min, count. Default: mean", - ] = "mean", - instance_id: Annotated[ - str, - "Optional compute instance OCID to filter by " "(maps to resourceId dimension)", - ] = None, - compartment_id_in_subtree: Annotated[ - bool, - "Whether to include metrics from all subcompartments of the specified compartment", - ] = False, -) -> list[dict]: - monitoring_client = get_monitoring_client() - namespace = "oci_computeagent" - filter_clause = f'{{resourceId="{instance_id}"}}' if instance_id else "" - query = f"{metricName}[{resolution}]{filter_clause}.{aggregation}()" - - series_list = monitoring_client.summarize_metrics_data( - compartment_id=compartment_id, - summarize_metrics_data_details=SummarizeMetricsDataDetails( - namespace=namespace, - query=query, - start_time=start_time, - end_time=end_time, - resolution=resolution, - ), - compartment_id_in_subtree=compartment_id_in_subtree, - ).data - - result: list[dict] = [] - for series in series_list: - dims = getattr(series, "dimensions", None) - points = [] - for p in getattr(series, "aggregated_datapoints", []): - points.append( - { - "timestamp": getattr(p, "timestamp", None), - "value": getattr(p, "value", None), - } - ) - result.append( - { - "dimensions": dims, - "datapoints": points, - } - ) - return result - - -@mcp.tool -def list_alarms( - compartment_id: Annotated[ - str, - "The ID of the compartment containing the resources" - "monitored by the metric that you are searching for.", - ], -) -> list[dict]: - monitoring_client = get_monitoring_client() - response = monitoring_client.list_alarms(compartment_id=compartment_id) - alarms = response.data - result = [] - for alarm in alarms: - result.append( - { - "id": alarm.id, - "display_name": alarm.display_name, - "severity": alarm.severity, - "lifecycle_state": alarm.lifecycle_state, - "namespace": alarm.namespace, - "query": alarm.query, - } - ) - return result +mcp = FastMCP( + name=__project__, + instructions="Use this MCP server to run read-only commands and analyze " + "Monitoring Logs, Metrics, and Alarms.", +) + +try: + monitoring_metrics_tools = MonitoringMetricsTools() + monitoring_metrics_tools.register(mcp) + logger.info("Monitoring metrics tools registered successfully") + monitoring_alarms_tools = MonitoringAlarmTools() + monitoring_alarms_tools.register(mcp) + logger.info("Monitoring alarms tools registered successfully") +except Exception as e: + logger.error(f"Error initializing OCI Monitoring tools: {str(e)}") + raise def main(): diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_alarm_tools.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_alarm_tools.py new file mode 100644 index 0000000..0c4e5be --- /dev/null +++ b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_alarm_tools.py @@ -0,0 +1,58 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from unittest.mock import AsyncMock, Mock, patch + +import oci +import pytest +from oracle.oci_monitoring_mcp_server.alarms.models import AlarmSummary +from oracle.oci_monitoring_mcp_server.alarms.tools import MonitoringAlarmTools + + +@pytest.fixture +def mock_context(): + """Create mock MCP context.""" + context = Mock() + context.info = AsyncMock() + context.warning = AsyncMock() + context.error = AsyncMock() + return context + + +class TestAlarmTools: + @pytest.mark.asyncio + @patch.object(MonitoringAlarmTools, "get_monitoring_client") + async def test_list_alarms(self, mock_context): + with patch("oci.monitoring.MonitoringClient") as mock_oci_monitoring_client: + mock_alarm1 = oci.monitoring.models.Alarm( + id="alarm1", + display_name="Test Alarm 1", + severity="CRITICAL", + lifecycle_state="ACTIVE", + namespace="oci_monitoring", + query="CpuUtilization[1m].mean() > 80", + ) + mock_alarm2 = oci.monitoring.models.Alarm( + id="alarm2", + display_name="Test Alarm 2", + severity="WARNING", + lifecycle_state="ACTIVE", + namespace="oci_monitoring", + query="MemoryUtilization[1m].mean() > 90", + ) + + mock_oci_monitoring_client.return_value = Mock() + mock_list_response = Mock() + mock_list_response.data = [mock_alarm1, mock_alarm2] + mock_oci_monitoring_client.return_value.list_alarms.return_value = ( + mock_list_response + ) + + alarm_tools = MonitoringAlarmTools() + + result = alarm_tools.list_alarms(compartment_id="compartment1") + for alarm in result: + assert isinstance(alarm, AlarmSummary) diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_metric_tools.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_metric_tools.py new file mode 100644 index 0000000..66e55b5 --- /dev/null +++ b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_metric_tools.py @@ -0,0 +1,101 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import oci +import pytest +from oracle.oci_monitoring_mcp_server.metrics.models import Metric +from oracle.oci_monitoring_mcp_server.metrics.tools import MonitoringMetricsTools + + +@pytest.fixture +def mock_context(): + """Create mock MCP context.""" + context = Mock() + context.info = AsyncMock() + context.warning = AsyncMock() + context.error = AsyncMock() + return context + + +class TestMetricTools: + @pytest.mark.asyncio + async def test_get_compute_metrics(self, mock_context): + with patch.object( + MonitoringMetricsTools, "get_monitoring_client" + ) as mock_oci_monitoring_client: + metric = oci.monitoring.models.MetricData( + namespace="123", + resource_group=None, + dimensions={"resourceId": "instance1"}, + compartment_id="compartment1", + aggregated_datapoints=[ + MagicMock(timestamp="2023-01-01T00:00:00Z", value=42.0), + MagicMock(timestamp="2023-01-01T00:01:00Z", value=43.5), + ], + ) + + mock_oci_monitoring_client.return_value = Mock() + mock_list_response = Mock() + mock_list_response.data = [metric] + mock_oci_monitoring_client.return_value.summarize_metrics_data.return_value = ( + mock_list_response + ) + + metrics_tools = MonitoringMetricsTools() + + result = metrics_tools.get_compute_metrics( + compartment_id="compartment1", + start_time="2023-01-01T00:00:00Z", + end_time="2023-01-01T00:00:00Z", + metric_name="MemoryUtilization", + ) + + assert result is not None + assert isinstance(result, list) + assert len(result) == 1 + assert result[0]["dimensions"] == {"resourceId": "instance1"} + assert "datapoints" in result[0] + assert len(result[0]["datapoints"]) == 2 + assert result[0]["datapoints"][0]["timestamp"] == "2023-01-01T00:00:00Z" + assert result[0]["datapoints"][0]["value"] == pytest.approx(42.0) + + @pytest.mark.asyncio + async def test_get_metrics_data(self, mock_context): + with patch.object( + MonitoringMetricsTools, "get_monitoring_client" + ) as mock_oci_monitoring_client: + metric = oci.monitoring.models.MetricData( + namespace="123", + resource_group=None, + dimensions={"resourceId": "instance1"}, + compartment_id="compartment1", + aggregated_datapoints=[ + MagicMock(timestamp="2023-01-01T00:00:00Z", value=42.0), + MagicMock(timestamp="2023-01-01T00:01:00Z", value=43.5), + ], + ) + + mock_oci_monitoring_client.return_value = Mock() + mock_list_response = Mock() + mock_list_response.data = [metric] + mock_oci_monitoring_client.return_value.summarize_metrics_data.return_value = ( + mock_list_response + ) + + metrics_tools = MonitoringMetricsTools() + + result = await metrics_tools.get_metrics_data( + mock_context, + compartment_id="compartment1", + start_time="2023-01-01T00:00:00Z", + end_time="2023-01-01T00:00:00Z", + ) + + assert result is not None + for metric in result: + assert isinstance(metric, Metric) diff --git a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_monitoring_tools.py b/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_monitoring_tools.py deleted file mode 100644 index b517e6c..0000000 --- a/src/oci-monitoring-mcp-server/oracle/oci_monitoring_mcp_server/tests/test_monitoring_tools.py +++ /dev/null @@ -1,101 +0,0 @@ -""" -Copyright (c) 2025, Oracle and/or its affiliates. -Licensed under the Universal Permissive License v1.0 as shown at -https://oss.oracle.com/licenses/upl. -""" - -from unittest.mock import MagicMock, create_autospec, patch - -import oci -import pytest -from fastmcp import Client -from oracle.oci_monitoring_mcp_server.server import mcp - - -class TestMonitoringTools: - @pytest.mark.asyncio - @patch("oracle.oci_monitoring_mcp_server.server.get_monitoring_client") - async def test_get_compute_metrics(self, mock_get_client): - mock_client = MagicMock() - mock_get_client.return_value = mock_client - - # Mock OCI summarize_metrics_data response with one series containing two points - mock_summarize_response = create_autospec(oci.response.Response) - series = MagicMock() - series.dimensions = {"resourceId": "instance1"} - series.aggregated_datapoints = [ - MagicMock(timestamp="2023-01-01T00:00:00Z", value=42.0), - MagicMock(timestamp="2023-01-01T00:01:00Z", value=43.5), - ] - mock_summarize_response.data = [series] - mock_client.summarize_metrics_data.return_value = mock_summarize_response - - # Call the MCP tool - async with Client(mcp) as client: - result = ( - await client.call_tool( - "get_compute_metrics", - { - "compartment_id": "compartment1", - "start_time": "2023-01-01T00:00:00Z", - "end_time": "2023-01-01T01:00:00Z", - "metricName": "CpuUtilization", - "resolution": "1m", - "aggregation": "mean", - "instance_id": "instance1", - "compartment_id_in_subtree": False, - }, - ) - ).structured_content["result"] - - # Validate result structure and values - assert isinstance(result, list) - assert len(result) == 1 - assert result[0]["dimensions"] == {"resourceId": "instance1"} - assert "datapoints" in result[0] - assert len(result[0]["datapoints"]) == 2 - assert result[0]["datapoints"][0]["timestamp"] == "2023-01-01T00:00:00Z" - assert result[0]["datapoints"][0]["value"] == pytest.approx(42.0) - - @pytest.mark.asyncio - @patch("oracle.oci_monitoring_mcp_server.server.get_monitoring_client") - async def test_list_alarms(self, mock_get_client): - mock_client = MagicMock() - mock_get_client.return_value = mock_client - - mock_alarm1 = oci.monitoring.models.Alarm( - id="alarm1", - display_name="Test Alarm 1", - severity="CRITICAL", - lifecycle_state="ACTIVE", - namespace="oci_monitoring", - query="CpuUtilization[1m].mean() > 80", - ) - mock_alarm2 = oci.monitoring.models.Alarm( - id="alarm2", - display_name="Test Alarm 2", - severity="WARNING", - lifecycle_state="ACTIVE", - namespace="oci_monitoring", - query="MemoryUtilization[1m].mean() > 90", - ) - - mock_list_response = create_autospec(oci.response.Response) - mock_list_response.data = [mock_alarm1, mock_alarm2] - mock_client.list_alarms.return_value = mock_list_response - - async with Client(mcp) as client: - result = ( - await client.call_tool( - "list_alarms", - { - "compartment_id": "compartment1", - }, - ) - ).structured_content["result"] - - assert len(result) == 2 - assert result[0]["id"] == "alarm1" - assert result[0]["display_name"] == "Test Alarm 1" - assert result[1]["id"] == "alarm2" - assert result[1]["display_name"] == "Test Alarm 2"