Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions aiopenapi3/v20/glue.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import List, Union, cast
import json
import urllib.parse

import httpx
import httpx_auth
import pydantic

from ..base import SchemaBase, ParameterBase
Expand Down Expand Up @@ -83,17 +82,17 @@ def _prepare_secschemes(self, scheme: str, value: Union[str, List[str]]):

if ss.type == "basic":
value = cast(List[str], value)
self.req.auth = httpx.BasicAuth(*value)
self.req.auth = httpx_auth.Basic(*value)

value = cast(str, value)
if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.params[ss.name] = value
httpx_auth.QueryApiKey(value, getattr(ss, "name", None))

if ss.in_ == "header":
# apiKey in query header data
self.req.headers[ss.name] = value
httpx_auth.HeaderApiKey(value, getattr(ss, "name", None))

def _prepare_parameters(self, provided):
provided = provided or dict()
Expand Down
83 changes: 65 additions & 18 deletions aiopenapi3/v30/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import urllib.parse

import httpx
import httpx_auth
import inspect
import pydantic
import pydantic.json

Expand Down Expand Up @@ -77,33 +79,78 @@ def _prepare_security(self):

def _prepare_secschemes(self, scheme: str, value: Union[str, List[str]]):
ss = self.root.components.securitySchemes[scheme]

if ss.type == "http" and ss.scheme_ == "basic":
self.req.auth = httpx.BasicAuth(*value)

if ss.type == "http" and ss.scheme_ == "digest":
self.req.auth = httpx.DigestAuth(*value)

auth_methods = {
name.lower(): getattr(httpx_auth, name)
for name in httpx_auth.__all__
if inspect.isclass((class_ := getattr(httpx_auth, name)))
if issubclass(class_, httpx.Auth)
}
add_auths = []

if ss.type == "oauth2":
# NOTE: refresh_url is not currently supported by httpx_auth
# REF: https://github.com/Colin-b/httpx_auth/issues/17
if flow := getattr(ss.flows, "implicit", None):
add_auths.append(httpx_auth.OAuth2Implicit(
**value,
authorization_url=flow.authorizationUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "password", None):
add_auths.append(httpx_auth.OAuth2ResourceOwnerPasswordCredentials(
**value,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "clientCredentials", None):
add_auths.append(httpx_auth.OAuth2ClientCredentials(
**value,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "authorizationCode", None):
add_auths.append(httpx_auth.OAuth2AuthorizationCode(
**value,
authorization_url=flow.authorizationUrl,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))

if ss.type == "http":
if auth := auth_methods.get(ss.scheme_, None):
if isinstance(value, tuple):
add_auths.append(auth(*value))
if isinstance(value, dict):
add_auths.append(auth(**value))
if ss.scheme_ == "bearer":
add_auths.append(auth_methods["headerapikey"](
f"{ss.bearerFormat or 'Bearer'} {value}",
"Authorization"
))

value = cast(str, value)
if ss.type == "http" and ss.scheme_ == "bearer":
header = ss.bearerFormat or "Bearer {}"
self.req.headers["Authorization"] = header.format(value)


if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value

if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.params[ss.name] = value

if ss.in_ == "header":
# apiKey in query header data
self.req.headers[ss.name] = value
if auth := auth_methods.get((ss.in_+ss.type).lower(), None):
add_auths.append(auth(value, getattr(ss, "name", None)))

if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}

for auth in add_auths:
if self.req.auth:
self.req.auth += auth
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this actually works?
I guess test_paths_security_combined should validate this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, although conditionally. It's a feature of httpx_auth. Ref: https://github.com/Colin-b/httpx_auth/blob/60bb9a7f4beb4b370398ae8fcb7d4402daa0b0bd/httpx_auth/authentication.py#L93

I think the code should be fine as is since all of that auth objects will be subclasses of httpx_auth.authentication.SupportMultiAuth, however I will add checks for this condition to be safe.

else:
self.req.auth = auth


def _prepare_parameters(self, provided):
"""
Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
fastapi~=0.95.0
httpx~=0.23.3
httpx~=0.24.0
httpx-auth~=0.17.0
hypercorn~=0.14.3
pydantic~=1.10.7
pydantic[email]
pytest~=7.2.2
PyYAML~=6.0
uvloop~=0.17.0
uvloop~=0.17.0; sys_platform != 'win32'
yarl~=1.8.2
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ install_requires =
pydantic[email]
yarl
httpx
httpx-auth
more-itertools
typing_extensions; python_version<"3.8"
pathlib3x; python_version<"3.9"
Expand Down