Add Support for Custom AuthManager implementation #2055
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -374,6 +374,94 @@ Specific headers defined by the RESTCatalog spec include: | |
| ------------------------------------ | ------------------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | ||
| `header.X-Iceberg-Access-Delegation` | `{vended-credentials,remote-signing}` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms | | ||
|
||
#### Authentication in RESTCatalog | ||
|
||
The RESTCatalog supports pluggable authentication via the `auth` configuration block. This block specifies how the access token is fetched and managed for HTTP requests to the RESTCatalog server. The authentication method is selected by setting the `auth.type` property, and additional configuration can be provided as needed for each method. | ||
|
||
##### Supported Authentication Types | ||
|
||
- `noop`: No authentication (no Authorization header sent). | ||
- `basic`: HTTP Basic authentication. | ||
- `oauth2`: OAuth2 client credentials flow. | ||
- `legacyoauth2`: Legacy OAuth2 client credentials flow (deprecated; will be removed in PyIceberg 1.0.0). | ||
- `custom`: Custom authentication manager (requires `auth.impl`). | ||
|
||
##### Configuration Properties | ||
|
||
The `auth` block is structured as follows: | ||
|
||
```yaml | ||
catalog: | ||
default: | ||
type: rest | ||
uri: http://rest-catalog/ws/ | ||
auth: | ||
type: <auth_type> | ||
<auth_type>: | ||
# Type-specific configuration | ||
impl: <custom_class_path> # Only for custom auth | ||
``` | ||
|
||
**Property Reference:** | ||
|
||
| Property | Required | Description | | ||
|------------------|----------|-------------------------------------------------------------------------------------------------| | ||
| `auth.type` | Yes | The authentication type to use (`noop`, `basic`, `oauth2`, `legacyoauth2`, or `custom`). | | ||
Review comment: On review I was debating the necessity of a separate type config given an impl option, but I can see the appeal (we are free to change the implementation referred to by a given type).
Reply: We have a similar pattern to load a "custom" implementation for catalog/file_io/location_provider :)
||
| `auth.impl` | Conditionally | The fully qualified class path for a custom AuthManager. Required if `auth.type` is `custom`. | | ||
| `auth.basic` | If type is `basic` | Block containing `username` and `password` for HTTP Basic authentication. | | ||
| `auth.oauth2` | If type is `oauth2` | Block containing OAuth2 configuration (see below). | | ||
| `auth.custom` | If type is `custom` | Block containing configuration for the custom AuthManager. | | ||
|
||
##### Examples | ||
|
||
**No Authentication:** | ||
|
||
```yaml | ||
auth: | ||
type: noop | ||
``` | ||
|
||
**Basic Authentication:** | ||
|
||
```yaml | ||
auth: | ||
type: basic | ||
basic: | ||
username: myuser | ||
password: mypass | ||
``` | ||
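As a rough illustration of what the `basic` type sends on the wire, the sketch below builds an HTTP Basic `Authorization` value from the example credentials, using the same base64 encoding of `username:password` that `BasicAuthManager` applies in this PR. The helper name `basic_auth_header` is hypothetical and not part of PyIceberg.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Return an HTTP Basic Authorization header value (RFC 7617)."""
    # Join the credentials with a colon, then base64-encode the bytes.
    credentials = f"{username}:{password}"
    token = base64.b64encode(credentials.encode()).decode()
    return f"Basic {token}"


header = basic_auth_header("myuser", "mypass")
```

Base64 is an encoding, not encryption, so this scheme is only appropriate over TLS.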
|
||
**OAuth2 Authentication:** | ||
|
||
```yaml | ||
auth: | ||
type: oauth2 | ||
oauth2: | ||
client_id: my-client-id | ||
client_secret: my-client-secret | ||
token_url: https://auth.example.com/oauth/token | ||
scope: read | ||
refresh_margin: 60 # (optional) seconds before expiry to refresh | ||
expires_in: 3600 # (optional) fallback if server does not provide | ||
``` | ||
|
||
**Custom Authentication:** | ||
|
||
```yaml | ||
auth: | ||
type: custom | ||
impl: mypackage.module.MyAuthManager | ||
custom: | ||
property1: value1 | ||
property2: value2 | ||
``` | ||
|
||
##### Notes | ||
|
||
- If `auth.type` is `custom`, you **must** specify `auth.impl` with the full class path to your custom AuthManager. | ||
- If `auth.type` is not `custom`, specifying `auth.impl` is not allowed. | ||
- The configuration block under each type (e.g., `basic`, `oauth2`, `custom`) is passed as keyword arguments to the corresponding AuthManager. | ||
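To make the keyword-argument convention concrete, here is a minimal sketch of a custom manager. It assumes the `AuthManager` base class exposes a single `auth_header()` method, as the built-in managers in this PR do. A stand-in base class is defined locally so the snippet runs without PyIceberg installed; `MyAuthManager`, `property1`, and `property2` are the placeholder names from the example above, and the returned header format is invented for illustration.

```python
from abc import ABC, abstractmethod
from typing import Optional


class AuthManager(ABC):
    """Local stand-in for PyIceberg's AuthManager base class (assumption)."""

    @abstractmethod
    def auth_header(self) -> Optional[str]:
        """Return the Authorization header value, or None to send no header."""


class MyAuthManager(AuthManager):
    """Hypothetical custom manager configured from the `auth.custom` block."""

    def __init__(self, property1: str, property2: str):
        self._property1 = property1
        self._property2 = property2

    def auth_header(self) -> Optional[str]:
        # Illustrative only: a real manager would fetch or sign a token here.
        return f"Bearer {self._property1}:{self._property2}"


# The `custom` block is expanded into keyword arguments.
custom_config = {"property1": "value1", "property2": "value2"}
manager = MyAuthManager(**custom_config)
```

In real use the class would subclass PyIceberg's own `AuthManager` and be referenced through `auth.impl` by its full class path.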
|
||
### SQL Catalog | ||
|
||
The SQL catalog requires a database for its backend. PyIceberg supports PostgreSQL (through psycopg2) and SQLite. The database connection has to be configured using the `uri` property. The `init_catalog_tables` property is optional and defaults to `True`. If it is set to `False`, the catalog tables will not be created when the SQLCatalog is initialized. See SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls): | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -134,6 +134,8 @@ class IdentifierKind(Enum): | |||||
SIGV4_SERVICE = "rest.signing-name" | ||||||
OAUTH2_SERVER_URI = "oauth2-server-uri" | ||||||
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode" | ||||||
AUTH = "auth" | ||||||
CUSTOM = "custom" | ||||||
|
||||||
NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8) | ||||||
|
||||||
|
@@ -247,7 +249,23 @@ def _create_session(self) -> Session: | |||||
elif ssl_client_cert := ssl_client.get(CERT): | ||||||
session.cert = ssl_client_cert | ||||||
|
||||||
session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session)) | ||||||
if auth_config := self.properties.get(AUTH): | ||||||
auth_type = auth_config.get("type") | ||||||
if auth_type is None: | ||||||
raise ValueError("auth.type must be defined") | ||||||
auth_type_config = auth_config.get(auth_type, {}) | ||||||
auth_impl = auth_config.get("impl") | ||||||
|
||||||
if auth_type == CUSTOM and not auth_impl: | ||||||
raise ValueError("auth.impl must be specified when using custom auth.type") | ||||||
|
||||||
if auth_type != CUSTOM and auth_impl: | ||||||
sungwy marked this conversation as resolved.
Review comment (Copilot): The comparison
||||||
raise ValueError("auth.impl can only be specified when using custom auth.type") | ||||||
|
||||||
session.auth = AuthManagerAdapter(AuthManagerFactory.create(auth_impl or auth_type, auth_type_config)) | ||||||
else: | ||||||
session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session)) | ||||||
|
||||||
# Set HTTP headers | ||||||
self._config_headers(session) | ||||||
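The validation rules in this hunk can be exercised in isolation. The sketch below is a standalone approximation: `resolve_auth` is a hypothetical helper, not a function in this PR, and it uses equality comparison for the type string. It returns the two values that the hunk passes on to `AuthManagerFactory.create`.

```python
from typing import Any, Dict, Tuple

CUSTOM = "custom"


def resolve_auth(auth_config: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (manager name or class path, type-specific config)."""
    auth_type = auth_config.get("type")
    if auth_type is None:
        raise ValueError("auth.type must be defined")
    auth_type_config = auth_config.get(auth_type, {})
    auth_impl = auth_config.get("impl")

    # String values are compared by equality, not identity.
    if auth_type == CUSTOM and not auth_impl:
        raise ValueError("auth.impl must be specified when using custom auth.type")
    if auth_type != CUSTOM and auth_impl:
        raise ValueError("auth.impl can only be specified when using custom auth.type")

    return auth_impl or auth_type, auth_type_config
```

For example, `resolve_auth({"type": "custom"})` raises `ValueError` because no `impl` is given.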
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,9 +17,12 @@ | |
|
||
import base64 | ||
import importlib | ||
import threading | ||
import time | ||
from abc import ABC, abstractmethod | ||
from typing import Any, Dict, Optional, Type | ||
|
||
import requests | ||
from requests import HTTPError, PreparedRequest, Session | ||
from requests.auth import AuthBase | ||
|
||
|
@@ -42,11 +45,15 @@ def auth_header(self) -> Optional[str]: | |
|
||
|
||
class NoopAuthManager(AuthManager): | ||
"""Auth Manager implementation with no auth.""" | ||
|
||
def auth_header(self) -> Optional[str]: | ||
return None | ||
|
||
|
||
class BasicAuthManager(AuthManager): | ||
"""AuthManager implementation that supports basic password auth.""" | ||
|
||
def __init__(self, username: str, password: str): | ||
credentials = f"{username}:{password}" | ||
self._token = base64.b64encode(credentials.encode()).decode() | ||
|
@@ -56,6 +63,12 @@ def auth_header(self) -> str: | |
|
||
|
||
class LegacyOAuth2AuthManager(AuthManager): | ||
"""Legacy OAuth2 AuthManager implementation. | ||
|
||
This class exists for backward compatibility, and will be removed in | ||
PyIceberg 1.0.0 in favor of OAuth2AuthManager. | ||
""" | ||
|
||
_session: Session | ||
_auth_url: Optional[str] | ||
_token: Optional[str] | ||
|
@@ -109,6 +122,95 @@ def auth_header(self) -> str: | |
return f"Bearer {self._token}" | ||
|
||
|
||
class OAuth2TokenProvider: | ||
"""Thread-safe OAuth2 token provider with token refresh support.""" | ||
|
||
client_id: str | ||
client_secret: str | ||
token_url: str | ||
scope: Optional[str] | ||
refresh_margin: int | ||
expires_in: Optional[int] | ||
|
||
_token: Optional[str] | ||
_expires_at: float | ||
_lock: threading.Lock | ||
|
||
def __init__( | ||
self, | ||
client_id: str, | ||
client_secret: str, | ||
Review comment: should we let users provide these 2 separately? or parse them out from
Reply: there are a couple of options in the oauth section
Reply: separated out to: #2244
||
token_url: str, | ||
scope: Optional[str] = None, | ||
refresh_margin: int = 60, | ||
expires_in: Optional[int] = None, | ||
): | ||
self.client_id = client_id | ||
self.client_secret = client_secret | ||
self.token_url = token_url | ||
self.scope = scope | ||
self.refresh_margin = refresh_margin | ||
self.expires_in = expires_in | ||
|
||
self._token = None | ||
self._expires_at = 0 | ||
self._lock = threading.Lock() | ||
|
||
def _refresh_token(self) -> None: | ||
data = { | ||
"grant_type": "client_credentials", | ||
"client_id": self.client_id, | ||
"client_secret": self.client_secret, | ||
} | ||
if self.scope: | ||
data["scope"] = self.scope | ||
|
||
response = requests.post(self.token_url, data=data) | ||
response.raise_for_status() | ||
result = response.json() | ||
|
||
self._token = result["access_token"] | ||
expires_in = result.get("expires_in", self.expires_in) | ||
if expires_in is None: | ||
raise ValueError( | ||
"The expiration time of the Token must be provided by the Server in the Access Token Response in the `expires_in` field, or by the PyIceberg Client." | ||
) | ||
self._expires_at = time.time() + expires_in - self.refresh_margin | ||
|
||
def get_token(self) -> str: | ||
with self._lock: | ||
if not self._token or time.time() >= self._expires_at: | ||
self._refresh_token() | ||
if self._token is None: | ||
raise ValueError("Authorization token is None after refresh") | ||
return self._token | ||
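The caching behaviour of `get_token` can be seen without a real token endpoint. This sketch replaces the HTTP call with an injected `fetch` callable and an injectable clock; `CachedToken`, `fetch`, and `clock` are hypothetical names introduced for illustration, and only the lock-then-check-expiry pattern is taken from the code above.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Thread-safe token cache that refreshes shortly before expiry."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, int]],
        refresh_margin: int = 60,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch  # returns (access_token, expires_in_seconds)
        self._refresh_margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = threading.Lock()

    def get_token(self) -> str:
        # Hold the lock so concurrent callers trigger at most one refresh.
        with self._lock:
            if not self._token or self._clock() >= self._expires_at:
                token, expires_in = self._fetch()
                self._token = token
                self._expires_at = self._clock() + expires_in - self._refresh_margin
            return self._token
```

With `expires_in=3600` and `refresh_margin=60`, a token is treated as stale 3540 seconds after it was fetched, so the refresh happens before the server would reject it.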
|
||
|
||
class OAuth2AuthManager(AuthManager): | ||
"""Auth Manager implementation that supports OAuth2 as defined in IETF RFC6749.""" | ||
|
||
def __init__( | ||
self, | ||
client_id: str, | ||
client_secret: str, | ||
token_url: str, | ||
scope: Optional[str] = None, | ||
refresh_margin: int = 60, | ||
expires_in: Optional[int] = None, | ||
): | ||
self.token_provider = OAuth2TokenProvider( | ||
client_id, | ||
client_secret, | ||
token_url, | ||
scope, | ||
refresh_margin, | ||
expires_in, | ||
) | ||
|
||
def auth_header(self) -> str: | ||
return f"Bearer {self.token_provider.get_token()}" | ||
|
||
|
||
class AuthManagerAdapter(AuthBase): | ||
"""A `requests.auth.AuthBase` adapter that integrates an `AuthManager` into a `requests.Session` to automatically attach the appropriate Authorization header to every request. | ||
|
||
|
@@ -187,3 +289,4 @@ def create(cls, class_or_name: str, config: Dict[str, Any]) -> AuthManager: | |
AuthManagerFactory.register("noop", NoopAuthManager) | ||
AuthManagerFactory.register("basic", BasicAuthManager) | ||
AuthManagerFactory.register("legacyoauth2", LegacyOAuth2AuthManager) | ||
AuthManagerFactory.register("oauth2", OAuth2AuthManager) |
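The body of `AuthManagerFactory.create` is not shown in this hunk, so the following is only a guess at the general pattern: a name registry with a fallback that imports a dotted class path via `importlib`. The `Factory` and `Noop` classes are hypothetical, the fallback behaviour is an assumption, and a standard-library class stands in for a user-supplied `auth.impl`.

```python
import importlib
from typing import Any, Dict


class Factory:
    """Hypothetical registry; not PyIceberg's actual AuthManagerFactory."""

    _registry: Dict[str, type] = {}

    @classmethod
    def register(cls, name: str, impl: type) -> None:
        cls._registry[name] = impl

    @classmethod
    def create(cls, class_or_name: str, config: Dict[str, Any]) -> Any:
        impl = cls._registry.get(class_or_name)
        if impl is None:
            # Assumed fallback: treat the value as "package.module.ClassName".
            module_path, _, class_name = class_or_name.rpartition(".")
            if not module_path:
                raise ValueError(f"Unknown auth manager: {class_or_name}")
            impl = getattr(importlib.import_module(module_path), class_name)
        # The type-specific config block becomes keyword arguments.
        return impl(**config)


class Noop:
    def auth_header(self) -> None:
        return None


Factory.register("noop", Noop)
noop = Factory.create("noop", {})
# A dotted path resolves through importlib; a stdlib class stands in here.
delta = Factory.create("datetime.timedelta", {"seconds": 30})
```

Registering short names while still accepting a class path keeps the built-in types stable even if the class behind a name changes, which matches the rationale given in the review discussion.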
Review comment: This is great! A heads up, we recently merged a PR which added an "Authentication Options" section under the REST Catalog docs. Would be great to merge with what you have here.
See https://github.com/apache/iceberg-python/blob/main/mkdocs/docs/configuration.md#authentication-options