44Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
55"""
66
7- import re
8- from urllib .parse import urlparse , urlunparse , urlencode
9- from typing import Any , Callable , Dict , List , Literal , Optional
10- from urllib .parse import urlencode , parse_qs
7+ from typing import Literal
8+ from urllib .parse import urlencode , urlparse , urlunparse
119
12- from starlette .requests import Request
13- from starlette .responses import JSONResponse , RedirectResponse , Response
1410from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , ValidationError
15- from pydantic_core import Url
11+ from starlette .requests import Request
12+ from starlette .responses import RedirectResponse , Response
1613
1714from mcp .server .auth .errors import (
18- InvalidClientError ,
15+ InvalidClientError ,
1916 InvalidRequestError ,
20- UnsupportedResponseTypeError ,
21- ServerError ,
2217 OAuthError ,
2318)
19+ from mcp .server .auth .handlers .types import HandlerFn
2420from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider
25- from mcp .shared .auth import OAuthClientInformationFull
2621
2722
2823class AuthorizationRequest (BaseModel ):
2924 """
3025 Model for the authorization request parameters.
31-
32- Corresponds to request schema in authorizationHandler in src/server/auth/handlers/authorize.ts
26+
27+ Corresponds to request schema in authorizationHandler in
28+ src/server/auth/handlers/authorize.ts
3329 """
30+
3431 client_id : str = Field (..., description = "The client ID" )
35- redirect_uri : AnyHttpUrl | None = Field (..., description = "URL to redirect to after authorization" )
32+ redirect_uri : AnyHttpUrl | None = Field (
33+ ..., description = "URL to redirect to after authorization"
34+ )
3635
37- response_type : Literal ["code" ] = Field (..., description = "Must be 'code' for authorization code flow" )
36+ response_type : Literal ["code" ] = Field (
37+ ..., description = "Must be 'code' for authorization code flow"
38+ )
3839 code_challenge : str = Field (..., description = "PKCE code challenge" )
39- code_challenge_method : Literal ["S256" ] = Field ("S256" , description = "PKCE code challenge method" )
40- state : Optional [str ] = Field (None , description = "Optional state parameter" )
41- scope : Optional [str ] = Field (None , description = "Optional scope parameter" )
42-
40+ code_challenge_method : Literal ["S256" ] = Field (
41+ "S256" , description = "PKCE code challenge method"
42+ )
43+ state : str | None = Field (None , description = "Optional state parameter" )
44+ scope : str | None = Field (None , description = "Optional scope parameter" )
45+
4346 class Config :
4447 extra = "ignore"
4548
49+
4650def validate_scope (requested_scope : str | None , scope : str | None ) -> list [str ] | None :
4751 if requested_scope is None :
4852 return None
@@ -53,7 +57,10 @@ def validate_scope(requested_scope: str | None, scope: str | None) -> list[str]
5357 raise InvalidRequestError (f"Client was not registered with scope { scope } " )
5458 return requested_scopes
5559
56- def validate_redirect_uri (redirect_uri : AnyHttpUrl | None , redirect_uris : list [AnyHttpUrl ]) -> AnyHttpUrl :
60+
61+ def validate_redirect_uri (
62+ redirect_uri : AnyHttpUrl | None , redirect_uris : list [AnyHttpUrl ]
63+ ) -> AnyHttpUrl :
5764 if not redirect_uris :
5865 raise InvalidClientError ("Client has no registered redirect URIs" )
5966
@@ -67,16 +74,19 @@ def validate_redirect_uri(redirect_uri: AnyHttpUrl | None, redirect_uris: list[A
6774 elif len (redirect_uris ) == 1 :
6875 return redirect_uris [0 ]
6976 else :
70- raise InvalidRequestError ("redirect_uri must be specified when client has multiple registered URIs" )
77+ raise InvalidRequestError (
78+ "redirect_uri must be specified when client has multiple registered URIs"
79+ )
80+
7181
72- def create_authorization_handler (provider : OAuthServerProvider ) -> Callable :
82+ def create_authorization_handler (provider : OAuthServerProvider ) -> HandlerFn :
7383 """
7484 Create a handler for the OAuth 2.0 Authorization endpoint.
75-
85+
7686 Corresponds to authorizationHandler in src/server/auth/handlers/authorize.ts
7787
7888 """
79-
89+
8090 async def authorization_handler (request : Request ) -> Response :
8191 """
8292 Handler for the OAuth 2.0 Authorization endpoint.
@@ -94,65 +104,64 @@ async def authorization_handler(request: Request) -> Response:
94104 auth_request = AuthorizationRequest .model_validate (params )
95105 except ValidationError as e :
96106 raise InvalidRequestError (str (e ))
97-
107+
98108 # Get client information
99- try :
100- client = await provider .clients_store .get_client (auth_request .client_id )
101- except OAuthError as e :
102- # TODO: proper error rendering
103- raise InvalidClientError (str (e ))
104-
109+ client = await provider .clients_store .get_client (auth_request .client_id )
110+
105111 if not client :
106112 raise InvalidClientError (f"Client ID '{ auth_request .client_id } ' not found" )
107-
108-
113+
109114 # do validation which is dependent on the client configuration
110- redirect_uri = validate_redirect_uri (auth_request .redirect_uri , client .redirect_uris )
115+ redirect_uri = validate_redirect_uri (
116+ auth_request .redirect_uri , client .redirect_uris
117+ )
111118 scopes = validate_scope (auth_request .scope , client .scope )
112-
119+
113120 auth_params = AuthorizationParams (
114121 state = auth_request .state ,
115122 scopes = scopes ,
116123 code_challenge = auth_request .code_challenge ,
117124 redirect_uri = redirect_uri ,
118125 )
119-
120- response = RedirectResponse (url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" })
121-
126+
127+ response = RedirectResponse (
128+ url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
129+ )
130+
122131 try :
123132 # Let the provider handle the authorization flow
124133 await provider .authorize (client , auth_params , response )
125-
134+
126135 return response
127136 except Exception as e :
128137 return RedirectResponse (
129138 url = create_error_redirect (redirect_uri , e , auth_request .state ),
130139 status_code = 302 ,
131140 headers = {"Cache-Control" : "no-store" },
132- )
133-
141+ )
142+
134143 return authorization_handler
135144
136- def create_error_redirect (redirect_uri : AnyUrl , error : Exception , state : Optional [str ]) -> str :
145+
146+ def create_error_redirect (
147+ redirect_uri : AnyUrl , error : Exception , state : str | None
148+ ) -> str :
137149 parsed_uri = urlparse (str (redirect_uri ))
138150 if isinstance (error , OAuthError ):
139- query_params = {
140- "error" : error .error_code ,
141- "error_description" : str (error )
142- }
151+ query_params = {"error" : error .error_code , "error_description" : str (error )}
143152 else :
144153 query_params = {
145154 "error" : "internal_error" ,
146- "error_description" : "An unknown error occurred"
155+ "error_description" : "An unknown error occurred" ,
147156 }
148157 # TODO: should we add error_uri?
149158 # if error.error_uri:
150159 # query_params["error_uri"] = str(error.error_uri)
151160 if state :
152161 query_params ["state" ] = state
153-
162+
154163 new_query = urlencode (query_params )
155164 if parsed_uri .query :
156165 new_query = f"{ parsed_uri .query } &{ new_query } "
157-
158- return urlunparse (parsed_uri ._replace (query = new_query ))
166+
167+ return urlunparse (parsed_uri ._replace (query = new_query ))
0 commit comments