|
| 1 | +from datetime import timedelta |
| 2 | + |
1 | 3 | from aiohttp.test_utils import make_mocked_request |
2 | 4 | from aiohttp.web import Application |
3 | | -from models_library.users import UserID |
| 5 | +from pytest_simcore.helpers.webserver_login import UserInfoDict |
4 | 6 | from simcore_service_webserver.login._confirmation_service import ( |
5 | 7 | get_or_create_confirmation_without_data, |
6 | 8 | is_confirmation_expired, |
|
12 | 14 |
|
13 | 15 |
|
14 | 16 | async def test_confirmation_token_workflow( |
15 | | - db: AsyncpgStorage, login_options: LoginOptions, user_id: UserID |
| 17 | + db: AsyncpgStorage, login_options: LoginOptions, registered_user: UserInfoDict |
16 | 18 | ): |
17 | 19 | # Step 1: Create a new confirmation token |
| 20 | + user_id = registered_user["id"] |
18 | 21 | action = "RESET_PASSWORD" |
19 | 22 | confirmation = await get_or_create_confirmation_without_data( |
20 | 23 | login_options, db, user_id=user_id, action=action |
@@ -42,13 +45,65 @@ async def test_confirmation_token_workflow( |
42 | 45 | "/auth/confirmation/{code}", lambda request: None, name="auth_confirmation" |
43 | 46 | ) |
44 | 47 | request = make_mocked_request( |
45 | | - "GET", "/auth/confirmation/{code}", app=app, headers={"Host": "example.com"} |
| 48 | + "GET", |
| 49 | + "http://example.com/auth/confirmation/{code}", |
| 50 | + app=app, |
| 51 | + headers={"Host": "example.com"}, |
46 | 52 | ) |
47 | | - request.scheme = "http" |
48 | 53 |
|
49 | 54 | # Create confirmation link |
50 | 55 | confirmation_link = make_confirmation_link(request, confirmation) |
51 | 56 |
|
52 | 57 | # Assertions |
53 | 58 | assert confirmation_link.startswith("http://example.com/auth/confirmation/") |
54 | 59 | assert confirmation["code"] in confirmation_link |
| 60 | + |
| 61 | + |
| 62 | +async def test_expired_confirmation_token( |
| 63 | + db: AsyncpgStorage, login_options: LoginOptions, registered_user: UserInfoDict |
| 64 | +): |
| 65 | + user_id = registered_user["id"] |
| 66 | + action = "CHANGE_EMAIL" |
| 67 | + |
| 68 | + # Create a brand new confirmation token |
| 69 | + confirmation_1 = await get_or_create_confirmation_without_data( |
| 70 | + login_options, db, user_id=user_id, action=action |
| 71 | + ) |
| 72 | + |
| 73 | + assert confirmation_1 is not None |
| 74 | + assert confirmation_1["user_id"] == user_id |
| 75 | + assert confirmation_1["action"] == action |
| 76 | + |
| 77 | + # Check that the token is not expired |
| 78 | + assert not is_confirmation_expired(login_options, confirmation_1) |
| 79 | + |
| 80 | + confirmation_2 = await get_or_create_confirmation_without_data( |
| 81 | + login_options, db, user_id=user_id, action=action |
| 82 | + ) |
| 83 | + |
| 84 | + assert confirmation_2 == confirmation_1 |
| 85 | + |
| 86 | + # Enforce ALL EXPIRED |
| 87 | + login_options.CHANGE_EMAIL_CONFIRMATION_LIFETIME = 0 |
| 88 | + assert login_options.get_confirmation_lifetime(action) == timedelta(seconds=0) |
| 89 | + |
| 90 | + confirmation_3 = await get_or_create_confirmation_without_data( |
| 91 | + login_options, db, user_id=user_id, action=action |
| 92 | + ) |
| 93 | + |
| 94 | + # when expired, it gets renewed |
| 95 | + assert confirmation_3 != confirmation_1 |
| 96 | + |
| 97 | + # now all have expired |
| 98 | + assert ( |
| 99 | + await validate_confirmation_code(confirmation_1["code"], db, login_options) |
| 100 | + is None |
| 101 | + ) |
| 102 | + assert ( |
| 103 | + await validate_confirmation_code(confirmation_2["code"], db, login_options) |
| 104 | + is None |
| 105 | + ) |
| 106 | + assert ( |
| 107 | + await validate_confirmation_code(confirmation_3["code"], db, login_options) |
| 108 | + is None |
| 109 | + ) |
0 commit comments