4
4
Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
5
5
"""
6
6
7
- import re
8
- from urllib .parse import urlparse , urlunparse , urlencode
9
- from typing import Any , Callable , Dict , List , Literal , Optional
10
- from urllib .parse import urlencode , parse_qs
7
+ from typing import Callable , Literal , Optional
8
+ from urllib .parse import parse_qs , urlencode , urlparse , urlunparse
11
9
12
- from starlette .requests import Request
13
- from starlette .responses import JSONResponse , RedirectResponse , Response
14
10
from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , ValidationError
15
- from pydantic_core import Url
11
+ from starlette .requests import Request
12
+ from starlette .responses import RedirectResponse , Response
16
13
17
14
from mcp .server .auth .errors import (
18
- InvalidClientError ,
15
+ InvalidClientError ,
19
16
InvalidRequestError ,
20
- UnsupportedResponseTypeError ,
21
- ServerError ,
22
17
OAuthError ,
23
18
)
24
19
from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider
28
23
class AuthorizationRequest (BaseModel ):
29
24
"""
30
25
Model for the authorization request parameters.
31
-
26
+
32
27
Corresponds to request schema in authorizationHandler in src/server/auth/handlers/authorize.ts
33
28
"""
29
+
34
30
client_id : str = Field (..., description = "The client ID" )
35
- redirect_uri : AnyHttpUrl | None = Field (..., description = "URL to redirect to after authorization" )
31
+ redirect_uri : AnyHttpUrl | None = Field (
32
+ ..., description = "URL to redirect to after authorization"
33
+ )
36
34
37
- response_type : Literal ["code" ] = Field (..., description = "Must be 'code' for authorization code flow" )
35
+ response_type : Literal ["code" ] = Field (
36
+ ..., description = "Must be 'code' for authorization code flow"
37
+ )
38
38
code_challenge : str = Field (..., description = "PKCE code challenge" )
39
- code_challenge_method : Literal ["S256" ] = Field ("S256" , description = "PKCE code challenge method, must be S256" )
39
+ code_challenge_method : Literal ["S256" ] = Field (
40
+ "S256" , description = "PKCE code challenge method, must be S256"
41
+ )
40
42
state : Optional [str ] = Field (None , description = "Optional state parameter" )
41
- scope : Optional [str ] = Field (None , description = "Optional scope; if specified, should be a space-separated list of scope strings" )
42
-
43
+ scope : Optional [str ] = Field (
44
+ None ,
45
+ description = "Optional scope; if specified, should be a space-separated list of scope strings" ,
46
+ )
47
+
43
48
class Config :
44
49
extra = "ignore"
45
50
46
- def validate_scope (requested_scope : str | None , client : OAuthClientInformationFull ) -> list [str ] | None :
51
+
52
+ def validate_scope (
53
+ requested_scope : str | None , client : OAuthClientInformationFull
54
+ ) -> list [str ] | None :
47
55
if requested_scope is None :
48
56
return None
49
57
requested_scopes = requested_scope .split (" " )
@@ -53,7 +61,10 @@ def validate_scope(requested_scope: str | None, client: OAuthClientInformationFu
53
61
raise InvalidRequestError (f"Client was not registered with scope { scope } " )
54
62
return requested_scopes
55
63
56
- def validate_redirect_uri (auth_request : AuthorizationRequest , client : OAuthClientInformationFull ) -> AnyHttpUrl :
64
+
65
+ def validate_redirect_uri (
66
+ auth_request : AuthorizationRequest , client : OAuthClientInformationFull
67
+ ) -> AnyHttpUrl :
57
68
if auth_request .redirect_uri is not None :
58
69
# Validate redirect_uri against client's registered redirect URIs
59
70
if auth_request .redirect_uri not in client .redirect_uris :
@@ -64,16 +75,19 @@ def validate_redirect_uri(auth_request: AuthorizationRequest, client: OAuthClien
64
75
elif len (client .redirect_uris ) == 1 :
65
76
return client .redirect_uris [0 ]
66
77
else :
67
- raise InvalidRequestError ("redirect_uri must be specified when client has multiple registered URIs" )
78
+ raise InvalidRequestError (
79
+ "redirect_uri must be specified when client has multiple registered URIs"
80
+ )
81
+
68
82
69
83
def create_authorization_handler (provider : OAuthServerProvider ) -> Callable :
70
84
"""
71
85
Create a handler for the OAuth 2.0 Authorization endpoint.
72
-
86
+
73
87
Corresponds to authorizationHandler in src/server/auth/handlers/authorize.ts
74
88
75
89
"""
76
-
90
+
77
91
async def authorization_handler (request : Request ) -> Response :
78
92
"""
79
93
Handler for the OAuth 2.0 Authorization endpoint.
@@ -91,74 +105,79 @@ async def authorization_handler(request: Request) -> Response:
91
105
auth_request = AuthorizationRequest .model_validate (params )
92
106
except ValidationError as e :
93
107
raise InvalidRequestError (str (e ))
94
-
108
+
95
109
# Get client information
96
110
try :
97
111
client = await provider .clients_store .get_client (auth_request .client_id )
98
112
except OAuthError as e :
99
113
# TODO: proper error rendering
100
114
raise InvalidClientError (str (e ))
101
-
115
+
102
116
if not client :
103
117
raise InvalidClientError (f"Client ID '{ auth_request .client_id } ' not found" )
104
-
105
-
118
+
106
119
# do validation which is dependent on the client configuration
107
120
redirect_uri = validate_redirect_uri (auth_request , client )
108
121
scopes = validate_scope (auth_request .scope , client )
109
-
122
+
110
123
auth_params = AuthorizationParams (
111
124
state = auth_request .state ,
112
125
scopes = scopes ,
113
126
code_challenge = auth_request .code_challenge ,
114
127
redirect_uri = redirect_uri ,
115
128
)
116
-
129
+
117
130
try :
118
131
# Let the provider handle the authorization flow
119
- authorization_code = await provider .create_authorization_code (client , auth_params )
120
- response = RedirectResponse (url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" })
121
-
132
+ authorization_code = await provider .create_authorization_code (
133
+ client , auth_params
134
+ )
135
+ response = RedirectResponse (
136
+ url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
137
+ )
138
+
122
139
# Redirect with code
123
140
parsed_uri = urlparse (str (auth_params .redirect_uri ))
124
141
query_params = [(k , v ) for k , vs in parse_qs (parsed_uri .query ) for v in vs ]
125
142
query_params .append (("code" , authorization_code ))
126
143
if auth_params .state :
127
144
query_params .append (("state" , auth_params .state ))
128
-
129
- redirect_url = urlunparse (parsed_uri ._replace (query = urlencode (query_params )))
145
+
146
+ redirect_url = urlunparse (
147
+ parsed_uri ._replace (query = urlencode (query_params ))
148
+ )
130
149
response .headers ["location" ] = redirect_url
131
-
150
+
132
151
return response
133
152
except Exception as e :
134
153
return RedirectResponse (
135
154
url = create_error_redirect (redirect_uri , e , auth_request .state ),
136
155
status_code = 302 ,
137
156
headers = {"Cache-Control" : "no-store" },
138
- )
139
-
157
+ )
158
+
140
159
return authorization_handler
141
160
142
- def create_error_redirect (redirect_uri : AnyUrl , error : Exception , state : Optional [str ]) -> str :
161
+
162
+ def create_error_redirect (
163
+ redirect_uri : AnyUrl , error : Exception , state : Optional [str ]
164
+ ) -> str :
143
165
parsed_uri = urlparse (str (redirect_uri ))
144
166
if isinstance (error , OAuthError ):
145
- query_params = {
146
- "error" : error .error_code ,
147
- "error_description" : str (error )
148
- }
167
+ query_params = {"error" : error .error_code , "error_description" : str (error )}
149
168
else :
150
169
query_params = {
151
170
"error" : "internal_error" ,
152
- "error_description" : "An unknown error occurred"
171
+ "error_description" : "An unknown error occurred" ,
153
172
}
154
173
# TODO: should we add error_uri?
155
174
# if error.error_uri:
156
175
# query_params["error_uri"] = str(error.error_uri)
157
176
if state :
158
177
query_params ["state" ] = state
159
-
178
+
160
179
new_query = urlencode (query_params )
161
180
if parsed_uri .query :
162
181
new_query = f"{ parsed_uri .query } &{ new_query } "
163
-
164
- return urlunparse (parsed_uri ._replace (query = new_query ))
182
+
183
+ return urlunparse (parsed_uri ._replace (query = new_query ))
0 commit comments