Skip to content

Commit 0393fb7

Browse files
committed
adding tests
1 parent 082daec commit 0393fb7

File tree

3 files changed

+716
-0
lines changed

3 files changed

+716
-0
lines changed

tests/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Author: Harsha Krishnareddy
4+
Email: c0mpiler@ins8s.dev
5+
"""

tests/conftest.py

Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Author: Harsha Krishnareddy
4+
Email: c0mpiler@ins8s.dev
5+
"""
6+
import importlib
7+
import json
8+
import os
9+
import threading
10+
import time
11+
from http.server import HTTPServer
12+
from pathlib import Path
13+
from unittest.mock import MagicMock, patch
14+
15+
import pytest
16+
import requests
17+
from jose import jwk
18+
from jose import jwt as jose_jwt
19+
20+
# --- IMPORTANT: Import modules using the NEW package name 'novactl' --- #
21+
from novactl import auth, config, main # Import main as well
22+
23+
# --- Constants for Mocking ---
24+
TEST_SERVICE_NAME = "novactl-test"
25+
# Use derived username logic from config for consistency
26+
TEST_KEYRING_USERNAME = config._derive_keyring_username(TEST_SERVICE_NAME)
27+
TEST_REALM = "ibm-eda"
28+
TEST_CLIENT_ID = "eda-ui"
29+
TEST_SERVER_URL = "https://auth.eda.res.ibm.com:8443/auth"
30+
MOCK_KID = "test-key-id"
31+
32+
# --- Test Key Pair Generation --- #
33+
private_key_pem, public_key_pem = None, None
34+
mock_public_jwk = None
35+
try:
36+
from cryptography.hazmat.primitives import serialization
37+
from cryptography.hazmat.primitives.asymmetric import rsa
38+
39+
_private_key_obj = rsa.generate_private_key(public_exponent=65537, key_size=2048)
40+
_public_key_obj = _private_key_obj.public_key()
41+
private_key_pem = _private_key_obj.private_bytes(
42+
encoding=serialization.Encoding.PEM,
43+
format=serialization.PrivateFormat.TraditionalOpenSSL,
44+
encryption_algorithm=serialization.NoEncryption(),
45+
)
46+
public_key_pem = _public_key_obj.public_bytes(
47+
encoding=serialization.Encoding.PEM,
48+
format=serialization.PublicFormat.SubjectPublicKeyInfo,
49+
)
50+
mock_public_jwk = jwk.construct(_public_key_obj, algorithm="RS256").to_dict()
51+
mock_public_jwk["kid"] = MOCK_KID
52+
mock_public_jwk["use"] = "sig"
53+
except ImportError:
54+
print("WARNING: cryptography library not found, JWT tests will be limited.")
55+
mock_public_jwk = {
56+
"kty": "RSA", "alg": "RS256", "kid": MOCK_KID, "use": "sig",
57+
"n": "dummy", "e": "AQAB",
58+
}
59+
60+
# --- Base Test Configuration Data --- #
61+
TEST_CONFIG_DATA = {
62+
"keycloak_server_url": TEST_SERVER_URL,
63+
"keycloak_realm": TEST_REALM,
64+
"keycloak_client_id": TEST_CLIENT_ID,
65+
"keyring_service_name": TEST_SERVICE_NAME,
66+
"additional_scopes": "openid email profile offline_access",
67+
"token_leeway_seconds": 10,
68+
"ssl_verify": True,
69+
"log_level": "DEBUG",
70+
"console_log_level": "DEBUG",
71+
}
72+
73+
# --- Fixtures --- #
74+
75+
@pytest.fixture(scope="function", autouse=True)
76+
def mock_config_and_reload_modules(monkeypatch):
77+
"""Mocks config loading, derives values, and reloads affected modules."""
78+
# Start with default config and override with test data
79+
current_test_config = config.DEFAULT_CONFIG.copy()
80+
current_test_config.update(TEST_CONFIG_DATA)
81+
82+
# Manually derive values that load_config would normally handle
83+
current_test_config["keyring_username"] = config._derive_keyring_username(
84+
current_test_config["keyring_service_name"]
85+
)
86+
current_test_config["ssl_verify"] = config._process_ssl_verify(
87+
current_test_config["ssl_verify"]
88+
)
89+
server_url = current_test_config["keycloak_server_url"].rstrip("/")
90+
realm = current_test_config["keycloak_realm"]
91+
if server_url and realm:
92+
base_oidc_path = f"{server_url}/realms/{realm}/protocol/openid-connect"
93+
current_test_config["oidc_token_endpoint"] = f"{base_oidc_path}/token"
94+
current_test_config["oidc_logout_endpoint"] = f"{base_oidc_path}/logout"
95+
current_test_config["oidc_jwks_uri"] = f"{base_oidc_path}/certs"
96+
current_test_config["oidc_issuer"] = f"{server_url}/realms/{realm}"
97+
current_test_config["oidc_device_auth_endpoint"] = f"{base_oidc_path}/auth/device"
98+
99+
# --- Mock Config Module --- #
100+
mock_load = MagicMock(return_value=current_test_config)
101+
monkeypatch.setattr(config, "load_config", mock_load)
102+
monkeypatch.setattr(config, "_config", current_test_config)
103+
# Ensure get_config_value uses the mocked config
104+
def mock_get_config_value(key, default=None):
105+
return current_test_config.get(key, default)
106+
monkeypatch.setattr(config, "get_config_value", mock_get_config_value)
107+
# Mock logging setup within config as it might run on import
108+
monkeypatch.setattr(config, "_configure_logging", MagicMock())
109+
110+
# --- Reload Auth Module --- #
111+
# Reload to make it pick up the mocked config functions
112+
importlib.reload(auth)
113+
114+
# --- Mock Dependencies within Auth Module --- #
115+
# Mock JWKS fetching within the reloaded auth module
116+
mock_jwks_fetch = MagicMock(return_value={MOCK_KID: mock_public_jwk})
117+
monkeypatch.setattr(auth, "_get_public_keys", mock_jwks_fetch)
118+
auth._public_keys = None # Reset cache
119+
auth._jwks_uri = current_test_config.get("oidc_jwks_uri")
120+
121+
# Mock time.sleep used in polling
122+
mock_sleep = MagicMock()
123+
monkeypatch.setattr(auth.time, "sleep", mock_sleep)
124+
125+
# Mock Rich Progress used in polling
126+
mock_progress_instance = MagicMock()
127+
mock_progress_instance.add_task.return_value = 1 # Dummy task ID
128+
mock_progress_instance.update = MagicMock()
129+
mock_progress_class = MagicMock()
130+
mock_progress_class.return_value.__enter__.return_value = mock_progress_instance
131+
mock_progress_class.return_value.__exit__.return_value = False # No exception on exit
132+
monkeypatch.setattr(auth, "Progress", mock_progress_class)
133+
134+
# --- Reload Main Module --- #
135+
# Reload main to pick up reloaded auth/config
136+
importlib.reload(main)
137+
138+
# Yield the prepared config for tests to use
139+
yield current_test_config
140+
141+
# --- Cleanup --- #
142+
# Reset caches and module state after test
143+
config._config = None
144+
config._config_schema = None
145+
config._logging_configured = False
146+
auth._public_keys = None
147+
auth._jwks_uri = None
148+
# Consider reloading original modules if necessary, though pytest usually isolates tests
149+
150+
151+
@pytest.fixture(scope="function", autouse=True)
152+
def mock_keyring(monkeypatch):
153+
"""Mocks the keyring module used by auth.py."""
154+
keyring_store: Dict[tuple, str] = {}
155+
156+
def set_password(service, username, password):
157+
keyring_store[(service, username)] = password
158+
159+
def get_password(service, username):
160+
return keyring_store.get((service, username))
161+
162+
def delete_password(service, username):
163+
# Simulate PasswordDeleteError if not found
164+
try:
165+
from keyring.errors import PasswordDeleteError
166+
if (service, username) not in keyring_store:
167+
raise PasswordDeleteError("Not found")
168+
keyring_store.pop((service, username), None)
169+
except ImportError:
170+
keyring_store.pop((service, username), None) # Fallback if error class not available
171+
172+
# Patch the keyring functions within the reloaded auth module
173+
monkeypatch.setattr(auth.keyring, "set_password", set_password)
174+
monkeypatch.setattr(auth.keyring, "get_password", get_password)
175+
monkeypatch.setattr(auth.keyring, "delete_password", delete_password)
176+
177+
# Dynamically patch PasswordDeleteError if keyring library is available
178+
try:
179+
from keyring.errors import PasswordDeleteError
180+
monkeypatch.setattr(auth.keyring.errors, "PasswordDeleteError", PasswordDeleteError)
181+
except (ImportError, AttributeError):
182+
# If keyring or its errors module isn't available, skip patching the error class
183+
pass
184+
185+
yield keyring_store # Provide the store to tests for inspection/manipulation
186+
keyring_store.clear() # Clear store after test
187+
188+
189+
@pytest.fixture(scope="function")
190+
def mock_requests(requests_mock, mock_config_and_reload_modules):
191+
"""Sets up common Keycloak endpoint mocks for device flow using requests_mock."""
192+
# Get config from the other fixture
193+
cfg = mock_config_and_reload_modules
194+
195+
# --- Endpoint URLs from Config --- #
196+
device_auth_url = cfg.get("oidc_device_auth_endpoint")
197+
token_url = cfg.get("oidc_token_endpoint")
198+
jwks_uri = cfg.get("oidc_jwks_uri")
199+
200+
if not all([device_auth_url, token_url, jwks_uri]):
201+
pytest.fail("Missing OIDC endpoint configuration in mock_config")
202+
203+
# --- Mock JWKS Endpoint --- #
204+
requests_mock.get(jwks_uri, json={"keys": [mock_public_jwk] if mock_public_jwk else []})
205+
206+
# --- Token Generation Helper --- #
207+
def generate_mock_token(
208+
payload_override=None, expiry_offset_secs=3600, token_type="Bearer"
209+
):
210+
"""Generates a mock JWT token signed with the test private key."""
211+
if private_key_pem is None:
212+
# Fallback for limited testing without cryptography
213+
timestamp = int(time.time())
214+
return f"mock_{token_type}_token_{timestamp}"
215+
216+
now = int(time.time())
217+
iat = int(payload_override.get("iat", now)) if payload_override else now
218+
payload = {
219+
"iss": cfg["oidc_issuer"],
220+
"sub": "mock-user-id",
221+
"aud": cfg["keycloak_client_id"],
222+
"exp": iat + expiry_offset_secs,
223+
"iat": iat,
224+
"jti": f"jwt-id-{iat}-{token_type}",
225+
"typ": token_type,
226+
"azp": cfg["keycloak_client_id"],
227+
"preferred_username": "mockuser",
228+
"email": "mockuser@test.local", "email_verified": True,
229+
"name": "Mock User",
230+
"realm_access": {"roles": ["default-role", "realm-test-role"]},
231+
"resource_access": {cfg["keycloak_client_id"]: {"roles": ["client-test-role"]}},
232+
}
233+
# Add at_hash for ID tokens if access token can be generated consistently
234+
if token_type.lower() == "id":
235+
try:
236+
# Generate a *consistent* access token based on the same iat
237+
consistent_access_token = generate_mock_token(payload_override={'iat': iat}, token_type="Bearer")
238+
from jose.utils import calculate_at_hash
239+
# Use the correct algorithm specified in the header (RS256)
240+
payload['at_hash'] = calculate_at_hash(consistent_access_token, jwk.ALGORITHMS.RS256)
241+
except Exception as e:
242+
print(f"Warning: Could not generate at_hash for mock ID token: {e}")
243+
244+
if payload_override: payload.update(payload_override)
245+
# Recalculate expiry if iat was overridden
246+
if payload_override and 'iat' in payload_override: payload['exp'] = payload_override['iat'] + expiry_offset_secs
247+
248+
headers = {"alg": "RS256", "typ": "JWT", "kid": MOCK_KID}
249+
return jose_jwt.encode(payload, private_key_pem, algorithm="RS256", headers=headers)
250+
251+
# --- Token Response Generation Helper --- #
252+
def generate_token_response(
253+
iat_override=None, access_token_expiry=300, refresh_token_expiry=1800, include_refresh=True
254+
):
255+
"""Generates a dictionary mimicking a successful token endpoint response."""
256+
now = int(time.time())
257+
iat = iat_override if iat_override is not None else now
258+
access_payload = {"iat": iat}
259+
id_payload = {"iat": iat} # Nonce not relevant for device flow
260+
261+
response = {
262+
"access_token": generate_mock_token(payload_override=access_payload, expiry_offset_secs=access_token_expiry, token_type="Bearer"),
263+
"expires_in": access_token_expiry,
264+
"refresh_expires_in": refresh_token_expiry,
265+
"token_type": "Bearer",
266+
"id_token": generate_mock_token(payload_override=id_payload, expiry_offset_secs=access_token_expiry, token_type="ID"),
267+
"session_state": f"mock-session-state-{iat}",
268+
"scope": cfg.get("additional_scopes", "openid email profile"),
269+
}
270+
if include_refresh and "offline_access" in response["scope"]:
271+
response["refresh_token"] = f"mock_refresh_token_{iat}"
272+
return response
273+
274+
# --- Mock Device Authorization Endpoint --- #
275+
mock_device_response = {
276+
"device_code": "mock-device-code-123",
277+
"user_code": "ABCD-EFGH",
278+
"verification_uri": f"{cfg['oidc_issuer']}/device", # Example URI
279+
"verification_uri_complete": f"{cfg['oidc_issuer']}/device?user_code=ABCD-EFGH",
280+
"expires_in": 180, # 3 minutes
281+
"interval": 5, # Default polling interval
282+
}
283+
requests_mock.post(device_auth_url, json=mock_device_response)
284+
285+
# --- Mock Token Endpoint (Default: Pending) --- #
286+
# *** CORRECTED: Use standard requests_mock.post ***
287+
requests_mock.post(token_url, json={"error": "authorization_pending"}, status_code=400)
288+
289+
# --- Test Helpers --- #
290+
def set_token_endpoint_success(response_data=None):
291+
"""Updates the token endpoint mock to return a successful response."""
292+
if response_data is None:
293+
response_data = generate_token_response()
294+
# Use requests_mock directly, it handles overwriting previous mocks for the same URL/method
295+
requests_mock.post(token_url, json=response_data, status_code=200)
296+
297+
def set_token_endpoint_error(error_code="invalid_grant", status=400):
298+
"""Updates the token endpoint mock to return an error response."""
299+
requests_mock.post(token_url, json={"error": error_code}, status_code=status)
300+
301+
# Provide mocks and helpers to the test function
302+
test_helpers = {
303+
"generate_token": generate_mock_token,
304+
"generate_token_response": generate_token_response,
305+
"device_auth_url": device_auth_url,
306+
"token_url": token_url,
307+
"jwks_uri": jwks_uri,
308+
"mock_device_response": mock_device_response,
309+
"set_token_endpoint_success": set_token_endpoint_success,
310+
"set_token_endpoint_error": set_token_endpoint_error,
311+
}
312+
yield requests_mock, test_helpers # Yield requests_mock and the helpers dict
313+
314+
315+
@pytest.fixture(autouse=True)
316+
def reset_auth_caches(mock_keyring): # Depends on mock_keyring fixture
317+
"""Resets auth state and clears keyring before each test."""
318+
# Clear auth module caches
319+
auth._public_keys = None
320+
auth._jwks_uri = None
321+
# Clear the mocked keyring store via the fixture
322+
mock_keyring.clear()
323+
# Reset config cache (handled by mock_config_and_reload_modules fixture)
324+
325+
@pytest.fixture
326+
def runner():
327+
"""Provides a Typer CliRunner instance."""
328+
from typer.testing import CliRunner
329+
# Ensure the main module's app instance is used
330+
return CliRunner(mix_stderr=False) # Use the app from the reloaded main module

0 commit comments

Comments
 (0)