|
11 | 11 | import pytest
|
12 | 12 | from canaille import create_app
|
13 | 13 | from canaille.app import models
|
| 14 | +from canaille.app.session import SessionObject |
14 | 15 | from canaille.backends import Backend
|
15 | 16 | from canaille.core.models import Group
|
16 | 17 | from canaille.core.models import User
|
17 | 18 | from canaille.core.populate import fake_groups
|
18 | 19 | from canaille.core.populate import fake_users
|
19 | 20 | from canaille.oidc.basemodels import Token
|
20 |
| -from canaille.oidc.installation import generate_keypair |
21 | 21 | from flask import Flask
|
22 | 22 | from flask import g
|
| 23 | +from joserfc.jwk import JWKRegistry |
23 | 24 | from werkzeug.test import Client
|
24 | 25 |
|
25 | 26 |
|
@@ -55,25 +56,21 @@ def __init__(self, app: Flask, port: int, backend: Backend, logging: bool = Fals
|
55 | 56 | )
|
56 | 57 | self.models = models
|
57 | 58 | self.logged_user = None
|
58 |
| - self.login_datetime = None |
59 | 59 |
|
60 | 60 | @self.app.before_request
|
61 |
| - def logged_user(): |
| 61 | + def login(): |
62 | 62 | if self.logged_user:
|
63 |
| - g.user = self.logged_user |
64 |
| - else: |
65 |
| - try: |
66 |
| - del g.user |
67 |
| - except AttributeError: |
68 |
| - pass |
69 |
| - |
70 |
| - if self.login_datetime: |
71 |
| - g.last_login_datetime = self.login_datetime |
72 |
| - else: |
73 |
| - try: |
74 |
| - del g.last_login_datetime |
75 |
| - except AttributeError: |
76 |
| - pass |
| 63 | + now = datetime.datetime.now(datetime.timezone.utc) |
| 64 | + g.session = SessionObject( |
| 65 | + user=self.logged_user, last_login_datetime=now |
| 66 | + ) |
| 67 | + |
| 68 | + @self.app.after_request |
| 69 | + def logout(response): |
| 70 | + if self.logged_user: |
| 71 | + g.session = None |
| 72 | + |
| 73 | + return response |
77 | 74 |
|
78 | 75 | def _make_request_handler(self):
|
79 | 76 | server = self
|
@@ -144,12 +141,10 @@ def login(self, user: User):
|
144 | 141 | This allows to skip the connection screen.
|
145 | 142 | """
|
146 | 143 | self.logged_user = user
|
147 |
| - self.login_datetime = datetime.datetime.now(datetime.timezone.utc) |
148 | 144 |
|
149 | 145 | def logout(self):
|
150 | 146 | """Close the current user session if existing."""
|
151 | 147 | self.logged_user = None
|
152 |
| - self.login_datetime = None |
153 | 148 |
|
154 | 149 | def consent(self, user: User, client: Client | None = None):
|
155 | 150 | """Make a user consent to share data with OIDC clients.
|
@@ -188,13 +183,17 @@ def iam_server_port():
|
188 | 183 | @pytest.fixture(scope="session")
|
189 | 184 | def iam_configuration(tmp_path_factory, iam_server_port) -> dict[str, Any]:
|
190 | 185 | """Fixture for editing the configuration of :meth:`~pytest_iam.iam_server`."""
|
191 |
| - private_key, public_key = generate_keypair() |
192 | 186 | os.environ["AUTHLIB_INSECURE_TRANSPORT"] = "1"
|
| 187 | + |
| 188 | + jwk = JWKRegistry.generate_key("RSA", 1024) |
| 189 | + jwk.ensure_kid() |
| 190 | + |
193 | 191 | return {
|
194 | 192 | "TESTING": True,
|
195 | 193 | "ENV_FILE": None,
|
196 | 194 | "SECRET_KEY": str(uuid.uuid4()),
|
197 | 195 | "WTF_CSRF_ENABLED": False,
|
| 196 | + "PREFERRED_URL_SCHEME": "http", |
198 | 197 | "SERVER_NAME": f"localhost:{iam_server_port}",
|
199 | 198 | "CANAILLE": {
|
200 | 199 | "ENABLE_REGISTRATION": True,
|
@@ -222,10 +221,7 @@ def iam_configuration(tmp_path_factory, iam_server_port) -> dict[str, Any]:
|
222 | 221 | },
|
223 | 222 | "CANAILLE_OIDC": {
|
224 | 223 | "DYNAMIC_CLIENT_REGISTRATION_OPEN": True,
|
225 |
| - "JWT": { |
226 |
| - "PUBLIC_KEY": public_key, |
227 |
| - "PRIVATE_KEY": private_key, |
228 |
| - }, |
| 224 | + "ACTIVE_JWKS": [jwk.as_dict()], |
229 | 225 | },
|
230 | 226 | }
|
231 | 227 |
|
|
0 commit comments