Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions todo/tests/unit/views/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,51 +131,7 @@ def test_get_handles_callback_successfully(self, mock_create_user, mock_handle_c
self.assertNotIn("oauth_state", request.session)


class GoogleAuthStatusViewTests(APISimpleTestCase):
def setUp(self):
super().setUp()
self.client = APIClient()
self.url = reverse("google_status")

def test_get_returns_401_when_no_access_token(self):
response = self.client.get(self.url)

self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
self.assertEqual(response.data["message"], AuthErrorMessages.NO_ACCESS_TOKEN)
self.assertEqual(response.data["authenticated"], False)
self.assertEqual(response.data["statusCode"], status.HTTP_401_UNAUTHORIZED)

@patch("todo.utils.google_jwt_utils.validate_google_access_token")
@patch("todo.services.user_service.UserService.get_user_by_id")
def test_get_returns_user_info_when_authenticated(self, mock_get_user, mock_validate_token):
user_id = str(ObjectId())
user_data = {
"user_id": user_id,
"google_id": "test_google_id",
"email": "[email protected]",
"name": "Test User",
}
mock_validate_token.return_value = user_data

mock_user = Mock()
mock_user.id = ObjectId(user_id)
mock_user.google_id = "test_google_id"
mock_user.email_id = "[email protected]"
mock_user.name = "Test User"
type(mock_user).id = PropertyMock(return_value=ObjectId(user_id))

mock_get_user.return_value = mock_user

tokens = generate_google_token_pair(user_data)
self.client.cookies["ext-access"] = tokens["access_token"]

response = self.client.get(self.url, HTTP_ACCEPT="application/json")

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["data"]["user"]["id"], user_id)
self.assertEqual(response.data["data"]["user"]["email"], mock_user.email_id)
self.assertEqual(response.data["data"]["user"]["name"], mock_user.name)
self.assertEqual(response.data["data"]["user"]["google_id"], mock_user.google_id)


class GoogleRefreshViewTests(APISimpleTestCase):
Expand Down
2 changes: 0 additions & 2 deletions todo/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
GoogleCallbackView,
GoogleRefreshView,
GoogleLogoutView,
GoogleAuthStatusView,
)

urlpatterns = [
Expand All @@ -15,7 +14,6 @@
path("health", HealthView.as_view(), name="health"),
path("auth/google/login/", GoogleLoginView.as_view(), name="google_login"),
path("auth/google/callback/", GoogleCallbackView.as_view(), name="google_callback"),
path("auth/google/status/", GoogleAuthStatusView.as_view(), name="google_status"),
path("auth/google/refresh/", GoogleRefreshView.as_view(), name="google_refresh"),
path("auth/google/logout/", GoogleLogoutView.as_view(), name="google_logout"),
]
270 changes: 44 additions & 226 deletions todo/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework import status
from django.http import HttpResponseRedirect, HttpResponse
from django.http import HttpResponseRedirect
from django.conf import settings
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiExample, OpenApiResponse
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiResponse
from drf_spectacular.types import OpenApiTypes
from todo.services.google_oauth_service import GoogleOAuthService
from todo.services.user_service import UserService
from todo.utils.google_jwt_utils import (
validate_google_refresh_token,
validate_google_access_token,
generate_google_access_token,
generate_google_token_pair,
)
Expand All @@ -19,7 +18,6 @@
from todo.exceptions.google_auth_exceptions import (
GoogleAuthException,
GoogleTokenExpiredError,
GoogleTokenInvalidError,
GoogleTokenMissingError,
GoogleAPIException,
)
Expand Down Expand Up @@ -70,15 +68,6 @@ def get(self, request: Request):


class GoogleCallbackView(APIView):
"""
This class has two implementations:
1. Current active implementation (temporary) - For testing and development
2. Commented implementation - For frontend integration (to be used later)

The temporary implementation processes the OAuth callback directly and shows a success page.
The frontend implementation will redirect to the frontend and process the callback via POST request.
"""

@extend_schema(
operation_id="google_callback",
summary="Handle Google OAuth callback",
Expand Down Expand Up @@ -114,12 +103,33 @@ class GoogleCallbackView(APIView):
},
)
def get(self, request: Request):
if "error" in request.query_params:
error = request.query_params.get("error")
raise GoogleAuthException(error)

code = request.query_params.get("code")
state = request.query_params.get("state")
error = request.query_params.get("error")

frontend_callback = f"{settings.FRONTEND_URL}/auth/callback"

if error:
return HttpResponseRedirect(f"{frontend_callback}?error={error}")
elif code and state:
return HttpResponseRedirect(f"{frontend_callback}?code={code}&state={state}")
else:
return HttpResponseRedirect(f"{frontend_callback}?error=missing_parameters")
Comment on lines +112 to +117
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Simplify conditional structure by removing unnecessary elif.

After a return statement, subsequent conditions don't need elif.

Apply this diff to improve code readability:

 if error:
     return HttpResponseRedirect(f"{frontend_callback}?error={error}")
-elif code and state:
+if code and state:
     return HttpResponseRedirect(f"{frontend_callback}?code={code}&state={state}")
-else:
-    return HttpResponseRedirect(f"{frontend_callback}?error=missing_parameters")
+return HttpResponseRedirect(f"{frontend_callback}?error=missing_parameters")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if error:
return HttpResponseRedirect(f"{frontend_callback}?error={error}")
elif code and state:
return HttpResponseRedirect(f"{frontend_callback}?code={code}&state={state}")
else:
return HttpResponseRedirect(f"{frontend_callback}?error=missing_parameters")
if error:
return HttpResponseRedirect(f"{frontend_callback}?error={error}")
if code and state:
return HttpResponseRedirect(f"{frontend_callback}?code={code}&state={state}")
return HttpResponseRedirect(f"{frontend_callback}?error=missing_parameters")
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 112-117: Unnecessary "elif" after "return", remove the leading "el" from "elif"

(R1705)

🤖 Prompt for AI Agents
In todo/views/auth.py around lines 112 to 117, simplify the conditional
structure by replacing the `elif` with a simple `if` after the first `return`
statement, since the `return` already exits the function. This improves
readability by removing unnecessary `elif` and using separate `if` statements
for each condition.

Comment on lines 105 to +117
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing OAuth State Validation in GET Callback category Security

Tell me more
What is the issue?

The GET callback handler doesn't validate the oauth_state from the session before redirecting to the frontend, which could lead to CSRF vulnerabilities.

Why this matters

An attacker could craft a malicious state parameter, bypass the state validation, and potentially execute a CSRF attack on the frontend callback endpoint.

Suggested change ∙ Feature Preview

Add state validation before redirecting:

def get(self, request: Request):
    code = request.query_params.get("code")
    state = request.query_params.get("state")
    error = request.query_params.get("error")
    stored_state = request.session.get("oauth_state")

    frontend_callback = f"{settings.FRONTEND_URL}/auth/callback"

    if error:
        return HttpResponseRedirect(f"{frontend_callback}?error={error}")
    elif code and state:
        if not stored_state or stored_state != state:
            return HttpResponseRedirect(f"{frontend_callback}?error=invalid_state")
        return HttpResponseRedirect(f"{frontend_callback}?code={code}&state={state}")
    else:
        return HttpResponseRedirect(f"{frontend_callback}?error=missing_parameters")
Provide feedback to improve future suggestions

Nice Catch Incorrect Not in Scope Not in coding standard Other

💬 Looking for more details? Reply to this comment to chat with Korbit.


@extend_schema(
operation_id="google_callback_post",
summary="Handle Google OAuth callback (POST)",
description="Processes the OAuth callback from Google via POST request",
tags=["auth"],
responses={
200: OpenApiResponse(description="OAuth callback processed successfully"),
400: OpenApiResponse(description="Bad request - invalid parameters"),
500: OpenApiResponse(description="Internal server error"),
},
)
def post(self, request: Request):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Add docstring to document the POST method.

The method lacks documentation explaining its purpose and parameters.

Add a docstring after the method definition:

 def post(self, request: Request):
+    """
+    Process Google OAuth callback via POST request.
+    
+    Validates authorization code and state, creates/updates user,
+    and returns authentication tokens in cookies.
+    
+    Args:
+        request: HTTP request containing 'code' and 'state' in body
+        
+    Returns:
+        Response with user data and token expiry information
+        
+    Raises:
+        GoogleAuthException: If code is missing or state is invalid
+        GoogleAPIException: If OAuth processing fails
+    """
     code = request.data.get("code")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def post(self, request: Request):
def post(self, request: Request):
"""
Process Google OAuth callback via POST request.
Validates authorization code and state, creates/updates user,
and returns authentication tokens in cookies.
Args:
request: HTTP request containing 'code' and 'state' in body
Returns:
Response with user data and token expiry information
Raises:
GoogleAuthException: If code is missing or state is invalid
GoogleAPIException: If OAuth processing fails
"""
code = request.data.get("code")
...
🧰 Tools
🪛 Pylint (3.3.7)

[convention] 130-130: Missing function or method docstring

(C0116)

🤖 Prompt for AI Agents
In todo/views/auth.py at line 130, the post method lacks a docstring explaining
its purpose and parameters. Add a concise docstring immediately after the method
definition that describes what the POST method does and outlines its input
parameters, specifically the request argument.

code = request.data.get("code")
state = request.data.get("state")

if not code:
raise GoogleAuthException("No authorization code received from Google")
Expand All @@ -128,9 +138,6 @@ def get(self, request: Request):
if not stored_state or stored_state != state:
raise GoogleAuthException("Invalid state parameter")

return self._handle_callback_directly(code, request)

def _handle_callback_directly(self, code, request):
try:
google_data = GoogleOAuthService.handle_callback(code)
user = UserService.create_or_update_user(google_data)
Expand All @@ -144,62 +151,24 @@ def _handle_callback_directly(self, code, request):
}
)

wants_json = (
"application/json" in request.headers.get("Accept", "").lower()
or request.query_params.get("format") == "json"
)

if wants_json:
response = Response(
{
"statusCode": status.HTTP_200_OK,
"message": AppMessages.GOOGLE_LOGIN_SUCCESS,
"data": {
"user": {
"id": str(user.id),
"name": user.name,
"email": user.email_id,
"google_id": user.google_id,
},
"tokens": {
"access_token_expires_in": tokens["expires_in"],
"refresh_token_expires_in": settings.GOOGLE_JWT["REFRESH_TOKEN_LIFETIME"],
},
response = Response(
{
"statusCode": status.HTTP_200_OK,
"message": AppMessages.GOOGLE_LOGIN_SUCCESS,
"data": {
"user": {
"id": str(user.id),
"name": user.name,
"email": user.email_id,
"google_id": user.google_id,
},
"tokens": {
"access_token_expires_in": tokens["expires_in"],
"refresh_token_expires_in": settings.GOOGLE_JWT["REFRESH_TOKEN_LIFETIME"],
},
}
)
else:
response = HttpResponse(f"""
<html>
<head><title>✅ Login Successful</title></head>
<body style="font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px;">
<h1>✅ Google OAuth Login Successful!</h1>

<h2>🧑‍💻 User Info:</h2>
<ul>
<li><strong>ID:</strong> {user.id}</li>
<li><strong>Name:</strong> {user.name}</li>
<li><strong>Email:</strong> {user.email_id}</li>
<li><strong>Google ID:</strong> {user.google_id}</li>
</ul>

<h2>🍪 Authentication Cookies Set:</h2>
<ul>
<li><strong>Access Token:</strong> ext-access (expires in {tokens['expires_in']} seconds)</li>
<li><strong>Refresh Token:</strong> ext-refresh (expires in 7 days)</li>
</ul>

<h2>🧪 Test Other Endpoints:</h2>
<ul>
<li><a href="/v1/auth/google/status/">Check Auth Status</a></li>
<li><a href="/v1/auth/google/refresh/">Refresh Token</a></li>
<li><a href="/v1/auth/google/logout/">Logout</a></li>
</ul>

<p><strong>Google OAuth integration is working perfectly!</strong></p>
</body>
</html>
""")
},
}
)

self._set_auth_cookies(response, tokens)
request.session.pop("oauth_state", None)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent OAuth State Cleanup category Security

Tell me more
What is the issue?

The oauth_state is only cleared in the POST handler but not in the GET handler, which could lead to the state being reused.

Why this matters

If the state isn't cleared after use in the GET handler, it remains in the session and could be reused in subsequent requests, potentially allowing replay attacks.

Suggested change ∙ Feature Preview

Add state cleanup in the GET handler after validation:

def get(self, request: Request):
    # ... existing code ...
    elif code and state:
        if not stored_state or stored_state != state:
            return HttpResponseRedirect(f"{frontend_callback}?error=invalid_state")
        request.session.pop("oauth_state", None)
        return HttpResponseRedirect(f"{frontend_callback}?code={code}&state={state}")
Provide feedback to improve future suggestions

Nice Catch Incorrect Not in Scope Not in coding standard Other

💬 Looking for more details? Reply to this comment to chat with Korbit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check now

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please share the updated code that handles the OAuth state cleanup in the GET handler? This will help verify the security fix.

Expand All @@ -225,157 +194,6 @@ def _set_auth_cookies(self, response, tokens):
)


# Frontend integration implementation (to be used later)
"""
class GoogleCallbackViewFrontend(APIView):
def get(self, request: Request):
code = request.query_params.get("code")
state = request.query_params.get("state")
error = request.query_params.get("error")

frontend_callback = f"{settings.FRONTEND_URL}/auth/callback"

if error:
return HttpResponseRedirect(f"{frontend_callback}?error={error}")
elif code and state:
return HttpResponseRedirect(f"{frontend_callback}?code={code}&state={state}")
else:
return HttpResponseRedirect(f"{frontend_callback}?error=missing_parameters")

def post(self, request: Request):
code = request.data.get("code")
state = request.data.get("state")

if not code:
formatted_errors = [
ApiErrorDetail(
source={ApiErrorSource.PARAMETER: "code"},
title=ApiErrors.VALIDATION_ERROR,
detail=ApiErrors.INVALID_AUTH_CODE,
)
]
error_response = ApiErrorResponse(
statusCode=400,
message=ApiErrors.INVALID_AUTH_CODE,
errors=formatted_errors
)
return Response(
data=error_response.model_dump(mode="json", exclude_none=True),
status=status.HTTP_400_BAD_REQUEST
)

stored_state = request.session.get("oauth_state")
if not stored_state or stored_state != state:
formatted_errors = [
ApiErrorDetail(
source={ApiErrorSource.PARAMETER: "state"},
title=ApiErrors.VALIDATION_ERROR,
detail=ApiErrors.INVALID_STATE_PARAMETER,
)
]
error_response = ApiErrorResponse(
statusCode=400,
message=ApiErrors.INVALID_STATE_PARAMETER,
errors=formatted_errors
)
return Response(
data=error_response.model_dump(mode="json", exclude_none=True),
status=status.HTTP_400_BAD_REQUEST
)

google_data = GoogleOAuthService.handle_callback(code)
user = UserService.create_or_update_user(google_data)

tokens = generate_google_token_pair(
{
"user_id": str(user.id),
"google_id": user.google_id,
"email": user.email_id,
"name": user.name,
}
)

response = Response({
"statusCode": status.HTTP_200_OK,
"message": AppMessages.GOOGLE_LOGIN_SUCCESS,
"data": {
"user": {
"id": str(user.id),
"name": user.name,
"email": user.email_id,
"google_id": user.google_id,
},
"tokens": {
"access_token_expires_in": tokens["expires_in"],
"refresh_token_expires_in": settings.GOOGLE_JWT["REFRESH_TOKEN_LIFETIME"]
}
}
})

self._set_auth_cookies(response, tokens)
request.session.pop("oauth_state", None)

return response

def _get_cookie_config(self):
return {
"path": "/",
"domain": settings.GOOGLE_COOKIE_SETTINGS.get("COOKIE_DOMAIN"),
"secure": settings.GOOGLE_COOKIE_SETTINGS.get("COOKIE_SECURE", False),
"httponly": True,
"samesite": settings.GOOGLE_COOKIE_SETTINGS.get("COOKIE_SAMESITE", "Lax"),
}

def _set_auth_cookies(self, response, tokens):
config = self._get_cookie_config()
response.set_cookie("ext-access", tokens["access_token"], max_age=tokens["expires_in"], **config)
response.set_cookie(
"ext-refresh", tokens["refresh_token"], max_age=settings.GOOGLE_JWT["REFRESH_TOKEN_LIFETIME"], **config
)
"""


class GoogleAuthStatusView(APIView):
@extend_schema(
operation_id="google_auth_status",
summary="Check authentication status",
description="Check if the user is authenticated and return user information",
tags=["auth"],
responses={
200: OpenApiResponse(description="Authentication status retrieved successfully"),
401: OpenApiResponse(description="Unauthorized - invalid or missing token"),
500: OpenApiResponse(description="Internal server error"),
},
)
def get(self, request: Request):
access_token = request.COOKIES.get("ext-access")

if not access_token:
raise GoogleTokenMissingError(AuthErrorMessages.NO_ACCESS_TOKEN)

try:
payload = validate_google_access_token(access_token)
user = UserService.get_user_by_id(payload["user_id"])
except Exception as e:
raise GoogleTokenInvalidError(str(e))

return Response(
{
"statusCode": status.HTTP_200_OK,
"message": "Authentication status retrieved successfully",
"data": {
"authenticated": True,
"user": {
"id": str(user.id),
"email": user.email_id,
"name": user.name,
"google_id": user.google_id,
},
},
}
)


class GoogleRefreshView(APIView):
@extend_schema(
operation_id="google_refresh_token",
Expand Down
Loading
Loading