1
+ """
2
+ Handler for OAuth 2.0 Authorization endpoint.
3
+
4
+ Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
5
+ """
6
+
7
+ import re
8
+ from urllib .parse import urlparse , urlunparse , urlencode
9
+ from typing import Any , Callable , Dict , List , Literal , Optional
10
+ from urllib .parse import urlencode , parse_qs
11
+
12
+ from fastapi import Request , Response
13
+ from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , ValidationError
14
+ from pydantic_core import Url
15
+ from starlette .responses import JSONResponse , RedirectResponse
16
+
17
+ from mcp .server .auth .errors import (
18
+ InvalidClientError ,
19
+ InvalidRequestError ,
20
+ UnsupportedResponseTypeError ,
21
+ ServerError ,
22
+ OAuthError ,
23
+ )
24
+ from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider
25
+ from mcp .shared .auth import OAuthClientInformationFull
26
+
27
+
28
+ class AuthorizationRequest (BaseModel ):
29
+ """
30
+ Model for the authorization request parameters.
31
+
32
+ Corresponds to request schema in authorizationHandler in src/server/auth/handlers/authorize.ts
33
+ """
34
+ client_id : str = Field (..., description = "The client ID" )
35
+ redirect_uri : AnyHttpUrl | None = Field (..., description = "URL to redirect to after authorization" )
36
+
37
+ response_type : Literal ["code" ] = Field (..., description = "Must be 'code' for authorization code flow" )
38
+ code_challenge : str = Field (..., description = "PKCE code challenge" )
39
+ code_challenge_method : Literal ["S256" ] = Field ("S256" , description = "PKCE code challenge method" )
40
+ state : Optional [str ] = Field (None , description = "Optional state parameter" )
41
+ scope : Optional [str ] = Field (None , description = "Optional scope parameter" )
42
+
43
+ class Config :
44
+ extra = "ignore"
45
+
46
+ def validate_scope (requested_scope : str | None , client : OAuthClientInformationFull ) -> list [str ] | None :
47
+ if requested_scope is None :
48
+ return None
49
+ requested_scopes = requested_scope .split (" " )
50
+ allowed_scopes = [] if client .scope is None else client .scope .split (" " )
51
+ for scope in requested_scopes :
52
+ if scope not in allowed_scopes :
53
+ raise InvalidRequestError (f"Client was not registered with scope { scope } " )
54
+ return requested_scopes
55
+
56
+ def validate_redirect_uri (auth_request : AuthorizationRequest , client : OAuthClientInformationFull ) -> AnyHttpUrl :
57
+ if auth_request .redirect_uri is not None :
58
+ # Validate redirect_uri against client's registered redirect URIs
59
+ if auth_request .redirect_uri not in client .redirect_uris :
60
+ raise InvalidRequestError (
61
+ f"Redirect URI '{ auth_request .redirect_uri } ' not registered for client"
62
+ )
63
+ return auth_request .redirect_uri
64
+ elif len (client .redirect_uris ) == 1 :
65
+ return client .redirect_uris [0 ]
66
+ else :
67
+ raise InvalidRequestError ("redirect_uri must be specified when client has multiple registered URIs" )
68
+
69
+ def create_authorization_handler (provider : OAuthServerProvider ) -> Callable :
70
+ """
71
+ Create a handler for the OAuth 2.0 Authorization endpoint.
72
+
73
+ Corresponds to authorizationHandler in src/server/auth/handlers/authorize.ts
74
+
75
+ """
76
+
77
+ async def authorization_handler (request : Request ) -> Response :
78
+ """
79
+ Handler for the OAuth 2.0 Authorization endpoint.
80
+ """
81
+ # Validate request parameters
82
+ try :
83
+ if request .method == "GET" :
84
+ auth_request = AuthorizationRequest .model_validate (request .query_params )
85
+ else :
86
+ auth_request = AuthorizationRequest .model_validate_json (await request .body ())
87
+ except ValidationError as e :
88
+ raise InvalidRequestError (str (e ))
89
+
90
+ # Get client information
91
+ try :
92
+ client = await provider .clients_store .get_client (auth_request .client_id )
93
+ except OAuthError as e :
94
+ # TODO: proper error rendering
95
+ raise InvalidClientError (str (e ))
96
+
97
+ if not client :
98
+ raise InvalidClientError (f"Client ID '{ auth_request .client_id } ' not found" )
99
+
100
+
101
+ # do validation which is dependent on the client configuration
102
+ redirect_uri = validate_redirect_uri (auth_request , client )
103
+ scopes = validate_scope (auth_request .scope , client )
104
+
105
+ auth_params = AuthorizationParams (
106
+ state = auth_request .state ,
107
+ scopes = scopes ,
108
+ code_challenge = auth_request .code_challenge ,
109
+ redirect_uri = redirect_uri ,
110
+ )
111
+
112
+ response = RedirectResponse (url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" })
113
+
114
+ try :
115
+ # Let the provider handle the authorization flow
116
+ await provider .authorize (client , auth_params , response )
117
+
118
+ return response
119
+ except Exception as e :
120
+ return RedirectResponse (
121
+ url = create_error_redirect (redirect_uri , e , auth_request .state ),
122
+ status_code = 302 ,
123
+ headers = {"Cache-Control" : "no-store" },
124
+ )
125
+
126
+ return authorization_handler
127
+
128
+ def create_error_redirect (redirect_uri : AnyUrl , error : Exception , state : Optional [str ]) -> str :
129
+ parsed_uri = urlparse (str (redirect_uri ))
130
+ if isinstance (error , OAuthError ):
131
+ query_params = {
132
+ "error" : error .error_code ,
133
+ "error_description" : str (error )
134
+ }
135
+ else :
136
+ query_params = {
137
+ "error" : "internal_error" ,
138
+ "error_description" : "An unknown error occurred"
139
+ }
140
+ # TODO: should we add error_uri?
141
+ # if error.error_uri:
142
+ # query_params["error_uri"] = str(error.error_uri)
143
+ if state :
144
+ query_params ["state" ] = state
145
+
146
+ new_query = urlencode (query_params )
147
+ if parsed_uri .query :
148
+ new_query = f"{ parsed_uri .query } &{ new_query } "
149
+
150
+ return urlunparse (parsed_uri ._replace (query = new_query ))
0 commit comments