Skip to content

handle oauthlib errors on create token and revoke token requests #1210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Aleksander Vaskevich
Alessandro De Angelis
Alex Szabó
Allisson Azevedo
Andrej Zbín
Andrew Chen Wang
Anvesh Agarwal
Aristóbulo Meneses
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed
* Remove upper version bound on Django, to allow upgrading to Django 4.1.1 bugfix release.
* Handle oauthlib errors on create token requests

## [2.1.0] 2022-06-19

Expand Down
14 changes: 8 additions & 6 deletions oauth2_provider/oauth2_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,14 @@ def create_token_response(self, request):
uri, http_method, body, headers = self._extract_params(request)
extra_credentials = self._get_extra_credentials(request)

headers, body, status = self.server.create_token_response(
uri, http_method, body, headers, extra_credentials
)
uri = headers.get("Location", None)

return uri, headers, body, status
try:
headers, body, status = self.server.create_token_response(
uri, http_method, body, headers, extra_credentials
)
uri = headers.get("Location", None)
return uri, headers, body, status
except OAuth2Error as exc:
return None, exc.headers, exc.json, exc.status_code

def create_revocation_response(self, request):
"""
Expand Down
95 changes: 94 additions & 1 deletion tests/test_oauth2_backends.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import base64
import json

import pytest
from django.contrib.auth import get_user_model
from django.test import RequestFactory, TestCase
from django.utils.timezone import now, timedelta

from oauth2_provider.backends import get_oauthlib_core
from oauth2_provider.models import redirect_to_uri_allowed
from oauth2_provider.models import get_access_token_model, get_application_model, redirect_to_uri_allowed
from oauth2_provider.oauth2_backends import JSONOAuthLibCore, OAuthLibCore


Expand Down Expand Up @@ -50,6 +53,96 @@ def test_application_json_extract_params(self):
self.assertNotIn("password=123456", body)


UserModel = get_user_model()
ApplicationModel = get_application_model()
AccessTokenModel = get_access_token_model()


@pytest.mark.usefixtures("oauth2_settings")
class TestOAuthLibCoreBackendErrorHandling(TestCase):
def setUp(self):
self.factory = RequestFactory()
self.oauthlib_core = OAuthLibCore()
self.user = UserModel.objects.create_user("john", "[email protected]", "123456")
self.app = ApplicationModel.objects.create(
name="app",
client_id="app_id",
client_secret="app_secret",
client_type=ApplicationModel.CLIENT_CONFIDENTIAL,
authorization_grant_type=ApplicationModel.GRANT_PASSWORD,
user=self.user,
)

def tearDown(self):
self.user.delete()
self.app.delete()

def test_create_token_response_valid(self):
payload = (
"grant_type=password&username=john&password=123456&client_id=app_id&client_secret=app_secret"
)
request = self.factory.post(
"/o/token/",
payload,
content_type="application/x-www-form-urlencoded",
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(b"john:123456").decode(),
)

uri, headers, body, status = self.oauthlib_core.create_token_response(request)
self.assertEqual(status, 200)

def test_create_token_response_query_params(self):
payload = (
"grant_type=password&username=john&password=123456&client_id=app_id&client_secret=app_secret"
)
request = self.factory.post(
"/o/token/?test=foo",
payload,
content_type="application/x-www-form-urlencoded",
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(b"john:123456").decode(),
)
uri, headers, body, status = self.oauthlib_core.create_token_response(request)

self.assertEqual(status, 400)
self.assertDictEqual(
json.loads(body),
{"error": "invalid_request", "error_description": "URL query parameters are not allowed"},
)

def test_create_revocation_response_valid(self):
AccessTokenModel.objects.create(
user=self.user, token="tokstr", application=self.app, expires=now() + timedelta(days=365)
)
payload = "client_id=app_id&client_secret=app_secret&token=tokstr"
request = self.factory.post(
"/o/revoke_token/",
payload,
content_type="application/x-www-form-urlencoded",
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(b"john:123456").decode(),
)
uri, headers, body, status = self.oauthlib_core.create_revocation_response(request)
self.assertEqual(status, 200)

def test_create_revocation_response_query_params(self):
token = AccessTokenModel.objects.create(
user=self.user, token="tokstr", application=self.app, expires=now() + timedelta(days=365)
)
payload = "client_id=app_id&client_secret=app_secret&token=tokstr"
request = self.factory.post(
"/o/revoke_token/?test=foo",
payload,
content_type="application/x-www-form-urlencoded",
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(b"john:123456").decode(),
)
uri, headers, body, status = self.oauthlib_core.create_revocation_response(request)
self.assertEqual(status, 400)
self.assertDictEqual(
json.loads(body),
{"error": "invalid_request", "error_description": "URL query parameters are not allowed"},
)
token.delete()


class TestCustomOAuthLibCoreBackend(TestCase):
"""
Tests that the public API behaves as expected when we override
Expand Down