From bbde543731394fbac7ef6845aaa851e2ad3719bc Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Thu, 10 Jul 2025 03:04:07 +0900 Subject: [PATCH 1/5] test(a2a/client/auth/credentials.py): add and refactor tests in test_auth_middleware.py to improve coverage Signed-off-by: Shingo OKAWA --- tests/client/test_auth_middleware.py | 415 ++++++++++----------------- 1 file changed, 148 insertions(+), 267 deletions(-) diff --git a/tests/client/test_auth_middleware.py b/tests/client/test_auth_middleware.py index 94c0abde..ec7394e9 100644 --- a/tests/client/test_auth_middleware.py +++ b/tests/client/test_auth_middleware.py @@ -1,3 +1,5 @@ +from collections.abc import Callable +from dataclasses import dataclass from typing import Any import httpx @@ -20,11 +22,13 @@ Role, SecurityScheme, SendMessageRequest, + SendMessageSuccessResponse, ) -# A simple mock interceptor for testing basic middleware functionality class HeaderInterceptor(ClientCallInterceptor): + """A simple mock interceptor for testing basic middleware functionality.""" + def __init__(self, header_name: str, header_value: str): self.header_name = header_name self.header_value = header_value @@ -43,72 +47,71 @@ async def intercept( return request_payload, http_kwargs -@pytest.mark.asyncio -@respx.mock -async def test_client_with_simple_interceptor(): - """ - Tests that a basic interceptor is called and successfully - modifies the outgoing request headers. - """ - # Arrange - test_url = 'http://fake-agent.com/rpc' - header_interceptor = HeaderInterceptor('X-Test-Header', 'Test-Value-123') - async with httpx.AsyncClient() as http_client: - client = A2AClient( - httpx_client=http_client, - url=test_url, - interceptors=[header_interceptor], - ) +def build_success_response() -> dict: + """Creates a valid JSON-RPC success response as dict.""" + return SendMessageSuccessResponse( + id='1', + jsonrpc='2.0', + result=Message( + kind='message', + messageId='message-id', + role=Role.agent, + parts=[], + ), + ).model_dump(mode='json') + + +def build_send_message_request() -> SendMessageRequest: + """Builds a minimal SendMessageRequest.""" + return SendMessageRequest( + id='1', + params=MessageSendParams( + message=Message( + messageId='msg1', + role=Role.user, + parts=[], + ) + ), + ) - # Mock the HTTP response with a minimal valid success response - minimal_success_response = { - 'jsonrpc': '2.0', - 'id': '1', - 'result': { - 'kind': 'message', - 'messageId': 'response-msg', - 'role': 'agent', - 'parts': [], - }, - } - respx.post(test_url).mock( - return_value=httpx.Response(200, json=minimal_success_response) - ) - # Act - await client.send_message( - request=SendMessageRequest( - id='1', - params=MessageSendParams( - message=Message( - messageId='msg1', - role=Role.user, - parts=[], - ) - ), - ) +async def send_message( + client: A2AClient, + url: str, + session_id: str | None = None, +) -> httpx.Request: + """Mocks the response and sends a message using the client.""" + respx.post(url).mock( + return_value=httpx.Response( + 200, + json=build_success_response(), ) + ) + context = ClientCallContext( + state={'sessionId': session_id} if session_id else {} + ) + await client.send_message( + request=build_send_message_request(), + context=context, + ) + return respx.calls.last.request - # Assert - assert len(respx.calls) == 1 - request = respx.calls.last.request - assert 'x-test-header' in request.headers - assert request.headers['x-test-header'] == 'Test-Value-123' + +@pytest.fixture +def store(): + store = InMemoryContextCredentialStore() + yield store @pytest.mark.asyncio -async def test_in_memory_context_credential_store(): +async def test_in_memory_context_credential_store(store): """ - Tests the functionality of the InMemoryContextCredentialStore to ensure - it correctly stores and retrieves credentials based on sessionId. + Verifies that InMemoryContextCredentialStore correctly stores and retrieves + credentials based on the session ID in the client context. """ - # Arrange - store = InMemoryContextCredentialStore() - session_id = 'test-session-123' + session_id = 'session-id' scheme_name = 'test-scheme' credential = 'test-token' - - # Act await store.set_credentials(session_id, scheme_name, credential) # Assert: Successful retrieval @@ -134,211 +137,119 @@ async def test_in_memory_context_credential_store(): ) assert retrieved_credential_empty is None + # Assert: Overwrite the credential when session_id already exists + new_credential = 'new-token' + await store.set_credentials(session_id, scheme_name, new_credential) + assert await store.get_credentials(scheme_name, context) == new_credential + @pytest.mark.asyncio @respx.mock -async def test_auth_interceptor_with_api_key(): +async def test_client_with_simple_interceptor(): """ - Tests the authentication flow with an API key in the header. + Ensures that a custom HeaderInterceptor correctly injects a static header + into outbound HTTP requests from the A2AClient. """ - # Arrange - test_url = 'http://apikey-agent.com/rpc' - session_id = 'user-session-2' - scheme_name = 'apiKeyAuth' - api_key = 'secret-api-key' - - cred_store = InMemoryContextCredentialStore() - await cred_store.set_credentials(session_id, scheme_name, api_key) - - auth_interceptor = AuthInterceptor(credential_service=cred_store) - - api_key_scheme_params = { - 'type': 'apiKey', - 'name': 'X-API-Key', - 'in': In.header, - } - - agent_card = AgentCard( - url=test_url, - name='ApiKeyBot', - description='A bot that requires an API Key', - version='1.0', - defaultInputModes=[], - defaultOutputModes=[], - skills=[], - capabilities=AgentCapabilities(), - security=[{scheme_name: []}], - securitySchemes={ - scheme_name: SecurityScheme( - root=APIKeySecurityScheme(**api_key_scheme_params) - ) - }, - ) + url = 'http://agent.com/rpc' + interceptor = HeaderInterceptor('X-Test-Header', 'Test-Value-123') async with httpx.AsyncClient() as http_client: client = A2AClient( - httpx_client=http_client, - agent_card=agent_card, - interceptors=[auth_interceptor], + httpx_client=http_client, url=url, interceptors=[interceptor] ) + request = await send_message(client, url) + assert request.headers['x-test-header'] == 'Test-Value-123' - minimal_success_response = { - 'jsonrpc': '2.0', - 'id': '1', - 'result': { - 'kind': 'message', - 'messageId': 'response-msg', - 'role': 'agent', - 'parts': [], - }, - } - respx.post(test_url).mock( - return_value=httpx.Response(200, json=minimal_success_response) - ) - - # Act - context = ClientCallContext(state={'sessionId': session_id}) - await client.send_message( - request=SendMessageRequest( - id='1', - params=MessageSendParams( - message=Message( - messageId='msg1', - role=Role.user, - parts=[], - ) - ), - ), - context=context, - ) - - # Assert - assert len(respx.calls) == 1 - request = respx.calls.last.request - assert 'x-api-key' in request.headers - assert request.headers['x-api-key'] == api_key - - -@pytest.mark.asyncio -@respx.mock -async def test_auth_interceptor_with_oauth2_scheme(): - """ - Tests the AuthInterceptor with an OAuth2 security scheme defined in AgentCard. - Ensures it correctly sets the Authorization: Bearer header. - """ - test_url = 'http://oauth-agent.com/rpc' - session_id = 'user-session-oauth' - scheme_name = 'myOAuthScheme' - access_token = 'secret-oauth-access-token' - cred_store = InMemoryContextCredentialStore() - await cred_store.set_credentials(session_id, scheme_name, access_token) +@dataclass +class AuthTestCase: + url: str + session_id: str + scheme_name: str + credential: str + security_scheme: Any + expected_header_key: str + expected_header_value_func: Callable[[str], str] + + +api_key_test_case = AuthTestCase( + url='http://agent.com/rpc', + session_id='session-id', + scheme_name='apikey', + credential='secret-api-key', + security_scheme=APIKeySecurityScheme( + **{ + 'type': 'apiKey', + 'name': 'X-API-Key', + 'in': In.header, + } + ), + expected_header_key='x-api-key', + expected_header_value_func=lambda c: c, +) - auth_interceptor = AuthInterceptor(credential_service=cred_store) - oauth_flows = OAuthFlows( - authorizationCode=AuthorizationCodeOAuthFlow( - authorizationUrl='http://provider.com/auth', - tokenUrl='http://provider.com/token', - scopes={'read': 'Read scope'}, - ) - ) - - agent_card = AgentCard( - url=test_url, - name='OAuthBot', - description='A bot that uses OAuth2', - version='1.0', - defaultInputModes=[], - defaultOutputModes=[], - skills=[], - capabilities=AgentCapabilities(), - security=[{scheme_name: ['read']}], - securitySchemes={ - scheme_name: SecurityScheme( - root=OAuth2SecurityScheme(type='oauth2', flows=oauth_flows) +oauth2_test_case = AuthTestCase( + url='http://agent.com/rpc', + session_id='session-id', + scheme_name='oauth2', + credential='secret-oauth-access-token', + security_scheme=OAuth2SecurityScheme( + type='oauth2', + flows=OAuthFlows( + authorizationCode=AuthorizationCodeOAuthFlow( + authorizationUrl='http://provider.com/auth', + tokenUrl='http://provider.com/token', + scopes={'read': 'Read scope'}, ) - }, - ) - - async with httpx.AsyncClient() as http_client: - client = A2AClient( - httpx_client=http_client, - agent_card=agent_card, - interceptors=[auth_interceptor], - ) - - minimal_success_response = { - 'jsonrpc': '2.0', - 'id': 'oauth_test_1', - 'result': { - 'kind': 'message', - 'messageId': 'response-msg-oauth', - 'role': 'agent', - 'parts': [], - }, - } - respx.post(test_url).mock( - return_value=httpx.Response(200, json=minimal_success_response) - ) + ), + ), + expected_header_key='Authorization', + expected_header_value_func=lambda c: f'Bearer {c}', +) - # Act - context = ClientCallContext(state={'sessionId': session_id}) - await client.send_message( - request=SendMessageRequest( - id='oauth_test_1', - params=MessageSendParams( - message=Message( - messageId='msg-oauth', - role=Role.user, - parts=[], - ) - ), - ), - context=context, - ) - # Assert - assert len(respx.calls) == 1 - request_sent = respx.calls.last.request - assert 'Authorization' in request_sent.headers - assert request_sent.headers['Authorization'] == f'Bearer {access_token}' +oidc_test_case = AuthTestCase( + url='http://agent.com/rpc', + session_id='session-id', + scheme_name='oidc', + credential='secret-oidc-id-token', + security_scheme=OpenIdConnectSecurityScheme( + type='openIdConnect', + openIdConnectUrl='http://provider.com/.well-known/openid-configuration', + ), + expected_header_key='Authorization', + expected_header_value_func=lambda c: f'Bearer {c}', +) @pytest.mark.asyncio +@pytest.mark.parametrize( + 'test_case', [api_key_test_case, oauth2_test_case, oidc_test_case] +) @respx.mock -async def test_auth_interceptor_with_oidc_scheme(): +async def test_auth_interceptor_variants(test_case, store): """ - Tests the AuthInterceptor with an OpenIdConnectSecurityScheme. - Ensures it correctly sets the Authorization: Bearer header. + Parametrized test verifying that AuthInterceptor correctly attaches credentials + based on the defined security scheme in the AgentCard. """ - # Arrange - test_url = 'http://oidc-agent.com/rpc' - session_id = 'user-session-oidc' - scheme_name = 'myOidcScheme' - id_token = 'secret-oidc-id-token' - - cred_store = InMemoryContextCredentialStore() - await cred_store.set_credentials(session_id, scheme_name, id_token) - - auth_interceptor = AuthInterceptor(credential_service=cred_store) - + await store.set_credentials( + test_case.session_id, test_case.scheme_name, test_case.credential + ) + auth_interceptor = AuthInterceptor(credential_service=store) agent_card = AgentCard( - url=test_url, - name='OidcBot', - description='A bot that uses OpenID Connect', + url=test_case.url, + name=f'{test_case.scheme_name}bot', + description=f'A bot that uses {test_case.scheme_name}', version='1.0', defaultInputModes=[], defaultOutputModes=[], skills=[], capabilities=AgentCapabilities(), - security=[{scheme_name: []}], + security=[{test_case.scheme_name: []}], securitySchemes={ - scheme_name: SecurityScheme( - root=OpenIdConnectSecurityScheme( - type='openIdConnect', - openIdConnectUrl='http://provider.com/.well-known/openid-configuration', - ) + test_case.scheme_name: SecurityScheme( + root=test_case.security_scheme ) }, ) @@ -349,39 +260,9 @@ async def test_auth_interceptor_with_oidc_scheme(): agent_card=agent_card, interceptors=[auth_interceptor], ) - - minimal_success_response = { - 'jsonrpc': '2.0', - 'id': 'oidc_test_1', - 'result': { - 'kind': 'message', - 'messageId': 'response-msg-oidc', - 'role': 'agent', - 'parts': [], - }, - } - respx.post(test_url).mock( - return_value=httpx.Response(200, json=minimal_success_response) + request = await send_message( + client, test_case.url, test_case.session_id ) - - # Act - context = ClientCallContext(state={'sessionId': session_id}) - await client.send_message( - request=SendMessageRequest( - id='oidc_test_1', - params=MessageSendParams( - message=Message( - messageId='msg-oidc', - role=Role.user, - parts=[], - ) - ), - ), - context=context, - ) - - # Assert - assert len(respx.calls) == 1 - request_sent = respx.calls.last.request - assert 'Authorization' in request_sent.headers - assert request_sent.headers['Authorization'] == f'Bearer {id_token}' + assert request.headers[ + test_case.expected_header_key + ] == test_case.expected_header_value_func(test_case.credential) From e0a0c5debab1f6aabb0df9d422868e04024d20b2 Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Thu, 10 Jul 2025 04:22:27 +0900 Subject: [PATCH 2/5] test(a2a/client/auth/interceptor.py): add tests in test_auth_middleware.py to improve coverage Signed-off-by: Shingo OKAWA --- src/a2a/client/auth/interceptor.py | 4 - tests/client/test_auth_middleware.py | 135 ++++++++++++++++++++++++++- 2 files changed, 130 insertions(+), 9 deletions(-) diff --git a/src/a2a/client/auth/interceptor.py b/src/a2a/client/auth/interceptor.py index a164f135..82919636 100644 --- a/src/a2a/client/auth/interceptor.py +++ b/src/a2a/client/auth/interceptor.py @@ -49,12 +49,8 @@ async def intercept( scheme_def_union = agent_card.securitySchemes.get( scheme_name ) - if not scheme_def_union: - continue scheme_def = scheme_def_union.root - headers = http_kwargs.get('headers', {}) - match scheme_def: # Case 1a: HTTP Bearer scheme with an if guard case HTTPAuthSecurityScheme() if ( diff --git a/tests/client/test_auth_middleware.py b/tests/client/test_auth_middleware.py index ec7394e9..9b241856 100644 --- a/tests/client/test_auth_middleware.py +++ b/tests/client/test_auth_middleware.py @@ -13,6 +13,7 @@ AgentCapabilities, AgentCard, AuthorizationCodeOAuthFlow, + HTTPAuthSecurityScheme, In, Message, MessageSendParams, @@ -103,6 +104,26 @@ def store(): yield store +@pytest.mark.asyncio +async def test_auth_interceptor_skips_when_no_agent_card(store): + """ + Tests that the AuthInterceptor does not modify the request when no AgentCard is provided. + """ + request_payload = {'foo': 'bar'} + http_kwargs = {'fizz': 'buzz'} + auth_interceptor = AuthInterceptor(credential_service=store) + + new_payload, new_kwargs = await auth_interceptor.intercept( + method_name='message/send', + request_payload=request_payload, + http_kwargs=http_kwargs, + agent_card=None, + context=ClientCallContext(state={}), + ) + assert new_payload == request_payload + assert new_kwargs == http_kwargs + + @pytest.mark.asyncio async def test_in_memory_context_credential_store(store): """ @@ -118,25 +139,21 @@ async def test_in_memory_context_credential_store(store): context = ClientCallContext(state={'sessionId': session_id}) retrieved_credential = await store.get_credentials(scheme_name, context) assert retrieved_credential == credential - # Assert: Retrieval with wrong session ID returns None wrong_context = ClientCallContext(state={'sessionId': 'wrong-session'}) retrieved_credential_wrong = await store.get_credentials( scheme_name, wrong_context ) assert retrieved_credential_wrong is None - # Assert: Retrieval with no context returns None retrieved_credential_none = await store.get_credentials(scheme_name, None) assert retrieved_credential_none is None - # Assert: Retrieval with context but no sessionId returns None empty_context = ClientCallContext(state={}) retrieved_credential_empty = await store.get_credentials( scheme_name, empty_context ) assert retrieved_credential_empty is None - # Assert: Overwrite the credential when session_id already exists new_credential = 'new-token' await store.set_credentials(session_id, scheme_name, new_credential) @@ -163,13 +180,24 @@ async def test_client_with_simple_interceptor(): @dataclass class AuthTestCase: + """ + Represents a test scenario for verifying authentication behavior in AuthInterceptor. + """ + url: str + """The endpoint URL of the agent to which the request is sent.""" session_id: str + """The client session ID used to fetch credentials from the credential store.""" scheme_name: str + """The name of the security scheme defined in the agent card.""" credential: str + """The actual credential value (e.g., API key, access token) to be injected.""" security_scheme: Any + """The security scheme object (e.g., APIKeySecurityScheme, OAuth2SecurityScheme, etc.) to define behavior.""" expected_header_key: str + """The expected HTTP header name to be set by the interceptor.""" expected_header_value_func: Callable[[str], str] + """A function that maps the credential to its expected header value (e.g., lambda c: f"Bearer {c}").""" api_key_test_case = AuthTestCase( @@ -223,9 +251,23 @@ class AuthTestCase: ) +bearer_test_case = AuthTestCase( + url='http://agent.com/rpc', + session_id='session-id', + scheme_name='bearer', + credential='bearer-token-123', + security_scheme=HTTPAuthSecurityScheme( + scheme='bearer', + ), + expected_header_key='Authorization', + expected_header_value_func=lambda c: f'Bearer {c}', +) + + @pytest.mark.asyncio @pytest.mark.parametrize( - 'test_case', [api_key_test_case, oauth2_test_case, oidc_test_case] + 'test_case', + [api_key_test_case, oauth2_test_case, oidc_test_case, bearer_test_case], ) @respx.mock async def test_auth_interceptor_variants(test_case, store): @@ -266,3 +308,86 @@ async def test_auth_interceptor_variants(test_case, store): assert request.headers[ test_case.expected_header_key ] == test_case.expected_header_value_func(test_case.credential) + + +@pytest.mark.asyncio +async def test_auth_interceptor_falls_back_on_unsupported_scheme(store): + """ + Tests that AuthInterceptor skips applying headers when the scheme type is unsupported. + This ensures the final return statement is hit. + """ + scheme_name = 'unknown' + session_id = 'session-id' + credential = 'ignored-token' + request_payload = {'foo': 'bar'} + http_kwargs = {'fizz': 'buzz'} + await store.set_credentials(session_id, scheme_name, credential) + auth_interceptor = AuthInterceptor(credential_service=store) + agent_card = AgentCard( + url='http://agent.com/rpc', + name='unknownbot', + description='A bot that uses unsupported scheme', + version='1.0', + defaultInputModes=[], + defaultOutputModes=[], + skills=[], + capabilities=AgentCapabilities(), + security=[{scheme_name: []}], + securitySchemes={ + 'digest': SecurityScheme( + root=HTTPAuthSecurityScheme( + scheme='digest', + type='http', + ), + ), + }, + ) + + new_payload, new_kwargs = await auth_interceptor.intercept( + method_name='message/send', + request_payload=request_payload, + http_kwargs=http_kwargs, + agent_card=agent_card, + context=ClientCallContext(state={'sessionId': session_id}), + ) + assert new_payload == request_payload + assert new_kwargs == http_kwargs + + +@pytest.mark.asyncio +async def test_auth_interceptor_skips_when_scheme_not_in_security_schemes( + store, +): + """ + Tests that AuthInterceptor skips a scheme if it's listed in security requirements + but not defined in securitySchemes. + """ + scheme_name = 'missing' + session_id = 'session-id' + credential = 'dummy-token' + request_payload = {'foo': 'bar'} + http_kwargs = {'fizz': 'buzz'} + await store.set_credentials(session_id, scheme_name, credential) + auth_interceptor = AuthInterceptor(credential_service=store) + agent_card = AgentCard( + url='http://agent.com/rpc', + name='missingbot', + description='A bot that uses missing scheme definition', + version='1.0', + defaultInputModes=[], + defaultOutputModes=[], + skills=[], + capabilities=AgentCapabilities(), + security=[{scheme_name: []}], + securitySchemes={}, + ) + + new_payload, new_kwargs = await auth_interceptor.intercept( + method_name='message/send', + request_payload=request_payload, + http_kwargs=http_kwargs, + agent_card=agent_card, + context=ClientCallContext(state={'sessionId': session_id}), + ) + assert new_payload == request_payload + assert new_kwargs == http_kwargs From 9ddd862ccb7b954a2558a8302d724a1e7a94f940 Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Thu, 10 Jul 2025 04:44:19 +0900 Subject: [PATCH 3/5] fix: lint Signed-off-by: Shingo OKAWA --- src/a2a/client/auth/interceptor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/a2a/client/auth/interceptor.py b/src/a2a/client/auth/interceptor.py index 82919636..a164f135 100644 --- a/src/a2a/client/auth/interceptor.py +++ b/src/a2a/client/auth/interceptor.py @@ -49,8 +49,12 @@ async def intercept( scheme_def_union = agent_card.securitySchemes.get( scheme_name ) + if not scheme_def_union: + continue scheme_def = scheme_def_union.root + headers = http_kwargs.get('headers', {}) + match scheme_def: # Case 1a: HTTP Bearer scheme with an if guard case HTTPAuthSecurityScheme() if ( From a0418d278feef42be20b7e8242f84a03a163aa7a Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Thu, 10 Jul 2025 04:54:49 +0900 Subject: [PATCH 4/5] test: remove duplicated test case Signed-off-by: Shingo OKAWA --- tests/client/test_auth_middleware.py | 44 ---------------------------- 1 file changed, 44 deletions(-) diff --git a/tests/client/test_auth_middleware.py b/tests/client/test_auth_middleware.py index 9b241856..8f31b249 100644 --- a/tests/client/test_auth_middleware.py +++ b/tests/client/test_auth_middleware.py @@ -310,50 +310,6 @@ async def test_auth_interceptor_variants(test_case, store): ] == test_case.expected_header_value_func(test_case.credential) -@pytest.mark.asyncio -async def test_auth_interceptor_falls_back_on_unsupported_scheme(store): - """ - Tests that AuthInterceptor skips applying headers when the scheme type is unsupported. - This ensures the final return statement is hit. - """ - scheme_name = 'unknown' - session_id = 'session-id' - credential = 'ignored-token' - request_payload = {'foo': 'bar'} - http_kwargs = {'fizz': 'buzz'} - await store.set_credentials(session_id, scheme_name, credential) - auth_interceptor = AuthInterceptor(credential_service=store) - agent_card = AgentCard( - url='http://agent.com/rpc', - name='unknownbot', - description='A bot that uses unsupported scheme', - version='1.0', - defaultInputModes=[], - defaultOutputModes=[], - skills=[], - capabilities=AgentCapabilities(), - security=[{scheme_name: []}], - securitySchemes={ - 'digest': SecurityScheme( - root=HTTPAuthSecurityScheme( - scheme='digest', - type='http', - ), - ), - }, - ) - - new_payload, new_kwargs = await auth_interceptor.intercept( - method_name='message/send', - request_payload=request_payload, - http_kwargs=http_kwargs, - agent_card=agent_card, - context=ClientCallContext(state={'sessionId': session_id}), - ) - assert new_payload == request_payload - assert new_kwargs == http_kwargs - - @pytest.mark.asyncio async def test_auth_interceptor_skips_when_scheme_not_in_security_schemes( store, From 3be426b90d48104c0d2a5c028dc8f16f8e201791 Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Fri, 11 Jul 2025 00:41:42 +0900 Subject: [PATCH 5/5] refactor: instantiate APIKeySecurityScheme with direct kwargs Signed-off-by: Shingo OKAWA --- tests/client/test_auth_middleware.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/client/test_auth_middleware.py b/tests/client/test_auth_middleware.py index 8f31b249..34db5b95 100644 --- a/tests/client/test_auth_middleware.py +++ b/tests/client/test_auth_middleware.py @@ -206,11 +206,9 @@ class AuthTestCase: scheme_name='apikey', credential='secret-api-key', security_scheme=APIKeySecurityScheme( - **{ - 'type': 'apiKey', - 'name': 'X-API-Key', - 'in': In.header, - } + type='apiKey', + name='X-API-Key', + in_=In.header, ), expected_header_key='x-api-key', expected_header_value_func=lambda c: c,