Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/ruby_llm/mcp/adapters/ruby_llm_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def initialize(client, transport_type:, config: {})
:roots_list_change_notification,
:ping_response, :roots_list_response,
:sampling_create_message_response,
:error_response, :elicitation_response
:error_response, :elicitation_response,
:register_in_flight_request, :unregister_in_flight_request,
:cancel_in_flight_request

# Handle resource registration in adapter (public API concern)
def register_resource(resource)
Expand Down
4 changes: 3 additions & 1 deletion lib/ruby_llm/mcp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def initialize(name:, transport_type:, sdk: nil, adapter: nil, start: true, # ru
@adapter.start if start
end

def_delegators :@adapter, :alive?, :capabilities, :ping, :client_capabilities
def_delegators :@adapter, :alive?, :capabilities, :ping, :client_capabilities,
:register_in_flight_request, :unregister_in_flight_request,
:cancel_in_flight_request

def start
@adapter.start
Expand Down
1 change: 1 addition & 0 deletions lib/ruby_llm/mcp/elicitation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def initialize(coordinator, result)

def execute
success = @coordinator.elicitation_callback&.call(self)

if success
valid = validate_response
if valid
Expand Down
9 changes: 9 additions & 0 deletions lib/ruby_llm/mcp/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ class UnsupportedFeature < BaseError; end
class UnsupportedTransport < BaseError; end

class AdapterConfigurationError < BaseError; end

class RequestCancelled < BaseError
attr_reader :request_id

def initialize(message:, request_id:)
@request_id = request_id
super(message: message)
end
end
end
end
end
57 changes: 57 additions & 0 deletions lib/ruby_llm/mcp/native/cancellable_operation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# frozen_string_literal: true

module RubyLLM
module MCP
module Native
# Wraps server-initiated requests to support cancellation
# Executes the request in a separate thread that can be terminated on cancellation
class CancellableOperation
attr_reader :request_id, :thread

def initialize(request_id)
@request_id = request_id
@cancelled = false
@mutex = Mutex.new
@thread = nil
@result = nil
@error = nil
end

def cancelled?
@mutex.synchronize { @cancelled }
end

def cancel
@mutex.synchronize { @cancelled = true }
if @thread&.alive?
@thread.raise(Errors::RequestCancelled.new(
message: "Request #{@request_id} was cancelled",
request_id: @request_id
))
end
end

# Execute a block in a separate thread
# This allows the thread to be terminated if cancellation is requested
# Returns the result of the block or re-raises any error that occurred
def execute(&)
@thread = Thread.new do
Thread.current.abort_on_exception = false
begin
@result = yield
rescue Errors::RequestCancelled, StandardError => e
@error = e
end
end

@thread.join
raise @error if @error && !@error.is_a?(Errors::RequestCancelled)

@result
ensure
@thread = nil
end
end
end
end
end
39 changes: 39 additions & 0 deletions lib/ruby_llm/mcp/native/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def initialize( # rubocop:disable Metrics/ParameterLists

@transport = nil
@capabilities = nil

# Track in-flight server-initiated requests for cancellation
@in_flight_requests = {}
@in_flight_mutex = Mutex.new
end

def request(body, **options)
Expand Down Expand Up @@ -316,6 +320,41 @@ def sampling_callback_enabled?
def transport
@transport ||= Native::Transport.new(@transport_type, self, config: @config)
end

# Register a server-initiated request that can be cancelled
# @param request_id [String] The ID of the request
# @param cancellable_operation [CancellableOperation, nil] The operation that can be cancelled
def register_in_flight_request(request_id, cancellable_operation = nil)
@in_flight_mutex.synchronize do
@in_flight_requests[request_id.to_s] = cancellable_operation
end
end

# Unregister a completed or cancelled request
# @param request_id [String] The ID of the request
def unregister_in_flight_request(request_id)
@in_flight_mutex.synchronize do
@in_flight_requests.delete(request_id.to_s)
end
end

# Cancel an in-flight server-initiated request
# @param request_id [String] The ID of the request to cancel
# @return [Boolean] true if the request was found and cancelled, false otherwise
def cancel_in_flight_request(request_id) # rubocop:disable Naming/PredicateMethod
operation = nil
@in_flight_mutex.synchronize do
operation = @in_flight_requests[request_id.to_s]
end

if operation.respond_to?(:cancel)
operation.cancel
true
else
RubyLLM::MCP.logger.warn("Request #{request_id} cannot be cancelled or was already completed")
false
end
end
end
end
end
Expand Down
48 changes: 31 additions & 17 deletions lib/ruby_llm/mcp/native/response_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,38 @@ def initialize(coordinator)
@coordinator = coordinator
end

def execute(result) # rubocop:disable Naming/PredicateMethod
if result.ping?
coordinator.ping_response(id: result.id)
true
elsif result.roots?
handle_roots_response(result)
true
elsif result.sampling?
handle_sampling_response(result)
true
elsif result.elicitation?
handle_elicitation_response(result)
def execute(result)
operation = CancellableOperation.new(result.id)
coordinator.register_in_flight_request(result.id, operation)

begin
# Execute in a separate thread that can be terminated on cancellation
operation.execute do
if result.ping?
coordinator.ping_response(id: result.id)
true
elsif result.roots?
handle_roots_response(result)
true
elsif result.sampling?
handle_sampling_response(result)
true
elsif result.elicitation?
handle_elicitation_response(result)
true
else
handle_unknown_request(result)
RubyLLM::MCP.logger.error("MCP client was sent unknown method type and \
could not respond: #{result.inspect}.")
false
end
end
rescue Errors::RequestCancelled => e
RubyLLM::MCP.logger.info("Request #{result.id} was cancelled: #{e.message}")
# Don't send response - cancellation means result is unused
true
else
handle_unknown_request(result)
RubyLLM::MCP.logger.error("MCP client was sent unknown method type and \
could not respond: #{result.inspect}.")
false
ensure
coordinator.unregister_in_flight_request(result.id)
end
end

Expand Down
19 changes: 18 additions & 1 deletion lib/ruby_llm/mcp/notification_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def execute(notification)
when "notifications/progress"
process_progress_message(notification)
when "notifications/cancelled"
# TODO: - do nothing at the moment until we support client operations
process_cancelled_notification(notification)
else
process_unknown_notification(notification)
end
Expand Down Expand Up @@ -74,6 +74,23 @@ def default_process_logging_message(notification, logger: RubyLLM::MCP.logger)
end
end

def process_cancelled_notification(notification)
request_id = notification.params["requestId"]
reason = notification.params["reason"] || "No reason provided"

RubyLLM::MCP.logger.info(
"Received cancellation for request #{request_id}: #{reason}"
)

success = client.cancel_in_flight_request(request_id)

unless success
RubyLLM::MCP.logger.debug(
"Request #{request_id} was not found or already completed"
)
end
end

def process_unknown_notification(notification)
message = "Unknown notification type: #{notification.type} params: #{notification.params.to_h}"
RubyLLM::MCP.logger.error(message)
Expand Down
53 changes: 53 additions & 0 deletions spec/fixtures/typescript-mcp/src/tools/client-interaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,57 @@ export function setupClientInteractionTools(server: McpServer) {
};
}
});

server.tool(
"sample_with_cancellation",
"Test cancellation by initiating a slow sampling request that can be cancelled",
{},
async ({}) => {
try {
// Start a sampling request that will take time
// The client should have a slow sampling callback configured
// The test will send a cancellation notification while this is in-flight
const result = await server.server.createMessage({
messages: [
{
role: "user" as const,
content: {
type: "text" as const,
text: "This request should be cancelled by the client",
},
},
],
model: "gpt-4o",
modelPreferences: {
hints: [{ name: "gpt-4o" }],
},
systemPrompt: "You are a helpful assistant.",
maxTokens: 100,
});

// If we get here, the request completed (wasn't cancelled)
return {
content: [
{
type: "text" as const,
text: `Cancellation test FAILED: Request completed when it should have been cancelled. Result: ${JSON.stringify(
result
)}`,
},
],
isError: true,
};
} catch (error: any) {
// An error is expected if cancellation worked
return {
content: [
{
type: "text" as const,
text: `Cancellation test PASSED: Request was cancelled (${error.message})`,
},
],
};
}
}
);
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading