Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/asgardeo-ai/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "asgardeo_ai"
version = "0.1.0"
version = "0.2.1"
description = "Async Python SDK for Asgardeo AI agent authentication"
authors = ["Thilina Senarath <[email protected]>"]
license = "MIT"
Expand Down
8 changes: 2 additions & 6 deletions packages/asgardeo-ai/src/asgardeo_ai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,12 @@

from .agent_auth_manager import (
AgentAuthManager,
AgentConfig,
generate_state,
build_authorization_url,
AgentConfig
)

__version__ = "0.1.0"
__version__ = "0.2.1"

__all__ = [
"AgentAuthManager",
"AgentConfig",
"generate_state",
"build_authorization_url",
]
68 changes: 56 additions & 12 deletions packages/asgardeo-ai/src/asgardeo_ai/agent_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
AuthenticationError,
TokenError,
ValidationError,
generate_pkce_pair,
generate_state,
build_authorization_url
)

logger = logging.getLogger(__name__)
Expand All @@ -43,16 +46,6 @@ class AgentConfig:
agent_id: str
agent_secret: str


def generate_state() -> str:
"""Generate a secure random state parameter."""
return base64.urlsafe_b64encode(os.urandom(16)).decode('utf-8').rstrip('=')


def build_authorization_url(base_url: str, params: Dict[str, Any]) -> str:
"""Build authorization URL with parameters."""
return f"{base_url}?{urlencode(params)}"

class AgentAuthManager:
"""Agent-enhanced OAuth2 authentication manager for AI agents."""

Expand Down Expand Up @@ -91,7 +84,12 @@ async def get_agent_token(self, scopes: Optional[List[str]] = None) -> OAuthToke
self.config.scope = ' '.join(scopes)

# Start authentication flow
init_response = await native_client.authenticate()
code_verifier, code_challenge = generate_pkce_pair()
params = {
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
init_response = await native_client.authenticate(params=params)

if native_client.flow_status == FlowStatus.SUCCESS_COMPLETED:
auth_data = init_response.get('authData', {})
Expand Down Expand Up @@ -127,7 +125,7 @@ async def get_agent_token(self, scopes: Optional[List[str]] = None) -> OAuthToke
raise TokenError("No authorization code received from authentication flow.")

# Exchange code for token
token = await self.token_client.get_token('authorization_code', code=code)
token = await self.token_client.get_token('authorization_code', code=code, code_verifier=code_verifier)

# Restore original scope
if scopes:
Expand Down Expand Up @@ -180,12 +178,57 @@ def get_authorization_url(
auth_params
)
return auth_url, state

def get_authorization_url_with_pkce(
self,
scopes: List[str],
state: Optional[str] = None,
resource: Optional[str] = None,
**kwargs: Any,
) -> Tuple[str, str, str]:
"""Generate authorization URL for user authentication.

:param scopes: List of OAuth scopes to request
:param state: Optional state parameter (generated if not provided)
:param resource: Optional resource parameter
:param kwargs: Additional parameters for the authorization URL
:return: Tuple of (authorization_url, state)
"""
if not state:
state = generate_state()

code_verifier, code_challenge = generate_pkce_pair()

auth_params = {
"client_id": self.config.client_id,
"redirect_uri": self.config.redirect_uri,
"scope": " ".join(scopes),
"state": state,
"response_type": "code",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}

if resource:
auth_params["resource"] = resource

if self.agent_config:
auth_params["requested_actor"] = self.agent_config.agent_id

auth_params.update(kwargs)

auth_url = build_authorization_url(
f"{self.config.base_url}/oauth2/authorize",
auth_params
)
return auth_url, state, code_verifier

async def get_obo_token(
self,
auth_code: str,
agent_token: str,
scopes: Optional[List[str]] = None,
code_verifier: Optional[str] = None
) -> OAuthToken:
"""Get on-behalf-of (OBO) token for user using authorization code.

Expand All @@ -206,6 +249,7 @@ async def get_obo_token(
code=auth_code,
scope=scope_str,
actor_token=actor_token_val,
code_verifier=code_verifier
)
return token

Expand Down
2 changes: 1 addition & 1 deletion packages/asgardeo/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "asgardeo"
version = "0.1.0"
version = "0.2.1"
description = "Python SDK for Asgardeo"
authors = ["Thilina Senarath <[email protected]>"]
license = "MIT"
Expand Down
6 changes: 5 additions & 1 deletion packages/asgardeo/src/asgardeo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
TokenError,
ValidationError,
)
from .auth.util import generate_pkce_pair, generate_state, build_authorization_url

__version__ = "0.1.0"
__version__ = "0.2.1"

__all__ = [
"AsgardeoConfig",
Expand All @@ -38,4 +39,7 @@
"OAuthToken",
"TokenError",
"ValidationError",
"generate_pkce_pair",
"generate_state",
"build_authorization_url",
]
6 changes: 5 additions & 1 deletion packages/asgardeo/src/asgardeo/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
ValidationError,
)
from .client import AsgardeoNativeAuthClient, AsgardeoTokenClient
from .util import generate_pkce_pair, generate_state, build_authorization_url

__version__ = "0.1.0"
__version__ = "0.2.1"

__all__ = [
"AsgardeoConfig",
Expand All @@ -38,4 +39,7 @@
"OAuthToken",
"TokenError",
"ValidationError",
"generate_pkce_pair",
"generate_state",
"build_authorization_url",
]
10 changes: 8 additions & 2 deletions packages/asgardeo/src/asgardeo/auth/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Async Asgardeo authentication and token clients."""

import json
import logging
from typing import Any
from urllib.parse import urlencode

Expand All @@ -33,6 +34,7 @@
ValidationError,
)

logger = logging.getLogger(__name__)

class AsgardeoNativeAuthClient:
"""Async client for handling Asgardeo App Native Authentication flows.
Expand Down Expand Up @@ -72,12 +74,15 @@ async def _initiate_auth(
url = f"{self.base_url}/oauth2/authorize"
data = {
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
"response_type": "code",
"redirect_uri": self.config.redirect_uri,
"scope": self.config.scope,
"response_mode": "direct",
}

# Only add client_secret if code_verifier is not in params (PKCE flow)
if not (params and "code_challenge" in params):
data["client_secret"] = self.config.client_secret
if state:
data["state"] = state
if params:
Expand Down Expand Up @@ -280,7 +285,8 @@ async def get_token(self, grant_type: str, **kwargs: Any) -> OAuthToken:
"""
url = f"{self.base_url}/oauth2/token"
data = {"grant_type": grant_type, "client_id": self.config.client_id}
if self.config.client_secret:

if self.config.client_secret and "code_verifier" not in kwargs:
data["client_secret"] = self.config.client_secret

if grant_type == "authorization_code":
Expand Down
36 changes: 36 additions & 0 deletions packages/asgardeo/src/asgardeo/auth/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import base64
import hashlib
import os
import secrets
from typing import Any, Dict, Tuple
from urllib.parse import urlencode


def generate_pkce_pair() -> Tuple[str, str]:
"""
Generate PKCE code verifier and code challenge pair
Returns:
Tuple of (code_verifier, code_challenge)
"""
# Generate code verifier (43-128 characters)
code_verifier = (
base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("utf-8").rstrip("=")
)

# Generate code challenge (SHA256 hash of verifier)
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest())
.decode("utf-8")
.rstrip("=")
)

return code_verifier, code_challenge

def generate_state() -> str:
"""Generate a secure random state parameter."""
return base64.urlsafe_b64encode(os.urandom(16)).decode('utf-8').rstrip('=')


def build_authorization_url(base_url: str, params: Dict[str, Any]) -> str:
"""Build authorization URL with parameters."""
return f"{base_url}?{urlencode(params)}"