Skip to content
59 changes: 59 additions & 0 deletions examples/servers/everything-server/mcp_everything_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,65 @@ async def test_elicitation_sep1034_defaults(ctx: Context[ServerSession, None]) -
return f"Elicitation not supported or error: {str(e)}"


class EnumSchemasTestSchema(BaseModel):
"""Schema for testing enum schema variations (SEP-1330)"""

untitledSingle: str = Field(
description="Simple enum without titles", json_schema_extra={"enum": ["active", "inactive", "pending"]}
)
titledSingle: str = Field(
description="Enum with titled options (oneOf)",
json_schema_extra={
"oneOf": [
{"const": "low", "title": "Low Priority"},
{"const": "medium", "title": "Medium Priority"},
{"const": "high", "title": "High Priority"},
]
},
)
untitledMulti: list[str] = Field(
description="Multi-select without titles",
json_schema_extra={"items": {"type": "string", "enum": ["read", "write", "execute"]}},
)
titledMulti: list[str] = Field(
description="Multi-select with titled options",
json_schema_extra={
"items": {
"anyOf": [
{"const": "feature", "title": "New Feature"},
{"const": "bug", "title": "Bug Fix"},
{"const": "docs", "title": "Documentation"},
]
}
},
)
legacyEnum: str = Field(
description="Legacy enum with enumNames",
json_schema_extra={
"enum": ["small", "medium", "large"],
"enumNames": ["Small Size", "Medium Size", "Large Size"],
},
)


@mcp.tool()
async def test_elicitation_sep1330_enums(ctx: Context[ServerSession, None]) -> str:
"""Tests elicitation with enum schema variations per SEP-1330"""
try:
result = await ctx.elicit(
message="Please select values using different enum schema types", schema=EnumSchemasTestSchema
)

if result.action == "accept":
content = result.data.model_dump_json()
else:
content = "{}"

return f"Elicitation completed: action={result.action}, content={content}"
except Exception as e:
return f"Elicitation not supported or error: {str(e)}"


@mcp.tool()
def test_error_handling() -> str:
"""Tests error response handling"""
Expand Down
44 changes: 32 additions & 12 deletions src/mcp/server/elicitation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from __future__ import annotations

import types
from collections.abc import Sequence
from typing import Generic, Literal, TypeVar, Union, get_args, get_origin

from pydantic import BaseModel
from pydantic.fields import FieldInfo

from mcp.server.session import ServerSession
from mcp.types import RequestId
Expand Down Expand Up @@ -43,22 +43,40 @@ class CancelledElicitation(BaseModel):
def _validate_elicitation_schema(schema: type[BaseModel]) -> None:
"""Validate that a Pydantic model only contains primitive field types."""
for field_name, field_info in schema.model_fields.items():
if not _is_primitive_field(field_info):
annotation = field_info.annotation

if annotation is None or annotation is types.NoneType: # pragma: no cover
continue
elif _is_primitive_field(annotation):
continue
elif _is_string_sequence(annotation):
continue
else:
raise TypeError(
f"Elicitation schema field '{field_name}' must be a primitive type "
f"{_ELICITATION_PRIMITIVE_TYPES} or Optional of these types. "
f"Complex types like lists, dicts, or nested models are not allowed."
f"{_ELICITATION_PRIMITIVE_TYPES}, a sequence of strings (list[str], etc.), "
f"or Optional of these types. Nested models and complex types are not allowed."
)


def _is_primitive_field(field_info: FieldInfo) -> bool:
"""Check if a field is a primitive type allowed in elicitation schemas."""
annotation = field_info.annotation
def _is_string_sequence(annotation: type) -> bool:
"""Check if annotation is a sequence of strings (list[str], Sequence[str], etc)."""
origin = get_origin(annotation)
# Check if it's a sequence-like type with str elements
if origin:
try:
if issubclass(origin, Sequence):
args = get_args(annotation)
# Should have single str type arg
return len(args) == 1 and args[0] is str
except TypeError: # pragma: no cover
# origin is not a class, so it can't be a subclass of Sequence
pass
return False

# Handle None type
if annotation is types.NoneType: # pragma: no cover
return True

def _is_primitive_field(annotation: type) -> bool:
"""Check if a field is a primitive type allowed in elicitation schemas."""
# Handle basic primitive types
if annotation in _ELICITATION_PRIMITIVE_TYPES:
return True
Expand All @@ -67,8 +85,10 @@ def _is_primitive_field(field_info: FieldInfo) -> bool:
origin = get_origin(annotation)
if origin is Union or origin is types.UnionType:
args = get_args(annotation)
# All args must be primitive types or None
return all(arg is types.NoneType or arg in _ELICITATION_PRIMITIVE_TYPES for arg in args)
# All args must be primitive types, None, or string sequences
return all(
arg is types.NoneType or arg in _ELICITATION_PRIMITIVE_TYPES or _is_string_sequence(arg) for arg in args
)

return False

Expand Down
2 changes: 1 addition & 1 deletion src/mcp/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1468,7 +1468,7 @@ class ElicitResult(Result):
- "cancel": User dismissed without making an explicit choice
"""

content: dict[str, str | int | float | bool | None] | None = None
content: dict[str, str | int | float | bool | list[str] | None] | None = None
"""
The submitted form data, only present when action is "accept".
Contains values matching the requested schema.
Expand Down
133 changes: 130 additions & 3 deletions tests/server/fastmcp/test_elicitation.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def tool(ctx: Context[ServerSession, None]) -> str: # pragma: no cover

# Test cases for invalid schemas
class InvalidListSchema(BaseModel):
names: list[str] = Field(description="List of names")
numbers: list[int] = Field(description="List of numbers")

class NestedModel(BaseModel):
value: str
Expand All @@ -139,7 +139,7 @@ async def elicitation_callback(
await client_session.initialize()

# Test both invalid schemas
for tool_name, field_name in [("invalid_list", "names"), ("nested_model", "nested")]:
for tool_name, field_name in [("invalid_list", "numbers"), ("nested_model", "nested")]:
result = await client_session.call_tool(tool_name, {})
assert len(result.content) == 1
assert isinstance(result.content[0], TextContent)
Expand Down Expand Up @@ -197,7 +197,7 @@ async def callback(context: RequestContext[ClientSession, None], params: ElicitR
# Test invalid optional field
class InvalidOptionalSchema(BaseModel):
name: str = Field(description="Name")
optional_list: list[str] | None = Field(default=None, description="Invalid optional list")
optional_list: list[int] | None = Field(default=None, description="Invalid optional list")

@mcp.tool(description="Tool with invalid optional field")
async def invalid_optional_tool(ctx: Context[ServerSession, None]) -> str: # pragma: no cover
Expand All @@ -220,6 +220,47 @@ async def elicitation_callback(
text_contains=["Validation failed:", "optional_list"],
)

# Test valid list[str] for multi-select enum
class ValidMultiSelectSchema(BaseModel):
name: str = Field(description="Name")
tags: list[str] = Field(description="Tags")

@mcp.tool(description="Tool with valid list[str] field")
async def valid_multiselect_tool(ctx: Context[ServerSession, None]) -> str:
result = await ctx.elicit(message="Please provide tags", schema=ValidMultiSelectSchema)
if result.action == "accept" and result.data:
return f"Name: {result.data.name}, Tags: {', '.join(result.data.tags)}"
return f"User {result.action}" # pragma: no cover

async def multiselect_callback(context: RequestContext[ClientSession, Any], params: ElicitRequestParams):
if "Please provide tags" in params.message:
return ElicitResult(action="accept", content={"name": "Test", "tags": ["tag1", "tag2"]})
return ElicitResult(action="decline") # pragma: no cover

await call_tool_and_assert(mcp, multiselect_callback, "valid_multiselect_tool", {}, "Name: Test, Tags: tag1, tag2")

# Test Optional[list[str]] for optional multi-select enum
class OptionalMultiSelectSchema(BaseModel):
name: str = Field(description="Name")
tags: list[str] | None = Field(default=None, description="Optional tags")

@mcp.tool(description="Tool with optional list[str] field")
async def optional_multiselect_tool(ctx: Context[ServerSession, None]) -> str:
result = await ctx.elicit(message="Please provide optional tags", schema=OptionalMultiSelectSchema)
if result.action == "accept" and result.data:
tags_str = ", ".join(result.data.tags) if result.data.tags else "none"
return f"Name: {result.data.name}, Tags: {tags_str}"
return f"User {result.action}" # pragma: no cover

async def optional_multiselect_callback(context: RequestContext[ClientSession, Any], params: ElicitRequestParams):
if "Please provide optional tags" in params.message:
return ElicitResult(action="accept", content={"name": "Test", "tags": ["tag1", "tag2"]})
return ElicitResult(action="decline") # pragma: no cover

await call_tool_and_assert(
mcp, optional_multiselect_callback, "optional_multiselect_tool", {}, "Name: Test, Tags: tag1, tag2"
)


@pytest.mark.anyio
async def test_elicitation_with_default_values():
Expand Down Expand Up @@ -274,3 +315,89 @@ async def callback_override(context: RequestContext[ClientSession, None], params
await call_tool_and_assert(
mcp, callback_override, "defaults_tool", {}, "Name: John, Age: 25, Subscribe: False, Email: [email protected]"
)


@pytest.mark.anyio
async def test_elicitation_with_enum_titles():
"""Test elicitation with enum schemas using oneOf/anyOf for titles."""
mcp = FastMCP(name="ColorPreferencesApp")

# Test single-select with titles using oneOf
class FavoriteColorSchema(BaseModel):
user_name: str = Field(description="Your name")
favorite_color: str = Field(
description="Select your favorite color",
json_schema_extra={
"oneOf": [
{"const": "red", "title": "Red"},
{"const": "green", "title": "Green"},
{"const": "blue", "title": "Blue"},
{"const": "yellow", "title": "Yellow"},
]
},
)

@mcp.tool(description="Single color selection")
async def select_favorite_color(ctx: Context[ServerSession, None]) -> str:
result = await ctx.elicit(message="Select your favorite color", schema=FavoriteColorSchema)
if result.action == "accept" and result.data:
return f"User: {result.data.user_name}, Favorite: {result.data.favorite_color}"
return f"User {result.action}" # pragma: no cover

# Test multi-select with titles using anyOf
class FavoriteColorsSchema(BaseModel):
user_name: str = Field(description="Your name")
favorite_colors: list[str] = Field(
description="Select your favorite colors",
json_schema_extra={
"items": {
"anyOf": [
{"const": "red", "title": "Red"},
{"const": "green", "title": "Green"},
{"const": "blue", "title": "Blue"},
{"const": "yellow", "title": "Yellow"},
]
}
},
)

@mcp.tool(description="Multiple color selection")
async def select_favorite_colors(ctx: Context[ServerSession, None]) -> str:
result = await ctx.elicit(message="Select your favorite colors", schema=FavoriteColorsSchema)
if result.action == "accept" and result.data:
return f"User: {result.data.user_name}, Colors: {', '.join(result.data.favorite_colors)}"
return f"User {result.action}" # pragma: no cover

# Test legacy enumNames format
class LegacyColorSchema(BaseModel):
user_name: str = Field(description="Your name")
color: str = Field(
description="Select a color",
json_schema_extra={"enum": ["red", "green", "blue"], "enumNames": ["Red", "Green", "Blue"]},
)

@mcp.tool(description="Legacy enum format")
async def select_color_legacy(ctx: Context[ServerSession, None]) -> str:
result = await ctx.elicit(message="Select a color (legacy format)", schema=LegacyColorSchema)
if result.action == "accept" and result.data:
return f"User: {result.data.user_name}, Color: {result.data.color}"
return f"User {result.action}" # pragma: no cover

async def enum_callback(context: RequestContext[ClientSession, Any], params: ElicitRequestParams):
if "colors" in params.message and "legacy" not in params.message:
return ElicitResult(action="accept", content={"user_name": "Bob", "favorite_colors": ["red", "green"]})
elif "color" in params.message:
if "legacy" in params.message:
return ElicitResult(action="accept", content={"user_name": "Charlie", "color": "green"})
else:
return ElicitResult(action="accept", content={"user_name": "Alice", "favorite_color": "blue"})
return ElicitResult(action="decline") # pragma: no cover

# Test single-select with titles
await call_tool_and_assert(mcp, enum_callback, "select_favorite_color", {}, "User: Alice, Favorite: blue")

# Test multi-select with titles
await call_tool_and_assert(mcp, enum_callback, "select_favorite_colors", {}, "User: Bob, Colors: red, green")

# Test legacy enumNames format
await call_tool_and_assert(mcp, enum_callback, "select_color_legacy", {}, "User: Charlie, Color: green")