diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4fb058a9..434d9e4c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -112,7 +112,7 @@ Closing message template: ``` Thanks for the contribution, however it appears to lack sufficient effort (eg. not consulting the documentation, stack trace, or method signatures) or contains unredacted AI generated code. To keep the issue/PR trackers focused and maintainable, we're closing this for now. Please review our contributing policies in the [CONTRIBUTING.md](https://github.com/Voyz/ibind/blob/master/CONTRIBUTING.md) file. -If you revise the contribution - focusing on the minimal relevant code, confirming it aligns with the library's API and demonstrating your attempt to tackle it - we'll be happy to take another look. +If you revise the contribution - focusing on the minimal relevant code, confirming it aligns with the library's API and demonstrating your attempt to tackle it - we'll be happy to take another look. ``` ## Code Style diff --git a/Makefile b/Makefile index 64644802..e6b19355 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ else PYTHONPATH=.:test $(PYTHON) -m pytest test/unit/ test/integration/ -v endif -.PHONY: test-unit +.PHONY: test-unit test-unit: ## Run only unit tests ifeq ($(OS),Windows_NT) cmd /c "set PYTHONPATH=.;test && $(PYTHON) -m pytest test/unit/ -v" @@ -63,4 +63,3 @@ check-all: lint scan format test ## Run all checks (lint, scan, format, test) .PHONY: help help: # Show help for each of the Makefile recipes. @grep -E '^[a-zA-Z0-9 -]+:.*#' Makefile | sort | while read -r l; do printf "\033[1;32m$$(echo $$l | cut -f 1 -d':')\033[00m:$$(echo $$l | cut -f 2- -d'#')\n"; done - diff --git a/README.md b/README.md index ebcccf73..d4c53e0a 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ wrong? [Create an issue and let us know!][issues]*

-IBind is an unofficial Python API client library for the [Interactive Brokers Client Portal Web API.][ibkr-docs] (recently rebranded to Web API 1.0 or CPAPI 1.0) It supports both REST and WebSocket APIs of the IBKR Web API 1.0. Now fully headless with [OAuth 1.0a][wiki-oauth1a] support. +IBind is an unofficial Python API client library for the [Interactive Brokers Client Portal Web API.][ibkr-docs] (recently rebranded to Web API 1.0 or CPAPI 1.0) It supports both REST and WebSocket APIs of the IBKR Web API 1.0. Now fully headless with support for [OAuth 1.0a][wiki-oauth1a] and OAuth 2.0. _Note: IBind currently supports only the Web API 1.0 since the [newer Web API][web-api] seems to be still in beta and is not fully documented. Some of its features may work, but it is recommended to use the Web API 1.0's documentation for the time being. Once a complete version of the new Web API is released IBind will be extended to support it._ @@ -32,7 +32,7 @@ pip install ibind ## Authentication -IBind supports fully headless authentication using [OAuth 1.0a][wiki-oauth1a]. This means no longer needing to run any type software to communicate with IBKR API. +IBind supports fully headless authentication using [OAuth 1.0a][wiki-oauth1a] and OAuth 2.0. This means no longer needing to run any type of software to communicate with IBKR API. Alternatively, use [IBeam][ibeam] along with this library for easier setup and maintenance of the CP Gateway. diff --git a/agents/CLAUDE.md b/agents/CLAUDE.md index 4ff77c80..5d1167dd 100644 --- a/agents/CLAUDE.md +++ b/agents/CLAUDE.md @@ -15,7 +15,7 @@ The library is structured around two main client classes: - **Purpose**: REST API client extending `RestClient` base class - **Mixins**: Functionality is organized into mixins in `ibind/client/ibkr_client_mixins/`: - `accounts_mixin.py` - Account operations - - `contract_mixin.py` - Contract/security operations + - `contract_mixin.py` - Contract/security operations - `marketdata_mixin.py` - Market data operations - `order_mixin.py` - Order management - `portfolio_mixin.py` - Portfolio operations @@ -108,4 +108,4 @@ The `examples/` directory contains comprehensive usage examples: - WebSocket client requires careful lifecycle management (start/stop, subscription handling) - Rate limiting and parallel request handling are built into the REST client - All API endpoints follow IBKR's REST API documentation structure -- Environment variables can be configured via `.env` files (auto-patched via `patch_dotenv()`) \ No newline at end of file +- Environment variables can be configured via `.env` files (auto-patched via `patch_dotenv()`) diff --git a/examples/rest_09_oauth2.py b/examples/rest_09_oauth2.py new file mode 100644 index 00000000..c2af063e --- /dev/null +++ b/examples/rest_09_oauth2.py @@ -0,0 +1,73 @@ +""" +REST OAuth 2.0. + +Showcases usage of OAuth 2.0 with IbkrClient. + +This example demonstrates authenticating using OAuth 2.0 and making some basic API calls. + +Using IbkrClient with OAuth 2.0 support will automatically handle generating the JWTs +and managing the SSO bearer token. You should be able to use all endpoints in the same +way as when not using OAuth. + +IMPORTANT: In order to use OAuth 2.0, you're required to set up the following +environment variables: + +- IBIND_USE_OAUTH: Set to True (or pass use_oauth=True to IbkrClient). +- IBIND_OAUTH2_CLIENT_ID: Your OAuth 2.0 Client ID. +- IBIND_OAUTH2_CLIENT_KEY_ID: Your OAuth 2.0 Client Key ID (kid). +- IBIND_OAUTH2_PRIVATE_KEY_PEM: Your OAuth 2.0 private key in PEM format. + To set this as an environment variable, you should replace newlines (\n) + in your PEM file with the literal characters '\\n'. + For example, if your key is: + ----BEGIN PRIVATE KEY----- + ABC\nDEF + -----END PRIVATE KEY----- + You would set the environment variable as: + IBIND_OAUTH2_PRIVATE_KEY_PEM="-----BEGIN PRIVATE KEY-----\\nABC\\nDEF\\n-----END PRIVATE KEY-----" +- IBIND_OAUTH2_USERNAME: Your IBKR username associated with the OAuth 2.0 app. + +Optionally, you can also set these if they differ from the defaults: +- IBIND_OAUTH2_TOKEN_URL: Defaults to 'https://api.ibkr.com/oauth2/api/v1/token'. +- IBIND_OAUTH2_SSO_SESSION_URL: Defaults to 'https://api.ibkr.com/gw/api/v1/sso-sessions'. +- IBIND_OAUTH2_AUDIENCE: Defaults to '/token'. +- IBIND_OAUTH2_SCOPE: Defaults to 'sso-sessions.write'. +- IBIND_OAUTH2_REST_URL: Defaults to 'https://api.ibkr.com/v1/api/'. +- IBIND_OAUTH2_IP_ADDRESS: Your public IP address. If not set, the library will attempt to fetch it. + +If you prefer setting these variables inline, you can pass an instance of the +OAuth2Config class as the 'oauth_config' parameter to the IbkrClient constructor. +This is especially useful if managing the PEM key via environment variables is cumbersome. + +Example of dynamic configuration: +from ibind import IbkrClient +from ibind.oauth.oauth2 import OAuth2Config + +oauth_cfg = OAuth2Config( + client_id='your_client_id', + client_key_id='your_client_key_id', + private_key_pem='-----BEGIN PRIVATE KEY-----\nYOUR KEY HERE\n-----END PRIVATE KEY-----', # Direct string + username='your_ibkr_username' + # ... any other overrides +) +client = IbkrClient(use_oauth=True, oauth_config=oauth_cfg) + +This example assumes environment variables are set for simplicity. +""" + +import os + +from ibind import IbkrClient, ibind_logs_initialize +from ibind.oauth.oauth2 import OAuth2Config + +ibind_logs_initialize() + +client = IbkrClient( + cacert=os.getenv('IBIND_CACERT', False), + use_oauth=True, + oauth_config=OAuth2Config(), +) + +try: + print(client.tickle().data) +finally: + client.close() diff --git a/ibind/client/ibkr_client.py b/ibind/client/ibkr_client.py index 8f38c40b..1b8da43d 100644 --- a/ibind/client/ibkr_client.py +++ b/ibind/client/ibkr_client.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import importlib.util import os import time @@ -78,16 +80,23 @@ def __init__( auto_recreate_session (bool, optional): Whether to automatically recreate the session on connection errors. Defaults to True. auto_register_shutdown (bool, optional): Whether to automatically register a shutdown handler for this client. Defaults to True. use_oauth (bool, optional): Whether to use OAuth authentication. Defaults to False. - oauth_config (OAuthConfig, optional): The configuration for the OAuth authentication. OAuth1aConfig is used if not specified. + oauth_config (OAuthConfig, optional): The configuration for the OAuth authentication. + OAuth1aConfig is used if not specified. """ self._tickler: Optional[Tickler] = None self._use_oauth = use_oauth + self.oauth_config = oauth_config if self._use_oauth: from ibind.oauth.oauth1a import OAuth1aConfig + from ibind.oauth.oauth2 import OAuth2Config + + if self.oauth_config is None: + self.oauth_config = OAuth1aConfig() + elif not isinstance(self.oauth_config, (OAuth1aConfig, OAuth2Config)): + raise ValueError(f'Unsupported OAuth configuration type: {type(self.oauth_config)}') - # cast to OAuth1aConfig for type checking, since currently 1.0a is the only version used - self.oauth_config = cast(OAuth1aConfig, oauth_config) if oauth_config is not None else OAuth1aConfig() + self.oauth_config = cast('OAuthConfig', self.oauth_config) url = url if url is not None and self.oauth_config.oauth_rest_url is None else self.oauth_config.oauth_rest_url if url is None: @@ -109,7 +118,7 @@ def __init__( self.logger.info('#################') self.logger.info( - f'New IbkrClient(base_url={self.base_url!r}, account_id={self.account_id!r}, ssl={self.cacert!r}, timeout={self._timeout}, max_retries={self._max_retries}, use_oauth={self._use_oauth})' + f'New IbkrClient(base_url={self.base_url!r}, account_id={self.account_id!r}, ssl={self.cacert!r}, timeout={self._timeout}, max_retries={self._max_retries}, use_oauth={self._use_oauth}, oauth_version={self.oauth_config.version() if self.oauth_config is not None else None})' ) if self._use_oauth: @@ -135,19 +144,32 @@ def _request(self, method: str, endpoint: str, base_url: str = None, extra_heade raise def _get_headers(self, request_method: str, request_url: str): - if (not self._use_oauth) or request_url == f'{self.base_url}{self.oauth_config.live_session_token_endpoint}': - # No need for extra headers if we don't use oauth or getting live session token + if not self._use_oauth or self.oauth_config is None: return {} - # get headers for endpoints other than live session token request - from ibind.oauth.oauth1a import generate_oauth_headers + from ibind.oauth.oauth1a import OAuth1aConfig, generate_oauth_headers + from ibind.oauth.oauth2 import OAuth2Config + + if isinstance(self.oauth_config, OAuth2Config): + if request_url in {self.oauth_config.token_url, self.oauth_config.sso_session_url}: + return {} + + if not self.oauth_config.has_sso_bearer_token(): + _LOGGER.error(f'{self}: OAuth 2.0 configured for {request_url}, but SSO bearer token is missing.') + return {} + + return {'Authorization': f'Bearer {self.oauth_config.sso_bearer_token}'} - headers = generate_oauth_headers( + if not isinstance(self.oauth_config, OAuth1aConfig): + raise ValueError(f'Unsupported OAuth configuration type: {type(self.oauth_config)}') + + if request_url == f'{self.base_url}{self.oauth_config.live_session_token_endpoint}': + return {} + + return generate_oauth_headers( oauth_config=self.oauth_config, request_method=request_method, request_url=request_url, live_session_token=self.live_session_token ) - return headers - def generate_live_session_token(self): """ Generates a new live session token for OAuth 1.0a authentication. @@ -194,14 +216,28 @@ def oauth_init(self, maintain_oauth: bool, init_brokerage_session: bool): """ _LOGGER.info(f'{self}: Initialising OAuth {self.oauth_config.version()}') + from ibind.oauth.oauth1a import OAuth1aConfig, validate_live_session_token + from ibind.oauth.oauth2 import OAuth2Config, authenticate_oauth2, establish_oauth2_brokerage_session + if importlib.util.find_spec('Crypto') is None: raise ImportError('Installation lacks OAuth support. Please install by using `pip install ibind[oauth]`') - # get live session token for OAuth authentication - self.generate_live_session_token() + if isinstance(self.oauth_config, OAuth2Config): + sso_token = authenticate_oauth2(self) + if not sso_token: + raise ExternalBrokerError('Failed to obtain OAuth 2.0 SSO Bearer Token during oauth_init') + + if init_brokerage_session: + establish_oauth2_brokerage_session(self) - # validate the live session token once - from ibind.oauth.oauth1a import validate_live_session_token + if maintain_oauth: + self.start_tickler() + return + + if not isinstance(self.oauth_config, OAuth1aConfig): + raise ValueError(f'Unsupported OAuth configuration type: {type(self.oauth_config)}') + + self.generate_live_session_token() success = validate_live_session_token( live_session_token=self.live_session_token, @@ -262,10 +298,19 @@ def oauth_shutdown(self): This method stops the Tickler process, which keeps the session alive, and logs out from the IBKR API to ensure a clean session termination. """ + if not self._use_oauth or self.oauth_config is None: + return + _LOGGER.info(f'{self}: Shutting down OAuth') self.stop_tickler() self.logout() + from ibind.oauth.oauth2 import OAuth2Config + + if isinstance(self.oauth_config, OAuth2Config): + self.oauth_config.sso_bearer_token = None + self.oauth_config.access_token = None + def handle_health_status(self, raise_exceptions: bool = False) -> bool: warnings.warn("'handle_health_status' is deprecated. Calling it on a frequent basis is not recommended as IBKR expects /tickle call at most every 60 seconds. Use 'handle_auth_status' which utilises authentication_status() instead of tickle(), and use Tickler or manually ensure you call tickle() on a 60-second interval.", DeprecationWarning, stacklevel=2) @@ -320,7 +365,7 @@ def _attempt_health_check(self, method: callable, raise_exceptions: bool = False elif 'An attempt was made to access a socket in a way forbidden by its access permissions' in str(e): _LOGGER.error('Connection to IBKR servers blocked during reauthentication. Check that nothing is blocking connectivity of the application') elif e.status_code == 410 and 'gone' in str(e): - _LOGGER.error(f'OAuth 410 gone: recreate a new live session token, or try a different server, eg. "1.api.ibkr.com", "2.api.ibkr.com", etc.') + _LOGGER.error('OAuth 410 gone: recreate a new live session token, or try a different server, eg. "1.api.ibkr.com", "2.api.ibkr.com", etc.') else: _LOGGER.error(f'Unknown error checking IBKR connection during reauthentication: {exception_to_string(e)}') @@ -330,4 +375,4 @@ def _attempt_health_check(self, method: callable, raise_exceptions: bool = False _LOGGER.error(f'Error reauthenticating OAuth during reauthentication: {exception_to_string(e)}') if raise_exceptions: raise - return False \ No newline at end of file + return False diff --git a/ibind/oauth/oauth2.py b/ibind/oauth/oauth2.py new file mode 100644 index 00000000..612c8fa0 --- /dev/null +++ b/ibind/oauth/oauth2.py @@ -0,0 +1,422 @@ +import base64 +import json +import logging +import math +import pprint +from dataclasses import dataclass, field +from typing import Optional, TYPE_CHECKING + +import requests +import time +from Crypto.PublicKey import RSA +from Crypto.Signature import PKCS1_v1_5 +from Crypto.Hash import SHA256 + +from ibind import var +from ibind.oauth import OAuthConfig +from ibind.support.errors import ExternalBrokerError +from ibind.support.logs import project_logger + +_LOGGER = project_logger(__file__) + +if TYPE_CHECKING: + from ibind.client.ibkr_client import IbkrClient + + +@dataclass +class OAuth2Config(OAuthConfig): + """ + Dataclass encapsulating OAuth 2.0 configuration parameters. + """ + + client_id: str = var.IBIND_OAUTH2_CLIENT_ID + """ OAuth 2.0 Client ID. """ + + client_key_id: str = var.IBIND_OAUTH2_CLIENT_KEY_ID + """ OAuth 2.0 Client Key ID. """ + + private_key_pem: Optional[str] = var.IBIND_OAUTH2_PRIVATE_KEY_PEM + """ OAuth 2.0 private key PEM content. """ + + private_key_path: Optional[str] = var.IBIND_OAUTH2_PRIVATE_KEY_PATH + """ Path to the OAuth 2.0 private key PEM file. """ + + username: Optional[str] = var.IBIND_OAUTH2_USERNAME + """ IBKR username used for the OAuth 2.0 credential claim. """ + + ip_address: Optional[str] = var.IBIND_OAUTH2_IP_ADDRESS + """ Public IP address for the OAuth 2.0 credential claim. """ + + token_url: str = field(default=var.IBIND_OAUTH2_TOKEN_URL or 'https://api.ibkr.com/oauth2/api/v1/token') + """ OAuth 2.0 token endpoint URL. """ + + sso_session_url: str = field(default=var.IBIND_OAUTH2_SSO_SESSION_URL or 'https://api.ibkr.com/gw/api/v1/sso-sessions') + """ OAuth 2.0 SSO session endpoint URL. """ + + audience: str = field(default=var.IBIND_OAUTH2_AUDIENCE or '/token') + """ OAuth 2.0 JWT audience. """ + + scope: str = field(default=var.IBIND_OAUTH2_SCOPE or 'sso-sessions.write') + """ OAuth 2.0 token scope. """ + + oauth_rest_url: str = var.IBIND_OAUTH2_REST_URL or var.IBIND_REST_URL or 'https://api.ibkr.com/v1/api/' + """ REST base URL for OAuth 2.0 authenticated requests. """ + + oauth_ws_url: str = var.IBIND_OAUTH2_WS_URL or var.IBIND_WS_URL or 'wss://api.ibkr.com/v1/api/ws' + """ WebSocket base URL for OAuth 2.0 authenticated requests. """ + + access_token: Optional[str] = field(default=None, init=False) + """ OAuth 2.0 access token returned by the token endpoint. """ + + sso_bearer_token: Optional[str] = field(default=None, init=False) + """ SSO bearer token returned by the IBKR gateway. """ + + def __post_init__(self) -> None: + if self.private_key_pem is None and self.private_key_path: + try: + with open(self.private_key_path, 'r', encoding='utf-8') as file: + self.private_key_pem = file.read() + except OSError as exc: + _LOGGER.error(f'Failed to load OAuth 2.0 private key from {self.private_key_path}: {exc}') + + def version(self): + return 2.0 + + def has_sso_bearer_token(self) -> bool: + """Checks if an SSO bearer token is present and non-empty.""" + return bool(hasattr(self, 'sso_bearer_token') and self.sso_bearer_token) + + def verify_config(self) -> None: + required_params = [ + 'client_id', + 'client_key_id', + 'private_key_pem', + 'username', + 'token_url', + 'sso_session_url', + 'audience', + 'scope' + ] + missing_params = [param for param in required_params if not getattr(self, param)] + if missing_params: + raise ValueError(f'OAuth2Config is missing required parameters: {", ".join(missing_params)}') + super().verify_config() + +# Original oauth2.py logic starts here +def _get_public_ip(): + """Fetches the public IP address from an external service.""" + try: + response = requests.get("https://api.ipify.org?format=json", timeout=5) + response.raise_for_status() + ip_address = response.json()["ip"] + _LOGGER.debug(f"Successfully fetched public IP address: {ip_address}") + return ip_address + except requests.exceptions.RequestException as e: + _LOGGER.error(f"Could not fetch public IP address: {e}") + return None + except json.JSONDecodeError as e: + _LOGGER.error(f"Could not parse IP address from response: {e}") + return None + except KeyError as e: + _LOGGER.error(f"Could not extract IP from response (KeyError: {e})") + return None + except Exception as e: + _LOGGER.error(f"An unexpected error occurred while fetching public IP: {e}") + return None + +class OAuth2Handler: + """ + Handles the OAuth 2.0 authentication flow with Interactive Brokers. + Adapted from an earlier proof-of-concept script (e.g., a standalone IBOAuthAuthenticator). + """ + def __init__(self, client: 'IbkrClient'): + self.client = client + if not self.client.oauth_config.private_key_pem: + raise ValueError("Private key PEM cannot be empty (accessed via client.oauth_config).") + try: + self.jwt_private_key = RSA.import_key( + self.client.oauth_config.private_key_pem.replace('\\\\n', '\\n') + ) + except Exception as e: + raise ValueError(f"Failed to import private key: {e}") + + def _base64_encode(self, val: bytes) -> str: + return base64.b64encode(val).decode().replace('+', '-').replace('/', '_').rstrip('=') + + def _make_jws(self, header: dict, claims: dict) -> str: + json_header = json.dumps(header, separators=(',', ':')).encode() + encoded_header = self._base64_encode(json_header) + json_claims = json.dumps(claims, separators=(',', ':')).encode() + encoded_claims = self._base64_encode(json_claims) + payload = f"{encoded_header}.{encoded_claims}" + md = SHA256.new(payload.encode()) + signer = PKCS1_v1_5.new(self.jwt_private_key) + signature = signer.sign(md) + encoded_signature = self._base64_encode(signature) + return payload + "." + encoded_signature + + def _compute_client_assertion(self, url_for_assertion: str) -> str: + now = math.floor(time.time()) + header = { + 'alg': 'RS256', + 'typ': 'JWT', + 'kid': self.client.oauth_config.client_key_id + } + if url_for_assertion == self.client.oauth_config.token_url: + claims = { + 'iss': self.client.oauth_config.client_id, + 'sub': self.client.oauth_config.client_id, + 'aud': self.client.oauth_config.audience, + 'exp': now + 60, + 'iat': now - 10 + } + elif url_for_assertion == self.client.oauth_config.sso_session_url: + claims = { + 'ip': self.client.oauth_config.ip_address, + 'credential': self.client.oauth_config.username, + 'iss': self.client.oauth_config.client_id, + 'exp': now + 86400, + 'iat': now + } + else: + raise ValueError(f"Unknown URL for client assertion: {url_for_assertion}") + assertion = self._make_jws(header, claims) + return assertion + + def _pretty_request_response(self, resp: requests.Response) -> str: + req = resp.request + rqh = '\\n'.join(f"{k}: {v}" for k, v in req.headers.items()) + rqh = rqh.replace(', ', '\\n ') + rqb = req.body if req.body else "" + if isinstance(rqb, bytes): + rqb = rqb.decode('utf-8', errors='replace') + try: + rsb = f"\\n{pprint.pformat(resp.json())}\\n" if resp.text else "" + except json.JSONDecodeError: + rsb = resp.text + resp_headers_to_print = ["Content-Type", "Content-Length", "Date", "Set-Cookie", "User-Agent", "Cookie", "Cache-Control", "Host"] + rsh = '\\n'.join([f"{k}: {v}" for k, v in resp.headers.items() if k in resp_headers_to_print]) + return_str = '\\n'.join([ + '-----------REQUEST-----------', + f"{req.method} {req.url}", + "", + rqh, + str(rqb), + "", + '-----------RESPONSE-----------', + f"{resp.status_code} {resp.reason}", + rsh, + f"{rsb}\\n", + "", + ]) + return return_str + + def get_access_token(self) -> Optional[str]: + """Gets an OAuth 2.0 access token.""" + url = self.client.oauth_config.token_url + client_assertion = self._compute_client_assertion(url) + form_data = { + 'grant_type': 'client_credentials', + 'scope': self.client.oauth_config.scope, + 'client_id': self.client.oauth_config.client_id, + 'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + 'client_assertion': client_assertion + } + headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/x-www-form-urlencoded' + } + + _LOGGER.debug(f"Requesting OAuth 2.0 Access Token from {url}") + if _LOGGER.isEnabledFor(logging.DEBUG): + _LOGGER.debug(f"Access Token Request Headers (to be passed to client._request): {headers}") + _LOGGER.debug(f"Access Token Request Form Data (to be passed to client._request): {pprint.pformat(form_data)}") + + try: + result = self.client._request( + method='POST', + endpoint=url, + base_url='', + extra_headers=headers, + data=form_data + ) + if not result or not isinstance(result.data, dict): + _LOGGER.error(f"Failed to obtain OAuth 2.0 Access Token. Result: {result.data if result and hasattr(result, 'data') else 'Result object missing or data attribute missing'}") + return None + + token_data = result.data + access_token = token_data.get('access_token') + if not access_token: + _LOGGER.error(f"access_token not found in response: {token_data}") + return None + _LOGGER.info("Successfully obtained OAuth 2.0 Access Token.") + return access_token + except ExternalBrokerError as e: + _LOGGER.error(f"HTTP error obtaining OAuth 2.0 Access Token via client._request: {e}") + return None + except Exception as e: + _LOGGER.error(f"Unexpected error obtaining OAuth 2.0 Access Token via client._request: {e}", exc_info=True) + return None + + def get_sso_bearer_token(self, access_token: str) -> Optional[str]: + """Gets an SSO bearer token using a previously obtained access_token.""" + if not access_token: + _LOGGER.error("Cannot get SSO bearer token without an access token.") + return None + + url = self.client.oauth_config.sso_session_url + signed_request_assertion_jwt = self._compute_client_assertion(url) + + headers = { + 'Authorization': f'Bearer {access_token}', + 'Content-Type': 'application/jwt', + 'Accept': 'application/json' + } + + _LOGGER.debug(f"Requesting SSO Bearer Token from {url}") + if _LOGGER.isEnabledFor(logging.DEBUG): + _LOGGER.debug(f"SSO Bearer Token Request Headers (to be passed to client._request): {headers}") + _LOGGER.debug(f"SSO Bearer Token Request Body (JWT Assertion, to be passed to client._request): {signed_request_assertion_jwt[:100]}...") + + try: + result = self.client._request( + method='POST', + endpoint=url, + base_url='', + extra_headers=headers, + data=signed_request_assertion_jwt + ) + if not result or not isinstance(result.data, dict): + _LOGGER.error(f"Failed to obtain SSO Bearer Token. Result: {result.data if result and hasattr(result, 'data') else 'Result object missing or data attribute missing'}") + return None + + sso_data = result.data + sso_bearer_token = sso_data.get('access_token') + if not sso_bearer_token: + _LOGGER.error(f"SSO Bearer Token ('access_token') not found in response: {sso_data}") + return None + _LOGGER.info("Successfully obtained SSO Bearer Token.") + return sso_bearer_token + except ExternalBrokerError as e: + _LOGGER.error(f"HTTP error obtaining SSO Bearer Token via client._request: {e}") + return None + except Exception as e: + _LOGGER.error(f"Unexpected error obtaining SSO Bearer Token via client._request: {e}", exc_info=True) + return None + + def authenticate(self) -> Optional[str]: + _LOGGER.debug("Starting OAuth 2.0 authentication flow within OAuth2Handler.") + access_token = self.get_access_token() + if not access_token: + _LOGGER.error("Authentication failed: Could not retrieve access token.") + return None + sso_bearer_token = self.get_sso_bearer_token(access_token) + if not sso_bearer_token: + _LOGGER.error("Authentication failed: Could not retrieve SSO bearer token.") + return None + self.client.oauth_config.access_token = access_token + self.client.oauth_config.sso_bearer_token = sso_bearer_token + _LOGGER.info("OAuth 2.0 authentication successful. Tokens stored in client.oauth_config.") + _LOGGER.debug(f"OAuth 2.0 Authentication successful. SSO Token: {self.client.oauth_config.sso_bearer_token[:20]}... (truncated)") + return sso_bearer_token + +def authenticate_oauth2(client: 'IbkrClient') -> Optional[str]: + """Main function to authenticate using OAuth 2.0 and return the SSO Bearer Token.""" + config = client.oauth_config + _LOGGER.info(f"Starting OAuth 2.0 authentication for client_id: {config.client_id}, username: {config.username}") + + config.verify_config() + + if not config.ip_address: + _LOGGER.info("IP address not configured, attempting to fetch public IP.") + config.ip_address = _get_public_ip() + if not config.ip_address: + _LOGGER.error("Failed to obtain public IP address. Cannot proceed with OAuth 2.0.") + return None + _LOGGER.info(f"Using auto-detected public IP: {config.ip_address}") + else: + _LOGGER.info(f"Using pre-configured IP address: {config.ip_address}") + + if not config.private_key_pem: + if var.IBIND_OAUTH2_PRIVATE_KEY_PATH: + _LOGGER.error(f"private_key_pem is not set, but IBIND_OAUTH2_PRIVATE_KEY_PATH ({var.IBIND_OAUTH2_PRIVATE_KEY_PATH}) was. This suggests a load failure from path in OAuth2Config.__post_init__.") + else: + _LOGGER.error("private_key_pem is not set and no private key path was configured via IBIND_OAUTH2_PRIVATE_KEY_PATH. Cannot proceed.") + return None + + try: + handler = OAuth2Handler(client=client) + sso_token = handler.authenticate() + if sso_token: + _LOGGER.info("OAuth 2.0 Authentication successful. SSO Token obtained and stored in client.oauth_config.") + return sso_token + else: + _LOGGER.error("OAuth 2.0 Authentication failed: handler.authenticate() returned None. Check OAuth2Handler logs for more details (e.g., issues in get_access_token or get_sso_bearer_token).") + return None + except ValueError as e: + _LOGGER.error(f"OAuth 2.0 authentication failed due to ValueError: {e}", exc_info=True) + return None + except Exception as e: + _LOGGER.error(f"An unexpected error occurred during OAuth 2.0 authentication process: {e}", exc_info=True) + return None + +def establish_oauth2_brokerage_session(client: 'IbkrClient') -> None: + """ + Establishes the brokerage session for an OAuth 2.0 authenticated client. + + This involves validating the SSO session and then initializing the brokerage session. + """ + _LOGGER.debug(f"{client}: OAuth 2.0: Attempting to establish brokerage session (/sso/validate and initialize).") + + try: + validation_result = client.validate() + _LOGGER.debug(f"{client}: /sso/validate result: {validation_result.data if validation_result else 'No result'}") + + sso_is_valid = False + if validation_result and hasattr(validation_result, 'data') and isinstance(validation_result.data, dict): + if validation_result.data.get('RESULT') is True: + sso_is_valid = True + _LOGGER.debug(f"{client}: /sso/validate deemed successful based on 'RESULT': True.") + elif validation_result.data.get('authenticated') is True: # Fallback check + sso_is_valid = True + _LOGGER.debug(f"{client}: /sso/validate deemed successful based on 'authenticated': True.") + + if not sso_is_valid: + _LOGGER.warning( + f"{client}: /sso/validate did not indicate a clear success. " + f"Cannot proceed with brokerage session initialization. " + f"Validation data: {validation_result.data if validation_result else 'No result'}" + ) + return + + _LOGGER.debug(f"{client}: /sso/validate successful. Now attempting to initialize brokerage session.") + try: + _LOGGER.debug(f"{client}: Calling initialize_brokerage_session(compete=True).") + init_result = client.initialize_brokerage_session(compete=True) + _LOGGER.debug(f"{client}: initialize_brokerage_session(compete=True) result: {init_result.data if init_result else 'No result'}") + + auth_status_after_init = client.authentication_status() + _LOGGER.debug(f"{client}: /iserver/auth/status (after compete=True init): {auth_status_after_init.data if auth_status_after_init else 'No result'}") + if not (auth_status_after_init and auth_status_after_init.data and auth_status_after_init.data.get('authenticated')): + _LOGGER.warning(f"{client}: Still not authenticated after compete=True init.") + + except ExternalBrokerError as e_init_compete_true: + _LOGGER.error(f"{client}: initialize_brokerage_session(compete=True) failed: {e_init_compete_true}") + if e_init_compete_true.status_code == 500 and "failed to generate sso dh token" in str(e_init_compete_true): + _LOGGER.warning(f"{client}: Retrying initialize_brokerage_session with compete=False due to DH token error.") + try: + init_result_false = client.initialize_brokerage_session(compete=False) + _LOGGER.debug(f"{client}: initialize_brokerage_session(compete=False) result: {init_result_false.data if init_result_false else 'No result'}") + + auth_status_after_init_false = client.authentication_status() + _LOGGER.debug(f"{client}: /iserver/auth/status (after compete=False init): {auth_status_after_init_false.data if auth_status_after_init_false else 'No result'}") + if not (auth_status_after_init_false and auth_status_after_init_false.data and auth_status_after_init_false.data.get('authenticated')): + _LOGGER.warning(f"{client}: Still not authenticated after compete=False init.") + except Exception as e_init_compete_false: + _LOGGER.error(f"{client}: initialize_brokerage_session(compete=False) also failed: {e_init_compete_false}") + # else: other error from compete=True, not the DH token one. We just log it above. + except Exception as e_init_generic: + _LOGGER.error(f"{client}: A generic error occurred during initialize_brokerage_session(compete=True): {e_init_generic}") + + except Exception as e_validate_sequence: + _LOGGER.error(f"{client}: Error during /sso/validate or subsequent brokerage session initialization sequence: {e_validate_sequence}") diff --git a/ibind/var.py b/ibind/var.py index 246710d8..9f8c54df 100644 --- a/ibind/var.py +++ b/ibind/var.py @@ -162,3 +162,41 @@ def to_bool(value): IBIND_OAUTH1A_REALM = os.getenv('IBIND_OAUTH1A_REALM', 'limited_poa') """ OAuth 1.0a connection type. This is generally set to "limited_poa", however should be set to "test_realm" when using the TESTCONS consumer key. """ + +##### OAuth 2.0 ##### + +IBIND_OAUTH2_CLIENT_ID = os.getenv('IBIND_OAUTH2_CLIENT_ID', None) +""" OAuth 2.0 Client ID. """ + +IBIND_OAUTH2_CLIENT_KEY_ID = os.getenv('IBIND_OAUTH2_CLIENT_KEY_ID', None) +""" OAuth 2.0 Client Key ID. """ + +IBIND_OAUTH2_TOKEN_URL = os.getenv('IBIND_OAUTH2_TOKEN_URL', None) # Defaults to 'https://api.ibkr.com/oauth2/api/v1/token' in OAuth2Config if None +""" OAuth 2.0 Token Endpoint URL. """ + +IBIND_OAUTH2_SSO_SESSION_URL = os.getenv('IBIND_OAUTH2_SSO_SESSION_URL', None) # Defaults to 'https://api.ibkr.com/gw/api/v1/sso-sessions' in OAuth2Config if None +""" OAuth 2.0 SSO Session Endpoint URL. """ + +IBIND_OAUTH2_AUDIENCE = os.getenv('IBIND_OAUTH2_AUDIENCE', None) # Defaults to '/token' in OAuth2Config if None +""" OAuth 2.0 Audience for JWT. """ + +IBIND_OAUTH2_SCOPE = os.getenv('IBIND_OAUTH2_SCOPE', None) # Defaults to 'sso-sessions.write' in OAuth2Config if None +""" OAuth 2.0 Scope. """ + +IBIND_OAUTH2_REST_URL = os.getenv('IBIND_OAUTH2_REST_URL', None) +""" IBKR REST API Base URL when using OAuth 2.0. Defaults to IBIND_REST_URL or https://api.ibkr.com/v1/api/ in OAuth2Config. """ + +IBIND_OAUTH2_WS_URL = os.getenv('IBIND_OAUTH2_WS_URL', None) +""" IBKR WebSocket API URL when using OAuth 2.0. Defaults to IBIND_WS_URL or wss://api.ibkr.com/v1/api/ws in OAuth2Config. """ + +IBIND_OAUTH2_IP_ADDRESS = os.getenv('IBIND_OAUTH2_IP_ADDRESS', None) +""" Pre-configured IP address for OAuth 2.0. If None, it will be auto-fetched. """ + +IBIND_OAUTH2_PRIVATE_KEY_PEM = os.getenv('IBIND_OAUTH2_PRIVATE_KEY_PEM') +""" OAuth 2.0 Private Key PEM string. This should contain the actual multi-line PEM content. """ + +IBIND_OAUTH2_PRIVATE_KEY_PATH = os.getenv('IBIND_OAUTH2_PRIVATE_KEY_PATH') +""" Path to the OAuth 2.0 Private Key PEM file. If provided and IBIND_OAUTH2_PRIVATE_KEY_PEM is not set, the key will be loaded from this file. """ + +IBIND_OAUTH2_USERNAME = os.getenv('IBIND_OAUTH2_USERNAME') +""" IBKR Username for OAuth 2.0 'credential' claim. """ \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 7c039b66..71f5db4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,8 @@ requires-python = ">=3.8" license-files=["LICENSE"] dependencies = [ "requests>=2.31", - "websocket-client>=1.7" + "websocket-client>=1.7", + "pycryptodome>=3.21" # Essential for OAuth 2.0 JWT signing ] classifiers = [ "Development Status :: 4 - Beta", @@ -26,9 +27,7 @@ classifiers = [ keywords=['interactive brokers', 'rest api', 'python api', 'ibkr python api', 'ibkr web api', 'ib api', 'ibkr api', 'algo trading', 'algorithmic trading', 'quant', 'trading'] [project.optional-dependencies] -oauth = [ - 'pycryptodome>=3.21', - 'urllib3>=2.3', +dev = [ ] [project.urls] diff --git a/test/e2e/test_ibind_oauth2.py b/test/e2e/test_ibind_oauth2.py new file mode 100644 index 00000000..e59f95c2 --- /dev/null +++ b/test/e2e/test_ibind_oauth2.py @@ -0,0 +1,232 @@ +import logging +import os +import sys +from pprint import pformat +import pytest # Added for pytest +# import yaml # For loading exponencia config - No longer needed + +# --- Basic logging setup for E2E test visibility --- +logging.basicConfig( + level=logging.DEBUG, # Capture DEBUG and above + format='%(asctime)s|%(levelname)s| %(name)s: %(message)s', + stream=sys.stdout, # Output to stdout +) +# --- --------------------------------------------- --- + +# --- Determine Project Root and add to sys.path if necessary for local dev --- +# SCRIPT_DIR is the directory of this script (e.g., /path/to/ibind/test/e2e) +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +# TEST_DIR is the parent of SCRIPT_DIR (e.g., /path/to/ibind/test) +TEST_DIR = os.path.dirname(SCRIPT_DIR) +# PROJECT_ROOT is the parent of TEST_DIR (e.g., /path/to/ibind) +PROJECT_ROOT = os.path.dirname(TEST_DIR) + +# If ibind is not installed and we're running from a clone, +# adding PROJECT_ROOT to sys.path allows importing 'ibind' directly. +# This is often handled by test runners or virtual environments too. +if PROJECT_ROOT not in sys.path: + sys.path.insert(0, PROJECT_ROOT) +# --- ------------------------------------------------------------- --- + +# Attempt to load .env file from project root +try: + from dotenv import load_dotenv + dotenv_path_project_root = os.path.join(PROJECT_ROOT, '.env') + if os.path.exists(dotenv_path_project_root): + print(f"Loading environment variables from project root: {dotenv_path_project_root}") + load_dotenv(dotenv_path=dotenv_path_project_root) + else: + print(f".env file not found in project root ({PROJECT_ROOT}). Will rely on system environment variables.") +except ImportError: + print("python-dotenv not found. Will rely on system environment variables.") + +from ibind.client.ibkr_client import IbkrClient +from ibind.oauth.oauth2 import OAuth2Config +# from ibind import var # No longer directly needed here, OAuth2Config handles env vars +from ibind.support.logs import ibind_logs_initialize + +# Initialize ibind logging (globally for all tests in this module) +ibind_logs_initialize(log_level='DEBUG', log_to_console=True, log_to_file=False) + +# Configure basic logging for the test script itself - Pytest handles this. +# logger = logging.getLogger(__name__) +# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + +# --- Pytest Fixture for IbkrClient with OAuth 2.0 --- +@pytest.fixture(scope="module") +def oauth2_client(): + logger = logging.getLogger(__name__) # Logger for fixture setup + + logger.info("--- Setting up IBind OAuth 2.0 Client for E2E Tests ---") + account_id_to_test = os.getenv('IBIND_ACCOUNT_ID') + required_env_vars = { + 'IBIND_OAUTH2_CLIENT_ID': os.getenv('IBIND_OAUTH2_CLIENT_ID'), + 'IBIND_OAUTH2_CLIENT_KEY_ID': os.getenv('IBIND_OAUTH2_CLIENT_KEY_ID'), + # IBIND_OAUTH2_PRIVATE_KEY_PEM or IBIND_OAUTH2_PRIVATE_KEY_PATH is handled by OAuth2Config + 'IBIND_OAUTH2_USERNAME': os.getenv('IBIND_OAUTH2_USERNAME'), + 'IBIND_ACCOUNT_ID': account_id_to_test, + } + + missing_vars = [var_name for var_name, value in required_env_vars.items() if not value] + if missing_vars: + pytest.skip(f"Missing required environment variables for OAuth 2.0 E2E tests: {', '.join(missing_vars)}. Skipping tests.") + + # Additional check for private key, as OAuth2Config needs one or the other (PEM string or path) + # This is implicitly checked by OAuth2Config's verify_config, but an early skip is clearer. + if not os.getenv('IBIND_OAUTH2_PRIVATE_KEY_PEM') and not os.getenv('IBIND_OAUTH2_PRIVATE_KEY_PATH'): + pytest.skip("Neither IBIND_OAUTH2_PRIVATE_KEY_PEM nor IBIND_OAUTH2_PRIVATE_KEY_PATH is set. Skipping tests.") + + client_instance = None + try: + logger.info("Creating OAuth2Config (will load from environment variables via ibind.var)...") + oauth2_config = OAuth2Config() # OAuth2Config will load from var and handle path/pem logic + + # Perform verification early to ensure config is valid before client instantiation + try: + oauth2_config.verify_config() + except ValueError as ve: + pytest.skip(f"OAuth2Config verification failed: {ve}. Check your OAuth 2.0 environment variables. Skipping tests.") + + logger.info(f"OAuth2Config created. Client ID: {oauth2_config.client_id}") + logger.info(f"Target IBKR Account ID for test: {account_id_to_test}") + logger.info(f"Attempting to instantiate IbkrClient with OAuth 2.0 for account: {account_id_to_test}") + + client_instance = IbkrClient( + account_id=account_id_to_test, + use_oauth=True, + oauth_config=oauth2_config + ) + logger.info("IbkrClient instantiated successfully with OAuth 2.0.") + + if hasattr(client_instance.oauth_config, 'sso_bearer_token') and client_instance.oauth_config.sso_bearer_token: + logger.info(f"SSO Bearer Token obtained: {client_instance.oauth_config.sso_bearer_token[:20]}... (truncated)") + else: + # This should ideally be caught by client instantiation if token is critical for readiness + pytest.fail("SSO Bearer Token NOT FOUND in oauth_config after client instantiation.") + + yield client_instance + + except Exception as e: + logger.error(f"Error during oauth2_client fixture setup: {e}", exc_info=True) + pytest.fail(f"Failed to setup oauth2_client fixture: {e}") + + finally: + if client_instance: + logger.info("\\n--- Shutting down IbkrClient (fixture teardown) ---") + try: + client_instance.logout() + logger.info("IbkrClient logout successful.") + except Exception as e: + logger.error(f"Error during client.logout() in fixture teardown: {e}", exc_info=True) + logger.info("--- IBind OAuth 2.0 Client Fixture Teardown Complete ---") + +# --- Test Functions --- + +def test_connection_and_sso_token(oauth2_client): + logger = logging.getLogger(__name__) + logger.info("Test: Verifying client connection and SSO token presence.") + assert oauth2_client is not None, "OAuth2 client fixture should not be None" + assert oauth2_client._use_oauth, "Client should be configured to use OAuth" + assert isinstance(oauth2_client.oauth_config, OAuth2Config), "Client should have OAuth2Config" + assert oauth2_client.oauth_config.has_sso_bearer_token(), "SSO Bearer Token should be present after client init" + logger.info("Client connection and SSO token presence verified.") + +def test_tickle(oauth2_client): + logger = logging.getLogger(__name__) + logger.info("Test: Calling client.tickle()") + try: + tickle_result = oauth2_client.tickle() + logger.info(f"tickle() result: {pformat(tickle_result.data if tickle_result else None)}") + assert tickle_result is not None, "Tickle result should not be None" + # Add more specific assertions if tickle_result has a status or expected data structure + # For example, if it's a wrapper: assert tickle_result.is_ok + # If it contains data: assert 'session' in tickle_result.data and tickle_result.data['session'] == 'tickled' (example) + except Exception as e: + logger.error(f"Error calling client.tickle(): {e}", exc_info=True) + pytest.fail(f"client.tickle() raised an exception: {e}") + +def test_portfolio_accounts(oauth2_client): + logger = logging.getLogger(__name__) + logger.info("Test: Calling client.portfolio_accounts()") + try: + portfolio_accounts_result = oauth2_client.portfolio_accounts() + logger.info(f"portfolio_accounts() result: {pformat(portfolio_accounts_result.data if portfolio_accounts_result else None)}") + assert portfolio_accounts_result is not None, "Portfolio accounts result should not be None" + assert portfolio_accounts_result.data is not None, "Portfolio accounts data should not be None" + # Example: assert isinstance(portfolio_accounts_result.data, list) + except Exception as e: + logger.error(f"Error calling client.portfolio_accounts(): {e}", exc_info=True) + pytest.fail(f"client.portfolio_accounts() raised an exception: {e}") + +def test_portfolio_summary(oauth2_client): + logger = logging.getLogger(__name__) + account_id = oauth2_client.account_id + logger.info(f"Test: Calling client.portfolio_summary(account_id='{account_id}')") + try: + summary_result = oauth2_client.portfolio_summary(account_id=account_id) + logger.info(f"portfolio_summary() result: {pformat(summary_result.data if summary_result else None)}") + assert summary_result is not None, "Portfolio summary result should not be None" + assert summary_result.data is not None, "Portfolio summary data should not be None" + # Example: assert isinstance(summary_result.data, dict) + except Exception as e: + logger.error(f"Error calling client.portfolio_summary(): {e}", exc_info=True) + pytest.fail(f"client.portfolio_summary() raised an exception: {e}") + +def test_positions(oauth2_client): + logger = logging.getLogger(__name__) + account_id = oauth2_client.account_id + logger.info(f"Test: Calling client.positions(account_id='{account_id}', page=0)") + try: + positions_result = oauth2_client.positions(account_id=account_id, page=0) + logger.info(f"positions() result: {pformat(positions_result.data if positions_result else None)}") + assert positions_result is not None, "Positions result should not be None" + # Positions data can be a list, might be empty if no positions + # assert positions_result.data is not None + except Exception as e: + logger.error(f"Error calling client.positions(): {e}", exc_info=True) + pytest.fail(f"client.positions() raised an exception: {e}") + +def test_live_marketdata_snapshot(oauth2_client): + logger = logging.getLogger(__name__) + # This test can be problematic if /accounts hasn't been hit, as seen in previous logs + # It also depends on market hours for some data. + # Keeping it simple: just check if the call executes without error for now. + symbols_to_test = ["AAPL", "MSFT"] + fields_to_request = ["31", "84", "86"] # Last, Bid, Ask + logger.info(f"Test: Calling client.live_marketdata_snapshot_by_symbol(queries={symbols_to_test}, fields={fields_to_request})") + try: + # Ensure /portfolio/accounts is called first if that's a prerequisite + # This might have been implicitly called by other tests or client init, but explicit can be safer + # For now, assume client setup or previous tests handle this. + # accounts = oauth2_client.portfolio_accounts() + # if not accounts or not accounts.data: + # pytest.skip("Could not fetch accounts, skipping market data snapshot test.") + + snapshot_result = oauth2_client.live_marketdata_snapshot_by_symbol(queries=symbols_to_test, fields=fields_to_request) + logger.info(f"live_marketdata_snapshot_by_symbol result: {pformat(snapshot_result)}") # This method returns a dict + assert snapshot_result is not None, "Market data snapshot result should not be None" + assert isinstance(snapshot_result, dict), "Market data snapshot should be a dict" + # Example: for symbol in symbols_to_test: assert symbol in snapshot_result + except Exception as e: + # Check for the specific "Please query /accounts first" error + if "Please query /accounts first" in str(e): + pytest.skip(f"Skipping market data snapshot test due to API sequence requirement: {e}") + logger.error(f"Error calling client.live_marketdata_snapshot_by_symbol(): {e}", exc_info=True) + pytest.fail(f"client.live_marketdata_snapshot_by_symbol() raised an exception: {e}") + +def test_trades_history(oauth2_client): + logger = logging.getLogger(__name__) + account_id = oauth2_client.account_id + logger.info(f"Test: Calling client.trades(account_id='{account_id}', days='7')") + try: + trades_result = oauth2_client.trades(account_id=account_id, days="7") + logger.info(f"trades() result: {pformat(trades_result.data if trades_result else None)}") + assert trades_result is not None, "Trades result should not be None" + # Trades data can be a list, might be empty + # assert trades_result.data is not None + except Exception as e: + logger.error(f"Error calling client.trades(): {e}", exc_info=True) + pytest.fail(f"client.trades() raised an exception: {e}") + +# Removed main() function and if __name__ == "__main__": block +# Pytest will discover and run functions starting with test_ \ No newline at end of file diff --git a/test/unit/client/test_ibkr_client_u.py b/test/unit/client/test_ibkr_client_u.py index 79a15321..ac74e640 100644 --- a/test/unit/client/test_ibkr_client_u.py +++ b/test/unit/client/test_ibkr_client_u.py @@ -5,7 +5,7 @@ @pytest.fixture def client(): # Minimal config for IbkrClient, mock dependencies - c = IbkrClient(use_oauth=False) + c = IbkrClient(use_oauth=False, auto_register_shutdown=False) c.check_auth_status = MagicMock() c.stop_tickler = MagicMock() c.oauth_init = MagicMock() @@ -61,4 +61,4 @@ def test_handle_auth_status_not_healthy_oauth_success(client, caplog): ## Assert assert any("IBKR connection is not healthy. Attempting to re-establish OAuth authentication." in r.message for r in caplog.records) client.stop_tickler.assert_called_once_with(15) - client.oauth_init.assert_called_once_with(maintain_oauth=True, init_brokerage_session=True) \ No newline at end of file + client.oauth_init.assert_called_once_with(maintain_oauth=True, init_brokerage_session=True) diff --git a/test/unit/oauth/test_oauth2_u.py b/test/unit/oauth/test_oauth2_u.py new file mode 100644 index 00000000..77b34866 --- /dev/null +++ b/test/unit/oauth/test_oauth2_u.py @@ -0,0 +1,62 @@ +from pathlib import Path + +import pytest + +from ibind.client.ibkr_client import IbkrClient +from ibind.oauth.oauth2 import OAuth2Config + + +def make_oauth2_config(**overrides) -> OAuth2Config: + config = { + 'client_id': 'client-id', + 'client_key_id': 'key-id', + 'private_key_pem': 'dummy-private-key', + 'username': 'user-name', + } + config.update(overrides) + return OAuth2Config(**config) + + +def test_oauth2_config_loads_private_key_from_path(tmp_path: Path) -> None: + key_path = tmp_path / 'private.pem' + key_path.write_text('file-private-key', encoding='utf-8') + + config = make_oauth2_config(private_key_pem=None, private_key_path=str(key_path)) + + assert config.private_key_pem == 'file-private-key' + + +def test_oauth2_get_headers_skips_token_and_sso_requests() -> None: + oauth_config = make_oauth2_config() + oauth_config.sso_bearer_token = 'sso-token' # noqa: S105 + client = IbkrClient(use_oauth=False, auto_register_shutdown=False) + client._use_oauth = True + client.oauth_config = oauth_config + + assert client._get_headers('POST', oauth_config.token_url) == {} + assert client._get_headers('POST', oauth_config.sso_session_url) == {} + + +def test_oauth2_get_headers_adds_bearer_token_for_api_requests() -> None: + oauth_config = make_oauth2_config() + oauth_config.sso_bearer_token = 'sso-token' # noqa: S105 + client = IbkrClient(use_oauth=False, auto_register_shutdown=False) + client._use_oauth = True + client.oauth_config = oauth_config + + headers = client._get_headers('GET', 'https://api.ibkr.com/v1/api/portfolio/accounts') + + assert headers == {'Authorization': 'Bearer sso-token'} + + +def test_oauth2_get_headers_without_sso_token_returns_empty_headers(caplog: pytest.LogCaptureFixture) -> None: + oauth_config = make_oauth2_config() + client = IbkrClient(use_oauth=False, auto_register_shutdown=False) + client._use_oauth = True + client.oauth_config = oauth_config + + with caplog.at_level('ERROR', logger='ibind.oauth.oauth2'): + headers = client._get_headers('GET', 'https://api.ibkr.com/v1/api/portfolio/accounts') + + assert headers == {} + assert 'SSO bearer token is missing' in caplog.text diff --git a/test/unit/support/test_py_utils_u.py b/test/unit/support/test_py_utils_u.py index 5fb4a250..2202a35b 100644 --- a/test/unit/support/test_py_utils_u.py +++ b/test/unit/support/test_py_utils_u.py @@ -62,7 +62,7 @@ def test_ensure_list_arg_with_keyword_arg_non_list(): def test_ensure_list_arg_with_missing_arg(): """Raises TypeError when the decorated arg is missing.""" # Arrange - + # Act / Assert with pytest.raises(TypeError): sample_function() @@ -225,4 +225,4 @@ def test_wait_until_timeout(): # Assert assert result is False duration = time.time() - start_time - assert duration == pytest.approx(timeout, abs=0.02) \ No newline at end of file + assert duration == pytest.approx(timeout, abs=0.02)