From 6297c1618e7fa6bdaef5d289b984c0789c4939f1 Mon Sep 17 00:00:00 2001 From: Nitesh Vijay Date: Thu, 19 Mar 2026 15:29:59 +0530 Subject: [PATCH] Add Cosmos DB container copy job MCP tools Add 7 new MCP tools for managing Cosmos DB container copy jobs via the ARM copyJobs REST API (api-version 2025-05-01-preview): - create_copy_job: Create a copy job (NoSQL, Cassandra, Mongo, Blob) - get_copy_job: Get job status and details - list_copy_jobs: List all copy jobs for the account - cancel_copy_job: Cancel a running job - pause_copy_job: Pause a running job - resume_copy_job: Resume a paused job - complete_copy_job: Complete an Online copy job New files: - Services/CopyJobService.cs: ARM REST service using DefaultAzureCredential - Tests/CopyJobServiceTests.cs: Unit tests for parameter validation Modified files: - MCPProtocolController.cs: Tool registration and dispatch - Program.cs: DI registration for CopyJobService + IHttpClientFactory Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Controllers/MCPProtocolController.cs | 127 ++++++- src/AzureCosmosDB.MCP.Toolkit/Program.cs | 2 + .../Services/CopyJobService.cs | 311 ++++++++++++++++++ .../CopyJobServiceTests.cs | 130 ++++++++ 4 files changed, 569 insertions(+), 1 deletion(-) create mode 100644 src/AzureCosmosDB.MCP.Toolkit/Services/CopyJobService.cs create mode 100644 tests/AzureCosmosDB.MCP.Toolkit.Tests/CopyJobServiceTests.cs diff --git a/src/AzureCosmosDB.MCP.Toolkit/Controllers/MCPProtocolController.cs b/src/AzureCosmosDB.MCP.Toolkit/Controllers/MCPProtocolController.cs index 48cd087..247e6a7 100644 --- a/src/AzureCosmosDB.MCP.Toolkit/Controllers/MCPProtocolController.cs +++ b/src/AzureCosmosDB.MCP.Toolkit/Controllers/MCPProtocolController.cs @@ -11,15 +11,18 @@ namespace AzureCosmosDB.MCP.Toolkit.Controllers; public class MCPProtocolController : ControllerBase { private readonly CosmosDbToolsService _cosmosDbTools; + private readonly CopyJobService _copyJobService; private readonly AuthenticationService _authService; private readonly ILogger _logger; public MCPProtocolController( - CosmosDbToolsService cosmosDbTools, + CosmosDbToolsService cosmosDbTools, + CopyJobService copyJobService, AuthenticationService authService, ILogger logger) { _cosmosDbTools = cosmosDbTools; + _copyJobService = copyJobService; _authService = authService; _logger = logger; } @@ -232,6 +235,93 @@ public async Task HandleMCPRequest([FromBody] JsonElement request }, required = new string[] { "databaseId", "containerId", "searchText", "vectorProperty", "selectProperties", "topN" } } + }, + // Copy Job tools (ARM management plane) + new { + name = "create_copy_job", + description = "Creates a Cosmos DB container copy job. Supports NoSQL, Cassandra, and Mongo copy types, as well as backup/restore to Azure Blob Storage.", + inputSchema = new { + type = "object", + properties = new { + subscriptionId = new { type = "string", description = "Azure subscription ID" }, + jobName = new { type = "string", description = "Unique name for the copy job" }, + jobProperties = new { type = "string", description = "JSON string describing copy source/destination. Example for NoSQL: {\"jobType\":\"NoSqlRUToNoSqlRU\",\"source\":{\"databaseName\":\"srcDb\",\"containerName\":\"srcContainer\"},\"destination\":{\"databaseName\":\"destDb\",\"containerName\":\"destContainer\"}}" }, + mode = new { type = "string", description = "Copy mode: 'Offline' (default) or 'Online' (continuous)" }, + workerCount = new { type = "integer", description = "Number of worker threads (default determined by service)" } + }, + required = new string[] { "subscriptionId", "jobName", "jobProperties" } + } + }, + new { + name = "get_copy_job", + description = "Gets the status and details of a specific Cosmos DB copy job.", + inputSchema = new { + type = "object", + properties = new { + subscriptionId = new { type = "string", description = "Azure subscription ID" }, + jobName = new { type = "string", description = "Name of the copy job to retrieve" } + }, + required = new string[] { "subscriptionId", "jobName" } + } + }, + new { + name = "list_copy_jobs", + description = "Lists all Cosmos DB copy jobs for the current account.", + inputSchema = new { + type = "object", + properties = new { + subscriptionId = new { type = "string", description = "Azure subscription ID" } + }, + required = new string[] { "subscriptionId" } + } + }, + new { + name = "cancel_copy_job", + description = "Cancels a running Cosmos DB copy job.", + inputSchema = new { + type = "object", + properties = new { + subscriptionId = new { type = "string", description = "Azure subscription ID" }, + jobName = new { type = "string", description = "Name of the copy job to cancel" } + }, + required = new string[] { "subscriptionId", "jobName" } + } + }, + new { + name = "pause_copy_job", + description = "Pauses a running Cosmos DB copy job.", + inputSchema = new { + type = "object", + properties = new { + subscriptionId = new { type = "string", description = "Azure subscription ID" }, + jobName = new { type = "string", description = "Name of the copy job to pause" } + }, + required = new string[] { "subscriptionId", "jobName" } + } + }, + new { + name = "resume_copy_job", + description = "Resumes a paused Cosmos DB copy job.", + inputSchema = new { + type = "object", + properties = new { + subscriptionId = new { type = "string", description = "Azure subscription ID" }, + jobName = new { type = "string", description = "Name of the copy job to resume" } + }, + required = new string[] { "subscriptionId", "jobName" } + } + }, + new { + name = "complete_copy_job", + description = "Completes an Online Cosmos DB copy job, flushing remaining changes from source to destination.", + inputSchema = new { + type = "object", + properties = new { + subscriptionId = new { type = "string", description = "Azure subscription ID" }, + jobName = new { type = "string", description = "Name of the Online copy job to complete" } + }, + required = new string[] { "subscriptionId", "jobName" } + } } } } @@ -395,6 +485,41 @@ private async Task ExecuteTool(string toolName, Dictionary await _copyJobService.CreateCopyJob( + GetStringArg(args, "subscriptionId"), + GetStringArg(args, "jobName"), + GetStringArg(args, "jobProperties"), + GetStringArg(args, "mode"), + args.ContainsKey("workerCount") ? GetIntArg(args, "workerCount") : null, + cancellationToken), + "get_copy_job" => await _copyJobService.GetCopyJob( + GetStringArg(args, "subscriptionId"), + GetStringArg(args, "jobName"), + cancellationToken), + "list_copy_jobs" => await _copyJobService.ListCopyJobs( + GetStringArg(args, "subscriptionId"), + cancellationToken), + "cancel_copy_job" => await _copyJobService.CopyJobAction( + GetStringArg(args, "subscriptionId"), + GetStringArg(args, "jobName"), + "cancel", + cancellationToken), + "pause_copy_job" => await _copyJobService.CopyJobAction( + GetStringArg(args, "subscriptionId"), + GetStringArg(args, "jobName"), + "pause", + cancellationToken), + "resume_copy_job" => await _copyJobService.CopyJobAction( + GetStringArg(args, "subscriptionId"), + GetStringArg(args, "jobName"), + "resume", + cancellationToken), + "complete_copy_job" => await _copyJobService.CopyJobAction( + GetStringArg(args, "subscriptionId"), + GetStringArg(args, "jobName"), + "complete", + cancellationToken), _ => throw new ArgumentException($"Unknown tool: {toolName}") }; } diff --git a/src/AzureCosmosDB.MCP.Toolkit/Program.cs b/src/AzureCosmosDB.MCP.Toolkit/Program.cs index 5aed871..3ec66e2 100644 --- a/src/AzureCosmosDB.MCP.Toolkit/Program.cs +++ b/src/AzureCosmosDB.MCP.Toolkit/Program.cs @@ -232,6 +232,8 @@ // Register services for dependency injection builder.Services.AddScoped(); builder.Services.AddScoped(); +builder.Services.AddHttpClient(); +builder.Services.AddScoped(); // Configure forwarded headers for proxy scenarios builder.Services.Configure(options => diff --git a/src/AzureCosmosDB.MCP.Toolkit/Services/CopyJobService.cs b/src/AzureCosmosDB.MCP.Toolkit/Services/CopyJobService.cs new file mode 100644 index 0000000..eef6904 --- /dev/null +++ b/src/AzureCosmosDB.MCP.Toolkit/Services/CopyJobService.cs @@ -0,0 +1,311 @@ +using System.Net.Http.Headers; +using System.Text; +using System.Text.Json; +using Azure.Identity; + +namespace AzureCosmosDB.MCP.Toolkit.Services; + +/// +/// Service for managing Cosmos DB container copy jobs via the ARM copyJobs REST API. +/// Uses DefaultAzureCredential for ARM authentication. +/// +public class CopyJobService +{ + private readonly IHttpClientFactory _httpClientFactory; + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + + private const string ApiVersion = "2025-05-01-preview"; + private const string ArmScope = "https://management.azure.com/.default"; + + public CopyJobService( + IHttpClientFactory httpClientFactory, + ILogger logger, + IConfiguration configuration) + { + _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); + } + + /// + /// Extracts the Cosmos DB account name from the COSMOS_ENDPOINT environment variable. + /// e.g., "https://myaccount.documents.azure.com:443/" → "myaccount" + /// + private string GetAccountName() + { + var endpoint = Environment.GetEnvironmentVariable("COSMOS_ENDPOINT") + ?? _configuration["Cosmos:Endpoint"] + ?? throw new InvalidOperationException("COSMOS_ENDPOINT is not configured."); + + var uri = new Uri(endpoint); + var host = uri.Host; // e.g., "myaccount.documents.azure.com" + var accountName = host.Split('.')[0]; + return accountName; + } + + /// + /// Gets an ARM access token using DefaultAzureCredential. + /// + private async Task GetArmTokenAsync(CancellationToken cancellationToken) + { + var credential = new DefaultAzureCredential(); + var tokenResult = await credential.GetTokenAsync( + new Azure.Core.TokenRequestContext([ArmScope]), + cancellationToken); + return tokenResult.Token; + } + + /// + /// Creates an authenticated HttpRequestMessage for ARM REST API calls. + /// + private async Task CreateArmRequestAsync( + HttpMethod method, string url, + HttpContent? content = null, + CancellationToken cancellationToken = default) + { + var token = await GetArmTokenAsync(cancellationToken); + var request = new HttpRequestMessage(method, url) { Content = content }; + request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token); + request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); + return request; + } + + /// + /// Discovers the ARM resource ID for the Cosmos DB account by listing accounts in the subscription. + /// + private async Task GetAccountResourceIdAsync(string subscriptionId, CancellationToken cancellationToken) + { + var accountName = GetAccountName(); + var client = _httpClientFactory.CreateClient(); + var url = $"https://management.azure.com/subscriptions/{Uri.EscapeDataString(subscriptionId)}" + + $"/providers/Microsoft.DocumentDB/databaseAccounts?api-version=2024-05-15"; + + using var request = await CreateArmRequestAsync(HttpMethod.Get, url, cancellationToken: cancellationToken); + var response = await client.SendAsync(request, cancellationToken); + var body = await response.Content.ReadAsStringAsync(cancellationToken); + + if (!response.IsSuccessStatusCode) + { + throw new HttpRequestException($"Failed to list Cosmos DB accounts (HTTP {(int)response.StatusCode}): {body}"); + } + + using var doc = JsonDocument.Parse(body); + if (doc.RootElement.TryGetProperty("value", out var accounts)) + { + foreach (var account in accounts.EnumerateArray()) + { + if (account.TryGetProperty("name", out var nameProp) && + string.Equals(nameProp.GetString(), accountName, StringComparison.OrdinalIgnoreCase)) + { + return account.GetProperty("id").GetString()!; + } + } + } + + throw new InvalidOperationException( + $"Cosmos DB account '{accountName}' not found in subscription '{subscriptionId}'."); + } + + private string BuildCopyJobsUrl(string accountResourceId, string? jobName = null) + { + var url = $"https://management.azure.com{accountResourceId}/copyJobs"; + if (!string.IsNullOrEmpty(jobName)) + { + url += $"/{Uri.EscapeDataString(jobName)}"; + } + url += $"?api-version={ApiVersion}"; + return url; + } + + public async Task CreateCopyJob( + string subscriptionId, string jobName, string jobPropertiesJson, + string? mode = null, int? workerCount = null, + CancellationToken cancellationToken = default) + { + try + { + if (string.IsNullOrWhiteSpace(subscriptionId)) + throw new ArgumentException("Parameter 'subscriptionId' is required.", nameof(subscriptionId)); + if (string.IsNullOrWhiteSpace(jobName)) + throw new ArgumentException("Parameter 'jobName' is required.", nameof(jobName)); + if (string.IsNullOrWhiteSpace(jobPropertiesJson)) + throw new ArgumentException("Parameter 'jobProperties' is required.", nameof(jobPropertiesJson)); + + // Parse and validate job properties + JsonElement jobProps; + try + { + using var propsDoc = JsonDocument.Parse(jobPropertiesJson); + jobProps = propsDoc.RootElement.Clone(); + } + catch (JsonException ex) + { + throw new ArgumentException($"Invalid JSON in jobProperties: {ex.Message}", ex); + } + + var accountResourceId = await GetAccountResourceIdAsync(subscriptionId, cancellationToken); + + // Build request body + var properties = new Dictionary { ["jobProperties"] = jobProps }; + if (!string.IsNullOrEmpty(mode)) + properties["mode"] = mode; + if (workerCount.HasValue) + properties["workerCount"] = workerCount.Value; + + var body = JsonSerializer.Serialize(new { properties }); + + var client = _httpClientFactory.CreateClient(); + var url = BuildCopyJobsUrl(accountResourceId, jobName); + + _logger.LogInformation("Creating copy job '{JobName}' on account", jobName); + + using var request = await CreateArmRequestAsync( + HttpMethod.Put, url, + new StringContent(body, Encoding.UTF8, "application/json"), + cancellationToken); + var response = await client.SendAsync(request, cancellationToken); + var responseBody = await response.Content.ReadAsStringAsync(cancellationToken); + + if (!response.IsSuccessStatusCode) + { + return new { error = $"Failed to create copy job (HTTP {(int)response.StatusCode}): {responseBody}" }; + } + + using var doc = JsonDocument.Parse(responseBody); + return doc.RootElement.Clone(); + } + catch (Exception ex) when (ex is not ArgumentException) + { + _logger.LogError(ex, "Error creating copy job"); + return new { error = ex.Message }; + } + } + + public async Task GetCopyJob( + string subscriptionId, string jobName, + CancellationToken cancellationToken = default) + { + try + { + if (string.IsNullOrWhiteSpace(subscriptionId)) + throw new ArgumentException("Parameter 'subscriptionId' is required.", nameof(subscriptionId)); + if (string.IsNullOrWhiteSpace(jobName)) + throw new ArgumentException("Parameter 'jobName' is required.", nameof(jobName)); + + var accountResourceId = await GetAccountResourceIdAsync(subscriptionId, cancellationToken); + var client = _httpClientFactory.CreateClient(); + var url = BuildCopyJobsUrl(accountResourceId, jobName); + + using var request = await CreateArmRequestAsync(HttpMethod.Get, url, cancellationToken: cancellationToken); + var response = await client.SendAsync(request, cancellationToken); + var responseBody = await response.Content.ReadAsStringAsync(cancellationToken); + + if (!response.IsSuccessStatusCode) + { + return new { error = $"Failed to get copy job (HTTP {(int)response.StatusCode}): {responseBody}" }; + } + + using var doc = JsonDocument.Parse(responseBody); + return doc.RootElement.Clone(); + } + catch (Exception ex) when (ex is not ArgumentException) + { + _logger.LogError(ex, "Error getting copy job"); + return new { error = ex.Message }; + } + } + + public async Task ListCopyJobs( + string subscriptionId, + CancellationToken cancellationToken = default) + { + try + { + if (string.IsNullOrWhiteSpace(subscriptionId)) + throw new ArgumentException("Parameter 'subscriptionId' is required.", nameof(subscriptionId)); + + var accountResourceId = await GetAccountResourceIdAsync(subscriptionId, cancellationToken); + var client = _httpClientFactory.CreateClient(); + var jobs = new List(); + var url = BuildCopyJobsUrl(accountResourceId); + + while (!string.IsNullOrEmpty(url)) + { + using var request = await CreateArmRequestAsync(HttpMethod.Get, url, cancellationToken: cancellationToken); + var response = await client.SendAsync(request, cancellationToken); + var responseBody = await response.Content.ReadAsStringAsync(cancellationToken); + + if (!response.IsSuccessStatusCode) + { + return new { error = $"Failed to list copy jobs (HTTP {(int)response.StatusCode}): {responseBody}" }; + } + + using var doc = JsonDocument.Parse(responseBody); + if (doc.RootElement.TryGetProperty("value", out var valueArray)) + { + foreach (var item in valueArray.EnumerateArray()) + { + jobs.Add(item.Clone()); + } + } + + url = doc.RootElement.TryGetProperty("nextLink", out var nextLink) + ? nextLink.GetString() + : null; + } + + return jobs; + } + catch (Exception ex) when (ex is not ArgumentException) + { + _logger.LogError(ex, "Error listing copy jobs"); + return new { error = ex.Message }; + } + } + + public async Task CopyJobAction( + string subscriptionId, string jobName, string action, + CancellationToken cancellationToken = default) + { + try + { + if (string.IsNullOrWhiteSpace(subscriptionId)) + throw new ArgumentException("Parameter 'subscriptionId' is required.", nameof(subscriptionId)); + if (string.IsNullOrWhiteSpace(jobName)) + throw new ArgumentException("Parameter 'jobName' is required.", nameof(jobName)); + + var accountResourceId = await GetAccountResourceIdAsync(subscriptionId, cancellationToken); + var client = _httpClientFactory.CreateClient(); + var url = $"https://management.azure.com{accountResourceId}/copyJobs" + + $"/{Uri.EscapeDataString(jobName)}/{action}?api-version={ApiVersion}"; + + _logger.LogInformation("{Action} copy job '{JobName}'", action, jobName); + + using var request = await CreateArmRequestAsync( + HttpMethod.Post, url, + new StringContent("{}", Encoding.UTF8, "application/json"), + cancellationToken); + var response = await client.SendAsync(request, cancellationToken); + var responseBody = await response.Content.ReadAsStringAsync(cancellationToken); + + if (!response.IsSuccessStatusCode) + { + return new { error = $"Failed to {action} copy job (HTTP {(int)response.StatusCode}): {responseBody}" }; + } + + if (string.IsNullOrWhiteSpace(responseBody)) + { + return new { status = $"{action} accepted", jobName }; + } + + using var doc = JsonDocument.Parse(responseBody); + return doc.RootElement.Clone(); + } + catch (Exception ex) when (ex is not ArgumentException) + { + _logger.LogError(ex, "Error performing {Action} on copy job", action); + return new { error = ex.Message }; + } + } +} diff --git a/tests/AzureCosmosDB.MCP.Toolkit.Tests/CopyJobServiceTests.cs b/tests/AzureCosmosDB.MCP.Toolkit.Tests/CopyJobServiceTests.cs new file mode 100644 index 0000000..c1f1dc2 --- /dev/null +++ b/tests/AzureCosmosDB.MCP.Toolkit.Tests/CopyJobServiceTests.cs @@ -0,0 +1,130 @@ +using Xunit; +using FluentAssertions; +using System.Net; +using System.Text; +using System.Text.Json; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Configuration; +using Moq; +using Moq.Protected; +using AzureCosmosDB.MCP.Toolkit.Services; + +namespace AzureCosmosDB.MCP.Toolkit.Tests; + +public class CopyJobServiceTests +{ + private readonly Mock> _loggerMock; + private readonly Mock _configurationMock; + + public CopyJobServiceTests() + { + _loggerMock = new Mock>(); + _configurationMock = new Mock(); + } + + [Fact] + public void CopyJobService_Should_Exist() + { + var type = typeof(CopyJobService); + type.Should().NotBeNull(); + type.Name.Should().Be("CopyJobService"); + } + + [Fact] + public async Task CreateCopyJob_Should_Require_SubscriptionId() + { + // Arrange + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + // Act & Assert + await Assert.ThrowsAsync(() => + service.CreateCopyJob("", "testJob", "{}", cancellationToken: CancellationToken.None)); + } + + [Fact] + public async Task CreateCopyJob_Should_Require_JobName() + { + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + await Assert.ThrowsAsync(() => + service.CreateCopyJob("sub-123", "", "{}", cancellationToken: CancellationToken.None)); + } + + [Fact] + public async Task CreateCopyJob_Should_Require_JobProperties() + { + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + await Assert.ThrowsAsync(() => + service.CreateCopyJob("sub-123", "testJob", "", cancellationToken: CancellationToken.None)); + } + + [Fact] + public async Task CreateCopyJob_Should_Validate_JobProperties_Json() + { + // Arrange + Environment.SetEnvironmentVariable("COSMOS_ENDPOINT", "https://testaccount.documents.azure.com:443/"); + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + // Act & Assert — invalid JSON should throw ArgumentException + await Assert.ThrowsAsync(() => + service.CreateCopyJob("sub-123", "testJob", "not-valid-json", cancellationToken: CancellationToken.None)); + + // Cleanup + Environment.SetEnvironmentVariable("COSMOS_ENDPOINT", null); + } + + [Fact] + public async Task GetCopyJob_Should_Require_SubscriptionId() + { + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + await Assert.ThrowsAsync(() => + service.GetCopyJob("", "testJob", CancellationToken.None)); + } + + [Fact] + public async Task GetCopyJob_Should_Require_JobName() + { + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + await Assert.ThrowsAsync(() => + service.GetCopyJob("sub-123", "", CancellationToken.None)); + } + + [Fact] + public async Task ListCopyJobs_Should_Require_SubscriptionId() + { + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + await Assert.ThrowsAsync(() => + service.ListCopyJobs("", CancellationToken.None)); + } + + [Fact] + public async Task CopyJobAction_Should_Require_SubscriptionId() + { + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + await Assert.ThrowsAsync(() => + service.CopyJobAction("", "testJob", "cancel", CancellationToken.None)); + } + + [Fact] + public async Task CopyJobAction_Should_Require_JobName() + { + var httpClientFactory = new Mock(); + var service = new CopyJobService(httpClientFactory.Object, _loggerMock.Object, _configurationMock.Object); + + await Assert.ThrowsAsync(() => + service.CopyJobAction("sub-123", "", "cancel", CancellationToken.None)); + } +}