Skip to content

Commit 7e89262

Browse files
authored
feat(aws-data-processing-mcp-server): Add Support for EMR-S Job and Application (#1881)
* feat(aws-data-processing-mcp-server): Add Support for EMR-S Job and Application * chore(aws-data-processing-mcp-server): Fix pyright errors * chore(aws-data-processing-mcp-server): Add more test coverage * chore(aws-dataprocessing-mcp-server):Add more tests for aws_helper
1 parent ffef17e commit 7e89262

File tree

11 files changed

+3759
-0
lines changed

11 files changed

+3759
-0
lines changed

src/aws-dataprocessing-mcp-server/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,13 @@ Controls whether the MCP server adds and verifies MCP-managed tags on resources.
373373
|-----------|-------------|----------------|--------------|
374374
| manage_aws_emr_ec2_steps | Manage Amazon EMR steps for processing data on EMR clusters | add-steps, cancel-steps, describe-step, list-steps | --allow-write flag for add/cancel operations, appropriate AWS permissions |
375375

376+
### EMR Serverless Handler Tools
377+
378+
| Tool Name | Description | Key Operations | Requirements |
379+
|-----------|-------------|----------------|--------------|
380+
| manage_aws_emr_serverless_applications | Manage Amazon EMR Serverless applications with comprehensive lifecycle control | create-application, get-application, update-application, delete-application, list-applications, start-application, stop-application | --allow-write flag for create/update/delete/start/stop operations, appropriate AWS permissions |
381+
| manage_aws_emr_serverless_job_runs | Manage Amazon EMR Serverless job runs for executing data processing workloads | start-job-run, get-job-run, cancel-job-run, list-job-runs, get-dashboard-for-job-run | --allow-write flag for start/cancel operations, application must exist, appropriate AWS permissions |
382+
376383
### Athena Query Handler Tools
377384

378385
| Tool Name | Description | Key Operations | Requirements |

src/aws-dataprocessing-mcp-server/awslabs/aws_dataprocessing_mcp_server/handlers/emr/emr_serverless_application_handler.py

Lines changed: 623 additions & 0 deletions
Large diffs are not rendered by default.

src/aws-dataprocessing-mcp-server/awslabs/aws_dataprocessing_mcp_server/handlers/emr/emr_serverless_job_run_handler.py

Lines changed: 483 additions & 0 deletions
Large diffs are not rendered by default.

src/aws-dataprocessing-mcp-server/awslabs/aws_dataprocessing_mcp_server/models/emr_models.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,3 +465,123 @@ class WaitClusterResponse(CallToolResult):
465465
cluster_id: str = Field(..., description='ID of the cluster')
466466
state: str = Field(..., description='Current state of the cluster')
467467
operation: str = Field(default='wait', description='Operation performed')
468+
469+
470+
# Response models for EMR Serverless Operations
471+
472+
473+
class CreateApplicationResponse(CallToolResult):
474+
"""Response model for create EMR Serverless application operation."""
475+
476+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
477+
content: List[Content] = Field(..., description='Content of the response')
478+
application_id: str = Field(..., description='ID of the created application')
479+
name: str = Field(..., description='Name of the created application')
480+
arn: str = Field(..., description='ARN of the created application')
481+
operation: str = Field(default='create-application', description='Operation performed')
482+
483+
484+
class GetApplicationResponse(CallToolResult):
485+
"""Response model for get EMR Serverless application operation."""
486+
487+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
488+
content: List[Content] = Field(..., description='Content of the response')
489+
application: Dict[str, Any] = Field(..., description='Application details')
490+
operation: str = Field(default='get-application', description='Operation performed')
491+
492+
493+
class UpdateApplicationResponse(CallToolResult):
494+
"""Response model for update EMR Serverless application operation."""
495+
496+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
497+
content: List[Content] = Field(..., description='Content of the response')
498+
application: Dict[str, Any] = Field(..., description='Updated application details')
499+
operation: str = Field(default='update-application', description='Operation performed')
500+
501+
502+
class DeleteApplicationResponse(CallToolResult):
503+
"""Response model for delete EMR Serverless application operation."""
504+
505+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
506+
content: List[Content] = Field(..., description='Content of the response')
507+
application_id: str = Field(..., description='ID of the deleted application')
508+
operation: str = Field(default='delete-application', description='Operation performed')
509+
510+
511+
class ListApplicationsResponse(CallToolResult):
512+
"""Response model for list EMR Serverless applications operation."""
513+
514+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
515+
content: List[Content] = Field(..., description='Content of the response')
516+
applications: List[Dict[str, Any]] = Field(..., description='List of applications')
517+
count: int = Field(..., description='Number of applications found')
518+
next_token: Optional[str] = Field(None, description='Token for pagination')
519+
operation: str = Field(default='list-applications', description='Operation performed')
520+
521+
522+
class StartApplicationResponse(CallToolResult):
523+
"""Response model for start EMR Serverless application operation."""
524+
525+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
526+
content: List[Content] = Field(..., description='Content of the response')
527+
application_id: str = Field(..., description='ID of the started application')
528+
operation: str = Field(default='start-application', description='Operation performed')
529+
530+
531+
class StopApplicationResponse(CallToolResult):
532+
"""Response model for stop EMR Serverless application operation."""
533+
534+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
535+
content: List[Content] = Field(..., description='Content of the response')
536+
application_id: str = Field(..., description='ID of the stopped application')
537+
operation: str = Field(default='stop-application', description='Operation performed')
538+
539+
540+
class StartJobRunResponse(CallToolResult):
541+
"""Response model for start EMR Serverless job run operation."""
542+
543+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
544+
content: List[Content] = Field(..., description='Content of the response')
545+
application_id: str = Field(..., description='ID of the application')
546+
job_run_id: str = Field(..., description='ID of the started job run')
547+
arn: str = Field(..., description='ARN of the job run')
548+
operation: str = Field(default='start-job-run', description='Operation performed')
549+
550+
551+
class GetJobRunResponse(CallToolResult):
552+
"""Response model for get EMR Serverless job run operation."""
553+
554+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
555+
content: List[Content] = Field(..., description='Content of the response')
556+
job_run: Dict[str, Any] = Field(..., description='Job run details')
557+
operation: str = Field(default='get-job-run', description='Operation performed')
558+
559+
560+
class CancelJobRunResponse(CallToolResult):
561+
"""Response model for cancel EMR Serverless job run operation."""
562+
563+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
564+
content: List[Content] = Field(..., description='Content of the response')
565+
application_id: str = Field(..., description='ID of the application')
566+
job_run_id: str = Field(..., description='ID of the cancelled job run')
567+
operation: str = Field(default='cancel-job-run', description='Operation performed')
568+
569+
570+
class ListJobRunsResponse(CallToolResult):
571+
"""Response model for list EMR Serverless job runs operation."""
572+
573+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
574+
content: List[Content] = Field(..., description='Content of the response')
575+
job_runs: List[Dict[str, Any]] = Field(..., description='List of job runs')
576+
count: int = Field(..., description='Number of job runs found')
577+
next_token: Optional[str] = Field(None, description='Token for pagination')
578+
operation: str = Field(default='list-job-runs', description='Operation performed')
579+
580+
581+
class GetDashboardForJobRunResponse(CallToolResult):
582+
"""Response model for get dashboard for EMR Serverless job run operation."""
583+
584+
isError: bool = Field(default=False, description='Whether the operation resulted in an error')
585+
content: List[Content] = Field(..., description='Content of the response')
586+
url: str = Field(..., description='Dashboard URL for the job run')
587+
operation: str = Field(default='get-dashboard-for-job-run', description='Operation performed')

src/aws-dataprocessing-mcp-server/awslabs/aws_dataprocessing_mcp_server/server.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@
4545
from awslabs.aws_dataprocessing_mcp_server.handlers.emr.emr_ec2_steps_handler import (
4646
EMREc2StepsHandler,
4747
)
48+
from awslabs.aws_dataprocessing_mcp_server.handlers.emr.emr_serverless_application_handler import (
49+
EMRServerlessApplicationHandler,
50+
)
51+
from awslabs.aws_dataprocessing_mcp_server.handlers.emr.emr_serverless_job_run_handler import (
52+
EMRServerlessJobRunHandler,
53+
)
4854
from awslabs.aws_dataprocessing_mcp_server.handlers.glue.crawler_handler import (
4955
CrawlerHandler,
5056
)
@@ -372,6 +378,19 @@ def main():
372378
allow_write=allow_write,
373379
allow_sensitive_data_access=allow_sensitive_data_access,
374380
)
381+
382+
EMRServerlessApplicationHandler(
383+
mcp,
384+
allow_write=allow_write,
385+
allow_sensitive_data_access=allow_sensitive_data_access,
386+
)
387+
388+
EMRServerlessJobRunHandler(
389+
mcp,
390+
allow_write=allow_write,
391+
allow_sensitive_data_access=allow_sensitive_data_access,
392+
)
393+
375394
CommonResourceHandler(mcp, allow_write=allow_write)
376395

377396
# Run server

src/aws-dataprocessing-mcp-server/awslabs/aws_dataprocessing_mcp_server/utils/aws_helper.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
CUSTOM_TAGS_ENV_VAR,
2121
DEFAULT_RESOURCE_TAGS,
2222
EMR_CLUSTER_RESOURCE_TYPE,
23+
EMR_SERVERLESS_APPLICATION_RESOURCE_TYPE,
2324
MCP_CREATION_TIME_TAG_KEY,
2425
MCP_MANAGED_TAG_KEY,
2526
MCP_MANAGED_TAG_VALUE,
@@ -425,3 +426,64 @@ def verify_athena_data_catalog_managed_by_mcp(
425426
except Exception as e:
426427
result['error_message'] = f'Error getting data catalog: {str(e)}'
427428
return result
429+
430+
@classmethod
431+
def verify_emr_serverless_application_managed_by_mcp(
432+
cls,
433+
emr_serverless_client: Any,
434+
application_id: str,
435+
expected_resource_type: str = EMR_SERVERLESS_APPLICATION_RESOURCE_TYPE,
436+
) -> Dict[str, Any]:
437+
"""Verify if an EMR Serverless application is managed by the MCP server and has the expected resource type.
438+
439+
This method checks if the EMR Serverless application has the MCP managed tag and the correct resource type tag.
440+
441+
Args:
442+
emr_serverless_client: EMR Serverless boto3 client
443+
application_id: ID of the EMR Serverless application to verify
444+
expected_resource_type: The expected resource type value (default: EMR_SERVERLESS_APPLICATION_RESOURCE_TYPE)
445+
446+
Returns:
447+
Dictionary with verification result:
448+
- is_valid: True if verification passed, False otherwise
449+
- error_message: Error message if verification failed, None otherwise
450+
"""
451+
# If custom tags are enabled, skip verification
452+
if cls.is_custom_tags_enabled():
453+
return {'is_valid': True, 'error_message': None}
454+
455+
result = {'is_valid': False, 'error_message': None}
456+
457+
try:
458+
response = emr_serverless_client.get_application(applicationId=application_id)
459+
tags_dict = response.get('application', {}).get('tags', {})
460+
461+
# Convert tags dictionary to list format for verification
462+
tags_list = [{'Key': key, 'Value': value} for key, value in tags_dict.items()]
463+
464+
# Check if the resource is managed by MCP
465+
if not cls.verify_resource_managed_by_mcp(tags_list):
466+
result['error_message'] = (
467+
f'Application {application_id} is not managed by MCP (missing required tags)'
468+
)
469+
return result
470+
471+
# Check if the resource has the expected resource type
472+
actual_type = tags_dict.get(MCP_RESOURCE_TYPE_TAG_KEY, 'unknown')
473+
if (
474+
actual_type != expected_resource_type
475+
and actual_type != EMR_SERVERLESS_APPLICATION_RESOURCE_TYPE
476+
):
477+
result['error_message'] = (
478+
f'Application {application_id} has incorrect type (expected {expected_resource_type}, got {actual_type})'
479+
)
480+
return result
481+
482+
# All checks passed
483+
result['is_valid'] = True
484+
return result
485+
486+
except ClientError as e:
487+
# If we can't get the application information, return error
488+
result['error_message'] = f'Error retrieving application {application_id}: {str(e)}'
489+
return result

src/aws-dataprocessing-mcp-server/awslabs/aws_dataprocessing_mcp_server/utils/consts.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@
3131
EMR_INSTANCE_FLEET_RESOURCE_TYPE = 'EMRInstanceFleet'
3232
EMR_INSTANCE_GROUP_RESOURCE_TYPE = 'EMRInstanceGroup'
3333
EMR_STEPS_RESOURCE_TYPE = 'EMRSteps'
34+
EMR_SERVERLESS_APPLICATION_RESOURCE_TYPE = 'EMRServerlessApplication'
35+
EMR_SERVERLESS_JOB_RUN_RESOURCE_TYPE = 'EMRServerlessJobRun'

0 commit comments

Comments
 (0)