Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions aiopenapi3/v20/glue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import List, Union, cast
import json
import urllib.parse

import httpx
import pydantic
Expand All @@ -11,6 +10,11 @@

from .parameter import Parameter

try:
import httpx_auth
except:
httpx_auth = None


class Request(RequestBase):
@property
Expand Down Expand Up @@ -83,17 +87,23 @@ def _prepare_secschemes(self, scheme: str, value: Union[str, List[str]]):

if ss.type == "basic":
value = cast(List[str], value)
self.req.auth = httpx.BasicAuth(*value)
self.req.auth = httpx_auth.Basic(*value) if httpx_auth else httpx.BasicAuth(*value)

value = cast(str, value)
if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.params[ss.name] = value
if httpx_auth:
self.req.auth = httpx_auth.QueryApiKey(value, getattr(ss, "name", None))
else:
self.req.params[ss.name] = value

if ss.in_ == "header":
# apiKey in query header data
self.req.headers[ss.name] = value
if httpx_auth:
self.req.auth = httpx_auth.HeaderApiKey(value, getattr(ss, "name", None))
else:
self.req.headers[ss.name] = value

def _prepare_parameters(self, provided):
provided = provided or dict()
Expand Down
121 changes: 100 additions & 21 deletions aiopenapi3/v30/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import urllib.parse

import httpx
try:
import httpx_auth
from httpx_auth.authentication import SupportMultiAuth
except:
httpx_auth = None
import inspect
import pydantic
import pydantic.json

Expand Down Expand Up @@ -77,33 +83,106 @@ def _prepare_security(self):

def _prepare_secschemes(self, scheme: str, value: Union[str, List[str]]):
ss = self.root.components.securitySchemes[scheme]
if httpx_auth:
auth_methods = {
name.lower(): getattr(httpx_auth, name)
for name in httpx_auth.__all__
if inspect.isclass((class_ := getattr(httpx_auth, name)))
if issubclass(class_, httpx.Auth)
}
add_auths = []

if ss.type == "oauth2":
# NOTE: refresh_url is not currently supported by httpx_auth
# REF: https://github.com/Colin-b/httpx_auth/issues/17
if flow := getattr(ss.flows, "implicit", None):
add_auths.append(httpx_auth.OAuth2Implicit(
**value,
authorization_url=flow.authorizationUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "password", None):
add_auths.append(httpx_auth.OAuth2ResourceOwnerPasswordCredentials(
**value,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "clientCredentials", None):
add_auths.append(httpx_auth.OAuth2ClientCredentials(
**value,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "authorizationCode", None):
add_auths.append(httpx_auth.OAuth2AuthorizationCode(
**value,
authorization_url=flow.authorizationUrl,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))

if ss.type == "http":
if auth := auth_methods.get(ss.scheme_, None):
if isinstance(value, tuple):
add_auths.append(auth(*value))
if isinstance(value, dict):
add_auths.append(auth(**value))
if ss.scheme_ == "bearer":
add_auths.append(auth_methods["headerapikey"](
f"{ss.bearerFormat or 'Bearer'} {value}",
"Authorization"
))

value = cast(str, value)

if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value

if ss.type == "apiKey":
if auth := auth_methods.get((ss.in_+ss.type).lower(), None):
add_auths.append(auth(value, getattr(ss, "name", None)))

if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}

for auth in add_auths:
if self.req.auth and isinstance(self.req.auth, SupportMultiAuth):
self.req.auth += auth
else:
self.req.auth = auth
else:
if ss.type == "http" and ss.scheme_ == "basic":
self.req.auth = httpx.BasicAuth(*value)

if ss.type == "http" and ss.scheme_ == "basic":
self.req.auth = httpx.BasicAuth(*value)

if ss.type == "http" and ss.scheme_ == "digest":
self.req.auth = httpx.DigestAuth(*value)
if ss.type == "http" and ss.scheme_ == "digest":
self.req.auth = httpx.DigestAuth(*value)

value = cast(str, value)
if ss.type == "http" and ss.scheme_ == "bearer":
header = ss.bearerFormat or "Bearer {}"
self.req.headers["Authorization"] = header.format(value)
value = cast(str, value)
if ss.type == "http" and ss.scheme_ == "bearer":
header = ss.bearerFormat or "Bearer {}"
self.req.headers["Authorization"] = header.format(value)

if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value
if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value

if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.params[ss.name] = value
if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.params[ss.name] = value

if ss.in_ == "header":
# apiKey in query header data
self.req.headers[ss.name] = value
if ss.in_ == "header":
# apiKey in query header data
self.req.headers[ss.name] = value

if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}
if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}


def _prepare_parameters(self, provided):
"""
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
fastapi~=0.95.0
httpx~=0.23.3
httpx~=0.24.0
hypercorn~=0.14.3
pydantic~=1.10.7
pydantic[email]
pytest~=7.2.2
PyYAML~=6.0
uvloop~=0.17.0
uvloop~=0.17.0; sys_platform != 'win32'
yarl~=1.8.2