Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ core,github.com/google/btree,Apache-2.0
core,github.com/google/cel-go,Apache-2.0
core,github.com/google/gnostic-models,Apache-2.0
core,github.com/google/go-cmp/cmp,BSD-3-Clause
core,github.com/google/jsonschema-go/jsonschema,MIT
core,github.com/google/pprof/profile,Apache-2.0
core,github.com/google/shlex,Apache-2.0
core,github.com/google/uuid,BSD-3-Clause
Expand Down Expand Up @@ -145,6 +146,7 @@ core,github.com/mitchellh/mapstructure,MIT
core,github.com/mitchellh/reflectwalk,MIT
core,github.com/moby/spdystream,Apache-2.0
core,github.com/moby/term,Apache-2.0
core,github.com/modelcontextprotocol/go-sdk,MIT
core,github.com/modern-go/concurrent,Apache-2.0
core,github.com/modern-go/reflect2,Apache-2.0
core,github.com/mohae/deepcopy,MIT
Expand Down Expand Up @@ -200,6 +202,7 @@ core,github.com/ulikunitz/xz,BSD-3-Clause
core,github.com/x448/float16,MIT
core,github.com/xi2/xz,Unknown
core,github.com/xlab/treeprint,MIT
core,github.com/yosida95/uritemplate/v3,BSD-3-Clause
core,go.etcd.io/bbolt,MIT
core,go.opentelemetry.io/auto/sdk,Apache-2.0
core,go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp,Apache-2.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// IsAwsAuthConfigMapPresent checks if the aws-auth ConfigMap exists in the kube-system namespace
func IsAwsAuthConfigMapPresent(ctx context.Context, clientset *kubernetes.Clientset) (bool, error) {
func IsAwsAuthConfigMapPresent(ctx context.Context, clientset kubernetes.Interface) (bool, error) {
if _, err := clientset.CoreV1().ConfigMaps("kube-system").Get(ctx, "aws-auth", metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
return false, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const nodeListChunkSize = 100
// Format: aws:///ZONE/INSTANCE_ID (e.g., aws:///us-east-1a/i-0abc123def456789)
var awsProviderIDRegexp = regexp.MustCompile(`^aws:///[^/]+/(i-[0-9a-f]+)$`)

func GetNodesProperties(ctx context.Context, clientset *kubernetes.Clientset, ec2Client *ec2.Client) (*NodePoolsSet, error) {
func GetNodesProperties(ctx context.Context, clientset kubernetes.Interface, ec2Client *ec2.Client) (*NodePoolsSet, error) {
nps := NewNodePoolsSet()

var cont string
Expand Down
4 changes: 2 additions & 2 deletions cmd/kubectl-datadog/autoscaling/cluster/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ type clients struct {
sts *sts.Client

// Kubernetes clients
k8sClient client.Client // controller-runtime client
k8sClientset *kubernetes.Clientset // typed Kubernetes client
k8sClient client.Client // controller-runtime client
k8sClientset kubernetes.Interface // typed Kubernetes client
}

func (o *options) getClusterNameFromKubeconfig(ctx context.Context) (string, error) {
Expand Down
28 changes: 23 additions & 5 deletions cmd/kubectl-datadog/clusteragent/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,22 @@ func (o *options) run(cmd *cobra.Command) error {
}

func (o *options) getLeaderFromLease(objKey client.ObjectKey) (string, error) {
return GetLeaderFromLease(o.Client, objKey)
}

func (o *options) getLeaderFromConfigMap(objKey client.ObjectKey) (string, error) {
return GetLeaderFromConfigMap(o.Client, objKey)
}

// GetLeaderFromLease retrieves the leader pod name from a Lease resource
// This is a public function that can be reused by other packages
func GetLeaderFromLease(c client.Client, objKey client.ObjectKey) (string, error) {
lease := &coordv1.Lease{}
err := o.Client.Get(context.TODO(), objKey, lease)
err := c.Get(context.TODO(), objKey, lease)
if err != nil && apierrors.IsNotFound(err) {
return "", fmt.Errorf("lease %s/%s not found", objKey.Namespace, objKey.Name)
} else if err != nil {
return "", fmt.Errorf("unable to get leader election config map: %w", err)
return "", fmt.Errorf("unable to get leader election lease: %w", err)
}

// get the info from the lease
Expand All @@ -147,10 +157,12 @@ func (o *options) getLeaderFromLease(objKey client.ObjectKey) (string, error) {
return *lease.Spec.HolderIdentity, nil
}

func (o *options) getLeaderFromConfigMap(objKey client.ObjectKey) (string, error) {
// GetLeaderFromConfigMap retrieves the leader pod name from a ConfigMap annotation
// This is a public function that can be reused by other packages
func GetLeaderFromConfigMap(c client.Client, objKey client.ObjectKey) (string, error) {
// Get the config map holding the leader identity.
cm := &corev1.ConfigMap{}
err := o.Client.Get(context.TODO(), objKey, cm)
err := c.Get(context.TODO(), objKey, cm)
if err != nil && apierrors.IsNotFound(err) {
return "", fmt.Errorf("config map %s/%s not found", objKey.Namespace, objKey.Name)
} else if err != nil {
Expand All @@ -172,7 +184,13 @@ func (o *options) getLeaderFromConfigMap(objKey client.ObjectKey) (string, error
}

func isLeaseSupported(client discovery.DiscoveryInterface) (bool, error) {
apiGroupList, err := client.ServerGroups()
return IsLeaseSupported(client)
}

// IsLeaseSupported checks if the Kubernetes cluster supports Lease resources
// This is a public function that can be reused by other packages
func IsLeaseSupported(discoveryClient discovery.DiscoveryInterface) (bool, error) {
apiGroupList, err := discoveryClient.ServerGroups()
if err != nil {
return false, fmt.Errorf("unable to discover APIGroups, err:%w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/kubectl-datadog/datadog/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/flare"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/get"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/helm2dda"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/mcp"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/metrics"
"github.com/DataDog/datadog-operator/cmd/kubectl-datadog/validate/validate"
)
Expand Down Expand Up @@ -43,6 +44,7 @@ func NewCmd(streams genericclioptions.IOStreams) *cobra.Command {
cmd.AddCommand(get.New(streams))
cmd.AddCommand(flare.New(streams))
cmd.AddCommand(validate.New(streams))
cmd.AddCommand(mcp.New(streams))

// Agent commands
cmd.AddCommand(agent.New(streams))
Expand Down
179 changes: 179 additions & 0 deletions cmd/kubectl-datadog/mcp/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package mcp

import (
"context"
"fmt"
"net/http"
"time"

"github.com/modelcontextprotocol/go-sdk/mcp"
)

// ClusterAgentMCPClient communicates with the cluster-agent MCP server via HTTP
type ClusterAgentMCPClient struct {
baseURL string
endpoint string
httpClient *http.Client
mcpClient *mcp.Client
session *mcp.ClientSession
}

// ClusterAgentMCPClientConfig contains configuration for creating a ClusterAgentMCPClient
type ClusterAgentMCPClientConfig struct {
BaseURL string // e.g., "http://localhost:12345"
Endpoint string // e.g., "/mcp"
Timeout time.Duration // HTTP timeout for tool calls
MaxRetries int // Maximum reconnection retries
}

// NewClusterAgentMCPClient creates a new MCP HTTP client for cluster-agent communication
func NewClusterAgentMCPClient(config ClusterAgentMCPClientConfig) (*ClusterAgentMCPClient, error) {
if config.BaseURL == "" {
return nil, fmt.Errorf("baseURL is required")
}

if config.Endpoint == "" {
config.Endpoint = "/mcp"
}

if config.Timeout == 0 {
config.Timeout = 120 * time.Second
}

if config.MaxRetries == 0 {
config.MaxRetries = 3
}

// Create HTTP client with timeout
httpClient := &http.Client{
Timeout: config.Timeout,
}

// Create MCP client
mcpClient := mcp.NewClient(&mcp.Implementation{
Name: "kubectl-datadog-proxy",
Version: "1.0.0",
}, nil)

return &ClusterAgentMCPClient{
baseURL: config.BaseURL,
endpoint: config.Endpoint,
httpClient: httpClient,
mcpClient: mcpClient,
}, nil
}

// Connect establishes a connection to the cluster-agent MCP server
func (c *ClusterAgentMCPClient) Connect(ctx context.Context) error {
// Build full endpoint URL
fullURL := c.baseURL + c.endpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: If baseURL and endpoint are only ever used together it makes sense to have one config value instead of two (this url value could also be parsed into parts if needed).


// Create streamable HTTP transport
transport := &mcp.StreamableClientTransport{
Endpoint: fullURL,
HTTPClient: c.httpClient,
MaxRetries: 3,
}

// Connect to the cluster-agent MCP server
session, err := c.mcpClient.Connect(ctx, transport, nil)
if err != nil {
return fmt.Errorf("failed to connect to cluster-agent MCP server: %w", err)
}

c.session = session
return nil
}

// ListTools fetches the list of available tools from the cluster-agent
func (c *ClusterAgentMCPClient) ListTools(ctx context.Context) ([]*mcp.Tool, error) {
if c.session == nil {
return nil, fmt.Errorf("client not connected, call Connect() first")
}

// List tools from the cluster-agent
result, err := c.session.ListTools(ctx, &mcp.ListToolsParams{})
if err != nil {
return nil, fmt.Errorf("failed to list tools: %w", err)
}

return result.Tools, nil
}

// CallTool executes a tool on the cluster-agent
func (c *ClusterAgentMCPClient) CallTool(ctx context.Context, name string, arguments map[string]interface{}) (*mcp.CallToolResult, error) {
if c.session == nil {
return nil, fmt.Errorf("client not connected, call Connect() first")
}

// Call the tool on the cluster-agent
result, err := c.session.CallTool(ctx, &mcp.CallToolParams{
Name: name,
Arguments: arguments,
})
if err != nil {
return nil, fmt.Errorf("failed to call tool %s: %w", name, err)
}

return result, nil
}

// Close closes the connection to the cluster-agent MCP server
func (c *ClusterAgentMCPClient) Close() error {
if c.session != nil {
return c.session.Close()
}
return nil
}

// IsConnected returns true if the client is connected to the cluster-agent
func (c *ClusterAgentMCPClient) IsConnected() bool {
return c.session != nil
}

// GetServerCapabilities returns capabilities of the connected MCP server
func (c *ClusterAgentMCPClient) GetServerCapabilities() (*mcp.ServerCapabilities, error) {
if c.session == nil {
return nil, fmt.Errorf("client not connected")
}

// Server capabilities are available in the InitializeResult
initResult := c.session.InitializeResult()
if initResult == nil {
return nil, fmt.Errorf("server not initialized")
}

return initResult.Capabilities, nil
}

// Ping sends a ping request to verify the connection is alive
func (c *ClusterAgentMCPClient) Ping(ctx context.Context) error {
if c.session == nil {
return fmt.Errorf("client not connected")
}

// Use a simple operation like listing tools to verify connectivity
_, err := c.session.ListTools(ctx, &mcp.ListToolsParams{})
if err != nil {
return fmt.Errorf("ping failed: %w", err)
}

return nil
}

// Reconnect attempts to reconnect to the cluster-agent MCP server
func (c *ClusterAgentMCPClient) Reconnect(ctx context.Context) error {
// Close existing connection if any
if c.session != nil {
_ = c.session.Close()
c.session = nil
}

// Attempt to reconnect
return c.Connect(ctx)
}
Loading
Loading