diff --git a/dev/environment b/dev/environment index 2f3ae12e94ff..6022e8f8abb4 100644 --- a/dev/environment +++ b/dev/environment @@ -59,6 +59,7 @@ TOKEN_PASSWORD_SECRET="an insecure password reset secret key" TOKEN_EMAIL_SECRET="an insecure email verification secret key" TOKEN_TWO_FACTOR_SECRET="an insecure two-factor auth secret key" TOKEN_REMEMBER_DEVICE_SECRET="an insecure remember device auth secret key" +TOKEN_CONFIRM_LOGIN_SECRET="an insecure confirm login auth secret key" WAREHOUSE_LEGACY_DOMAIN=pypi.python.org diff --git a/tests/common/db/accounts.py b/tests/common/db/accounts.py index 468e1bd73f11..19913cd2b0a0 100644 --- a/tests/common/db/accounts.py +++ b/tests/common/db/accounts.py @@ -14,8 +14,10 @@ TermsOfServiceEngagement, User, UserTermsOfServiceEngagement, + UserUniqueLogin, ) +from ...common.constants import REMOTE_ADDR from .base import WarehouseFactory fake = faker.Faker() @@ -130,3 +132,11 @@ class Meta: # TODO: Replace when factory_boy supports `unique`. # See https://github.com/FactoryBoy/factory_boy/pull/997 name = factory.Sequence(lambda _: fake.unique.user_name()) + + +class UserUniqueLoginFactory(WarehouseFactory): + class Meta: + model = UserUniqueLogin + + user = factory.SubFactory(UserFactory) + ip_address = REMOTE_ADDR diff --git a/tests/functional/manage/test_views.py b/tests/functional/manage/test_views.py index 4e53ea6e2b81..8b66246b4e7d 100644 --- a/tests/functional/manage/test_views.py +++ b/tests/functional/manage/test_views.py @@ -11,13 +11,14 @@ from webob.multidict import MultiDict from warehouse.accounts.interfaces import IPasswordBreachedService, IUserService +from warehouse.accounts.models import UniqueLoginStatus from warehouse.manage import views from warehouse.manage.views import organizations as org_views from warehouse.organizations.interfaces import IOrganizationService from warehouse.organizations.models import OrganizationType from warehouse.utils.otp import _get_totp -from ...common.db.accounts import EmailFactory, UserFactory +from ...common.db.accounts import EmailFactory, UserFactory, UserUniqueLoginFactory class TestManageAccount: @@ -52,6 +53,9 @@ def test_changing_password_succeeds(self, webtest, socket_enabled): with_terms_of_service_agreement=True, clear_pwd="password", ) + UserUniqueLoginFactory.create( + user=user, ip_address="1.2.3.4", status=UniqueLoginStatus.CONFIRMED + ) # visit login page login_page = webtest.get("/account/login/", status=HTTPStatus.OK) diff --git a/tests/unit/accounts/test_core.py b/tests/unit/accounts/test_core.py index 14cd5dde9c9f..d4ea5fe44428 100644 --- a/tests/unit/accounts/test_core.py +++ b/tests/unit/accounts/test_core.py @@ -167,6 +167,11 @@ def test_includeme(monkeypatch): pretend.call( TokenServiceFactory(name="two_factor"), ITokenService, name="two_factor" ), + pretend.call( + TokenServiceFactory(name="confirm_login"), + ITokenService, + name="confirm_login", + ), pretend.call( TokenServiceFactory(name="remember_device"), ITokenService, diff --git a/tests/unit/accounts/test_models.py b/tests/unit/accounts/test_models.py index d13f097f7347..9f2b32ef3433 100644 --- a/tests/unit/accounts/test_models.py +++ b/tests/unit/accounts/test_models.py @@ -7,7 +7,13 @@ from pyramid.authorization import Authenticated -from warehouse.accounts.models import Email, RecoveryCode, User, UserFactory, WebAuthn +from warehouse.accounts.models import ( + Email, + RecoveryCode, + User, + UserFactory, + WebAuthn, +) from warehouse.authnz import Permissions from warehouse.utils.security_policy import principals_for @@ -15,6 +21,7 @@ EmailFactory as DBEmailFactory, UserEventFactory as DBUserEventFactory, UserFactory as DBUserFactory, + UserUniqueLoginFactory, ) from ...common.db.packaging import ( ProjectFactory as DBProjectFactory, @@ -309,3 +316,14 @@ def test_user_projects_is_ordered_by_name(self, db_session): DBRoleFactory.create(project=project3, user=user) assert user.projects == [project2, project3, project1] + + +class TestUserUniqueLogin: + def test_repr(self, db_session): + unique_login = UserUniqueLoginFactory.create() + assert ( + repr(unique_login) + == f"" + ) diff --git a/tests/unit/accounts/test_views.py b/tests/unit/accounts/test_views.py index d1c89a35789f..6f45e3726ceb 100644 --- a/tests/unit/accounts/test_views.py +++ b/tests/unit/accounts/test_views.py @@ -36,7 +36,11 @@ TooManyFailedLogins, TooManyPasswordResetRequests, ) -from warehouse.accounts.models import TermsOfServiceEngagement +from warehouse.accounts.models import ( + TermsOfServiceEngagement, + UniqueLoginStatus, + UserUniqueLogin, +) from warehouse.accounts.views import ( REMEMBER_DEVICE_COOKIE, two_factor_and_totp_validate, @@ -61,7 +65,11 @@ from warehouse.packaging.models import Role, RoleInvitation from warehouse.rate_limiting.interfaces import IRateLimiter -from ...common.db.accounts import EmailFactory, UserFactory +from ...common.db.accounts import ( + EmailFactory, + UserFactory, + UserUniqueLoginFactory, +) from ...common.db.ip_addresses import IpAddressFactory from ...common.db.organizations import ( OrganizationFactory, @@ -325,7 +333,7 @@ def test_post_invalid_returns_form( @pytest.mark.parametrize("with_user", [True, False]) def test_post_validate_redirects( - self, monkeypatch, pyramid_request, pyramid_services, metrics, with_user + self, monkeypatch, db_request, pyramid_services, metrics, with_user ): remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) monkeypatch.setattr(views, "remember", remember) @@ -351,23 +359,21 @@ def test_post_validate_redirects( breach_service, IPasswordBreachedService, None ) - pyramid_request.method = "POST" - pyramid_request.session = pretend.stub( + db_request.method = "POST" + db_request.session = pretend.stub( items=lambda: [("a", "b"), ("foo", "bar")], update=new_session.update, invalidate=pretend.call_recorder(lambda: None), new_csrf_token=pretend.call_recorder(lambda: None), ) - pyramid_request._unauthenticated_userid = ( - str(uuid.uuid4()) if with_user else None - ) + db_request._unauthenticated_userid = str(uuid.uuid4()) if with_user else None - pyramid_request.registry.settings = {"sessions.secret": "dummy_secret"} - pyramid_request.session.record_auth_timestamp = pretend.call_recorder( + db_request.registry.settings = {"sessions.secret": "dummy_secret"} + db_request.session.record_auth_timestamp = pretend.call_recorder( lambda *args: None ) - pyramid_request.session.record_password_timestamp = lambda timestamp: None + db_request.session.record_password_timestamp = lambda timestamp: None form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), @@ -376,25 +382,25 @@ def test_post_validate_redirects( ) form_class = pretend.call_recorder(lambda d, **kw: form_obj) - pyramid_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") + db_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") now = datetime.datetime.now(datetime.UTC) with freezegun.freeze_time(now): - result = views.login(pyramid_request, _form_class=form_class) + result = views.login(db_request, _form_class=form_class) assert metrics.increment.calls == [] assert isinstance(result, HTTPSeeOther) - assert pyramid_request.route_path.calls == [pretend.call("manage.projects")] + assert db_request.route_path.calls == [pretend.call("manage.projects")] assert result.headers["Location"] == "/the-redirect" assert result.headers["Set-Cookie"].startswith("user_id__insecure=") assert result.headers["foo"] == "bar" assert form_class.calls == [ pretend.call( - pyramid_request.POST, - request=pyramid_request, + db_request.POST, + request=db_request, user_service=user_service, breach_service=breach_service, check_password_metrics_tags=["method:auth", "auth_method:login_form"], @@ -407,7 +413,7 @@ def test_post_validate_redirects( assert user.record_event.calls == [ pretend.call( tag=EventTag.Account.LoginSuccess, - request=pyramid_request, + request=db_request, additional={"two_factor_method": None, "two_factor_label": None}, ) ] @@ -417,18 +423,17 @@ def test_post_validate_redirects( else: assert new_session == {"a": "b", "foo": "bar"} - assert remember.calls == [pretend.call(pyramid_request, str(user_id))] - assert pyramid_request.session.invalidate.calls == [pretend.call()] - assert pyramid_request.session.new_csrf_token.calls == [pretend.call()] - assert pyramid_request.session.record_auth_timestamp.calls == [pretend.call()] + assert remember.calls == [pretend.call(db_request, str(user_id))] + assert db_request.session.invalidate.calls == [pretend.call()] + assert db_request.session.new_csrf_token.calls == [pretend.call()] + assert db_request.session.record_auth_timestamp.calls == [pretend.call()] - def test_post_validate_flash_tos(self, pyramid_request, pyramid_services): - user = pretend.stub( - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) + def test_post_validate_flash_tos(self, db_request, pyramid_services): + user = UserFactory.create() + user.record_event = pretend.call_recorder(lambda *a, **kw: None) user_service = pretend.stub( get_user=pretend.call_recorder(lambda userid: user), - find_userid=pretend.call_recorder(lambda username: 1), + find_userid=pretend.call_recorder(lambda username: user.id), update_user=lambda *a, **k: None, has_two_factor=lambda userid: False, get_password_timestamp=lambda userid: 0, @@ -444,21 +449,21 @@ def test_post_validate_flash_tos(self, pyramid_request, pyramid_services): breach_service, IPasswordBreachedService, None ) - pyramid_request.method = "POST" + db_request.method = "POST" - pyramid_request.session.record_auth_timestamp = pretend.call_recorder( + db_request.session.record_auth_timestamp = pretend.call_recorder( lambda *args: None ) - pyramid_request.session.record_password_timestamp = lambda timestamp: None - pyramid_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + db_request.session.record_password_timestamp = lambda timestamp: None + db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) security_policy = pretend.stub( identity=lambda r: None, remember=lambda r, u, **kw: [], reset=pretend.call_recorder(lambda r: None), ) - pyramid_request.registry.queryUtility = lambda iface: security_policy - pyramid_request.registry.settings = {"terms.revision": "the-revision"} + db_request.registry.queryUtility = lambda iface: security_policy + db_request.registry.settings = {"terms.revision": "the-revision"} form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), @@ -466,11 +471,11 @@ def test_post_validate_flash_tos(self, pyramid_request, pyramid_services): password=pretend.stub(data="password"), ) form_class = pretend.call_recorder(lambda d, **kw: form_obj) - pyramid_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") + db_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") - views.login(pyramid_request, _form_class=form_class) + views.login(db_request, _form_class=form_class) - assert pyramid_request.session.flash.calls == [ + assert db_request.session.flash.calls == [ pretend.call( ( "Please review our updated " @@ -481,7 +486,7 @@ def test_post_validate_flash_tos(self, pyramid_request, pyramid_services): ) ] assert user_service.record_tos_engagement.calls == [ - pretend.call(1, "the-revision", TermsOfServiceEngagement.Flashed) + pretend.call(user.id, "the-revision", TermsOfServiceEngagement.Flashed) ] @pytest.mark.parametrize( @@ -491,14 +496,14 @@ def test_post_validate_flash_tos(self, pyramid_request, pyramid_services): [("/security/", "/security/"), ("http://example.com", "/the-redirect")], ) def test_post_validate_no_redirects( - self, pyramid_request, pyramid_services, expected_next_url, observed_next_url + self, db_request, pyramid_services, expected_next_url, observed_next_url ): - user = pretend.stub( - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) + user = UserFactory.create() + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) user_service = pretend.stub( get_user=pretend.call_recorder(lambda userid: user), - find_userid=pretend.call_recorder(lambda username: 1), + find_userid=pretend.call_recorder(lambda username: user.id), update_user=lambda *a, **k: None, has_two_factor=lambda userid: False, get_password_timestamp=lambda userid: 0, @@ -511,20 +516,20 @@ def test_post_validate_no_redirects( breach_service, IPasswordBreachedService, None ) - pyramid_request.method = "POST" - pyramid_request.POST["next"] = expected_next_url + db_request.method = "POST" + db_request.POST["next"] = expected_next_url - pyramid_request.session.record_auth_timestamp = pretend.call_recorder( + db_request.session.record_auth_timestamp = pretend.call_recorder( lambda *args: None ) - pyramid_request.session.record_password_timestamp = lambda timestamp: None + db_request.session.record_password_timestamp = lambda timestamp: None security_policy = pretend.stub( identity=lambda r: None, remember=lambda r, u, **kw: [], reset=pretend.call_recorder(lambda r: None), ) - pyramid_request.registry.queryUtility = lambda iface: security_policy + db_request.registry.queryUtility = lambda iface: security_policy form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), @@ -532,21 +537,21 @@ def test_post_validate_no_redirects( password=pretend.stub(data="password"), ) form_class = pretend.call_recorder(lambda d, **kw: form_obj) - pyramid_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") + db_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") - result = views.login(pyramid_request, _form_class=form_class) + result = views.login(db_request, _form_class=form_class) assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == observed_next_url assert user.record_event.calls == [ pretend.call( tag=EventTag.Account.LoginSuccess, - request=pyramid_request, + request=db_request, additional={"two_factor_method": None, "two_factor_label": None}, ) ] - assert pyramid_request.session.record_auth_timestamp.calls == [pretend.call()] - assert security_policy.reset.calls == [pretend.call(pyramid_request)] + assert db_request.session.record_auth_timestamp.calls == [pretend.call()] + assert security_policy.reset.calls == [pretend.call(db_request)] def test_redirect_authenticated_user(self): pyramid_request = pretend.stub(user=pretend.stub()) @@ -608,6 +613,121 @@ def test_two_factor_auth( ("Location", "/account/two-factor"), ] + def test_login_with_remembered_device_confirms_unique_login( + self, monkeypatch, db_request, pyramid_services + ): + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) + monkeypatch.setattr(views, "_check_remember_device_token", lambda r, uid: True) + + user = UserFactory.create() + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: user.id), + update_user=pretend.call_recorder(lambda *a, **kw: None), + get_user=pretend.call_recorder(lambda userid: user), + has_two_factor=lambda userid: True, + get_password_timestamp=lambda userid: 0, + needs_tos_flash=lambda userid, revision: False, + ) + breach_service = pretend.stub(check_password=lambda password, tags=None: False) + + pyramid_services.register_service(user_service, IUserService, None) + pyramid_services.register_service( + breach_service, IPasswordBreachedService, None + ) + + UserUniqueLoginFactory.create( + user=user, + ip_address=db_request.remote_addr, + status=UniqueLoginStatus.PENDING, + ) + + db_request.method = "POST" + db_request.session = pretend.stub( + items=lambda: [], + update=lambda d: None, + invalidate=pretend.call_recorder(lambda: None), + new_csrf_token=pretend.call_recorder(lambda: None), + record_auth_timestamp=pretend.call_recorder(lambda: None), + record_password_timestamp=lambda ts: None, + ) + db_request.registry.settings = {"sessions.secret": "dummy_secret"} + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + username=pretend.stub(data=user.username), + password=pretend.stub(data="password"), + ) + form_class = pretend.call_recorder(lambda d, **kw: form_obj) + db_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") + + views.login(db_request, _form_class=form_class) + + unique_login = ( + db_request.db.query(UserUniqueLogin) + .filter(UserUniqueLogin.user == user) + .one() + ) + assert unique_login.status == UniqueLoginStatus.CONFIRMED + + def test_login_updates_last_used(self, monkeypatch, db_request, pyramid_services): + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) + + user = UserFactory.create() + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: user.id), + update_user=pretend.call_recorder(lambda *a, **kw: None), + get_user=pretend.call_recorder(lambda userid: user), + has_two_factor=lambda userid: False, + get_password_timestamp=lambda userid: 0, + needs_tos_flash=lambda userid, revision: False, + ) + breach_service = pretend.stub(check_password=lambda password, tags=None: False) + + pyramid_services.register_service(user_service, IUserService, None) + pyramid_services.register_service( + breach_service, IPasswordBreachedService, None + ) + + # Create a unique login with a timestamp in the distant past. + past_timestamp = datetime.datetime(1970, 1, 1) + UserUniqueLoginFactory.create( + user=user, + ip_address=db_request.remote_addr, + status=UniqueLoginStatus.CONFIRMED, + last_used=past_timestamp, + ) + + db_request.method = "POST" + db_request.session = pretend.stub( + items=lambda: [], + update=lambda d: None, + invalidate=pretend.call_recorder(lambda: None), + new_csrf_token=pretend.call_recorder(lambda: None), + record_auth_timestamp=pretend.call_recorder(lambda: None), + record_password_timestamp=lambda ts: None, + ) + db_request.registry.settings = {"sessions.secret": "dummy_secret"} + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + username=pretend.stub(data=user.username), + password=pretend.stub(data="password"), + ) + form_class = pretend.call_recorder(lambda d, **kw: form_obj) + db_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") + + # Simulate the login. + views.login(db_request, _form_class=form_class) + + unique_login = db_request.db.query(UserUniqueLogin).one() + assert unique_login.last_used > past_timestamp + class TestTwoFactor: def test_get_two_factor_data_invalid_after_login(self, pyramid_request): @@ -847,22 +967,40 @@ def test_get_returns_recovery_code_status(self, pyramid_request, redirect_url): def test_totp_auth( self, monkeypatch, - pyramid_request, + db_request, redirect_url, has_recovery_codes, remember_device, + make_email_renderers, + metrics, ): + make_email_renderers("unrecognized-login") remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) monkeypatch.setattr(views, "remember", remember) _remember_device = pretend.call_recorder(lambda *a, **kw: None) monkeypatch.setattr(views, "_remember_device", _remember_device) - query_params = {"userid": str(1)} + user = UserFactory.create( + with_verified_primary_email=True, + username="testuser", + name="Test User", + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + monkeypatch.setattr( + type(user), + "has_recovery_codes", + property(lambda u: has_recovery_codes), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + query_params = {"userid": str(user_id)} if redirect_url: query_params["redirect_to"] = redirect_url - token_service = pretend.stub( + two_factor_token_service = pretend.stub( loads=pretend.call_recorder( lambda *args, **kwargs: ( query_params, @@ -870,16 +1008,8 @@ def test_totp_auth( ) ) ) - - user = pretend.stub( - last_login=( - datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) - ), - has_recovery_codes=has_recovery_codes, - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) user_service = pretend.stub( - find_userid=pretend.call_recorder(lambda username: 1), + find_userid=pretend.call_recorder(lambda username: user.id), get_user=pretend.call_recorder(lambda userid: user), update_user=lambda *a, **k: None, has_totp=lambda userid: True, @@ -892,13 +1022,13 @@ def test_totp_auth( new_session = {} - pyramid_request.find_service = lambda interface, **kwargs: { - ITokenService: token_service, + db_request.find_service = lambda interface, **kwargs: { + ITokenService: two_factor_token_service, IUserService: user_service, }[interface] - pyramid_request.method = "POST" - pyramid_request.session = pretend.stub( + db_request.method = "POST" + db_request.session = pretend.stub( items=lambda: [("a", "b"), ("foo", "bar")], update=new_session.update, invalidate=pretend.call_recorder(lambda: None), @@ -906,11 +1036,11 @@ def test_totp_auth( get_password_timestamp=lambda userid: 0, ) - pyramid_request.session.record_auth_timestamp = pretend.call_recorder( + db_request.session.record_auth_timestamp = pretend.call_recorder( lambda *args: None ) - pyramid_request.session.record_password_timestamp = lambda timestamp: None - pyramid_request.registry.settings = {"remember_device.days": 30} + db_request.session.record_password_timestamp = lambda timestamp: None + db_request.registry.settings = {"remember_device.days": 30} form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), @@ -918,46 +1048,48 @@ def test_totp_auth( remember_device=pretend.stub(data=remember_device), ) form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) - pyramid_request.route_path = pretend.call_recorder( - lambda a: "/account/two-factor" - ) - pyramid_request.params = pretend.stub( + db_request.route_path = pretend.call_recorder(lambda a: "/account/two-factor") + db_request.params = pretend.stub( get=pretend.call_recorder(lambda k: query_params.get(k)) ) - pyramid_request.user = user + db_request.user = user + + UserUniqueLoginFactory.create( + user=user, + ip_address=db_request.remote_addr, + status=UniqueLoginStatus.CONFIRMED, + ) send_email = pretend.call_recorder(lambda *a: None) monkeypatch.setattr(views, "send_recovery_code_reminder_email", send_email) - result = views.two_factor_and_totp_validate( - pyramid_request, _form_class=form_class - ) + result = views.two_factor_and_totp_validate(db_request, _form_class=form_class) - token_expected_data = {"userid": str(1)} + token_expected_data = {"userid": str(user.id)} if redirect_url: token_expected_data["redirect_to"] = redirect_url assert isinstance(result, HTTPSeeOther) - assert remember.calls == [pretend.call(pyramid_request, str(1))] - assert pyramid_request.session.invalidate.calls == [pretend.call()] - assert pyramid_request.session.new_csrf_token.calls == [pretend.call()] + assert remember.calls == [pretend.call(db_request, str(user.id))] + assert db_request.session.invalidate.calls == [pretend.call()] + assert db_request.session.new_csrf_token.calls == [pretend.call()] assert user.record_event.calls == [ pretend.call( tag=EventTag.Account.LoginSuccess, - request=pyramid_request, + request=db_request, additional={"two_factor_method": "totp", "two_factor_label": "totp"}, ) ] - assert pyramid_request.session.record_auth_timestamp.calls == [pretend.call()] + assert db_request.session.record_auth_timestamp.calls == [pretend.call()] assert send_email.calls == ( - [] if has_recovery_codes else [pretend.call(pyramid_request, user)] + [] if has_recovery_codes else [pretend.call(db_request, user)] ) assert _remember_device.calls == ( [] if not remember_device - else [pretend.call(pyramid_request, result, str(1), "totp")] + else [pretend.call(db_request, result, str(user.id), "totp")] ) def test_totp_auth_already_authed(self): @@ -972,61 +1104,401 @@ def test_totp_auth_already_authed(self): assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == "redirect_to" - def test_totp_form_invalid(self): - token_data = {"userid": 1} - token_service = pretend.stub( + def test_totp_auth_no_unique_login( + self, + monkeypatch, + db_request, + make_email_renderers, + metrics, + ): + make_email_renderers("unrecognized-login") + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) + + _remember_device = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "_remember_device", _remember_device) + + user = UserFactory.create( + with_verified_primary_email=True, + username="testuser", + name="Test User", + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + monkeypatch.setattr( + type(user), + "has_recovery_codes", + property(lambda u: False), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + query_params = {"userid": str(user_id)} + + confirm_login_token_service = pretend.stub( + dumps=pretend.call_recorder(lambda d: "fake_token") + ) + two_factor_token_service = pretend.stub( loads=pretend.call_recorder( lambda *args, **kwargs: ( - token_data, + query_params, datetime.datetime.now(datetime.UTC), ) ) ) - user_service = pretend.stub( - get_user=pretend.call_recorder( - lambda userid: pretend.stub( - last_login=( - datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) - ) - ) - ), + find_userid=pretend.call_recorder(lambda username: user.id), + get_user=pretend.call_recorder(lambda userid: user), + update_user=lambda *a, **k: None, has_totp=lambda userid: True, has_webauthn=lambda userid: False, has_recovery_codes=lambda userid: False, - check_totp_value=lambda userid, totp_value: False, + check_totp_value=lambda userid, totp_value: True, + get_password_timestamp=lambda userid: 0, + needs_tos_flash=lambda userid, revision: False, ) - request = pretend.stub( - POST={}, - method="POST", - session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), - identity=None, - route_path=pretend.call_recorder(lambda p: "redirect_to"), - find_service=lambda interface, **kwargs: { - ITokenService: token_service, - IUserService: user_service, - }[interface], - query_string=pretend.stub(), - registry=pretend.stub(settings={"remember_device.days": 30}), + new_session = {} + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: { + "confirm_login": confirm_login_token_service, + "two_factor": two_factor_token_service, + None: two_factor_token_service, + }, + IUserService: { + None: user_service, + }, + }[interface][name] + + db_request.method = "POST" + db_request.session = pretend.stub( + items=lambda: [("a", "b"), ("foo", "bar")], + update=new_session.update, + invalidate=pretend.call_recorder(lambda: None), + new_csrf_token=pretend.call_recorder(lambda: None), + get_password_timestamp=lambda userid: 0, + ) + + db_request.session.record_auth_timestamp = pretend.call_recorder( + lambda *args: None + ) + db_request.session.record_password_timestamp = lambda timestamp: None + db_request.registry.settings = {"remember_device.days": 30} + db_request.headers["User-Agent"] = ( + "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 " + "Firefox/15.0.1" ) form_obj = pretend.stub( - validate=pretend.call_recorder(lambda: False), + validate=pretend.call_recorder(lambda: True), totp_value=pretend.stub(data="test-otp-secret"), + remember_device=pretend.stub(data=False), ) - form_class = pretend.call_recorder(lambda *a, **kw: form_obj) + form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) + db_request.route_path = pretend.call_recorder( + lambda a: "/account/confirm-login" + ) + db_request.params = pretend.stub( + get=pretend.call_recorder(lambda k: query_params.get(k)) + ) + db_request.user = user - result = views.two_factor_and_totp_validate(request, _form_class=form_class) + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_unrecognized_login_email", send_email) - assert token_service.loads.calls == [ - pretend.call(request.query_string, return_timestamp=True) - ] - assert result == {"totp_form": form_obj, "remember_device_days": 30} + result = views.two_factor_and_totp_validate(db_request, _form_class=form_class) - def test_two_factor_token_missing_userid(self, pyramid_request): - token_service = pretend.stub( - loads=pretend.call_recorder(lambda *a, **kw: ({}, None)) + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/account/confirm-login" + assert send_email.calls == [ + pretend.call( + db_request, + user, + ip_address=db_request.remote_addr, + user_agent="Firefox (Ubuntu)", + token="fake_token", + ) + ] + + @pytest.mark.parametrize("ua_string", [None, "no bueno", "Python-urllib/3.7"]) + def test_totp_auth_no_unique_login_bad_user_agent( + self, + monkeypatch, + db_request, + make_email_renderers, + metrics, + ua_string, + ): + make_email_renderers("unrecognized-login") + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) + + _remember_device = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "_remember_device", _remember_device) + + user = UserFactory.create( + with_verified_primary_email=True, + username="testuser", + name="Test User", + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + monkeypatch.setattr( + type(user), + "has_recovery_codes", + property(lambda u: False), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + query_params = {"userid": str(user_id)} + + confirm_login_token_service = pretend.stub( + dumps=pretend.call_recorder(lambda d: "fake_token") + ) + two_factor_token_service = pretend.stub( + loads=pretend.call_recorder( + lambda *args, **kwargs: ( + query_params, + datetime.datetime.now(datetime.UTC), + ) + ) + ) + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: user.id), + get_user=pretend.call_recorder(lambda userid: user), + update_user=lambda *a, **k: None, + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, + has_recovery_codes=lambda userid: False, + check_totp_value=lambda userid, totp_value: True, + get_password_timestamp=lambda userid: 0, + needs_tos_flash=lambda userid, revision: False, + ) + + new_session = {} + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: { + "confirm_login": confirm_login_token_service, + "two_factor": two_factor_token_service, + None: two_factor_token_service, + }, + IUserService: { + None: user_service, + }, + }[interface][name] + + db_request.method = "POST" + db_request.session = pretend.stub( + items=lambda: [("a", "b"), ("foo", "bar")], + update=new_session.update, + invalidate=pretend.call_recorder(lambda: None), + new_csrf_token=pretend.call_recorder(lambda: None), + get_password_timestamp=lambda userid: 0, + ) + + db_request.session.record_auth_timestamp = pretend.call_recorder( + lambda *args: None + ) + db_request.session.record_password_timestamp = lambda timestamp: None + db_request.registry.settings = {"remember_device.days": 30} + if ua_string: + db_request.headers["User-Agent"] = ua_string + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + totp_value=pretend.stub(data="test-otp-secret"), + remember_device=pretend.stub(data=False), + ) + form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) + db_request.route_path = pretend.call_recorder( + lambda a: "/account/confirm-login" + ) + db_request.params = pretend.stub( + get=pretend.call_recorder(lambda k: query_params.get(k)) + ) + db_request.user = user + + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_unrecognized_login_email", send_email) + + result = views.two_factor_and_totp_validate(db_request, _form_class=form_class) + + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/account/confirm-login" + assert send_email.calls == [ + pretend.call( + db_request, + user, + ip_address=db_request.remote_addr, + user_agent=ua_string or "Unknown User-Agent", + token="fake_token", + ) + ] + + def test_totp_auth_redirect_with_pending_unique_login( + self, + monkeypatch, + db_request, + make_email_renderers, + ): + make_email_renderers("unrecognized-login") + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) + + _remember_device = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "_remember_device", _remember_device) + + user = UserFactory.create( + with_verified_primary_email=True, + username="testuser", + name="Test User", + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + monkeypatch.setattr( + type(user), + "has_recovery_codes", + property(lambda u: False), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + query_params = {"userid": str(user_id)} + + two_factor_token_service = pretend.stub( + loads=pretend.call_recorder( + lambda *args, **kwargs: ( + query_params, + datetime.datetime.now(datetime.UTC), + ) + ) + ) + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: user.id), + get_user=pretend.call_recorder(lambda userid: user), + update_user=lambda *a, **k: None, + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, + has_recovery_codes=lambda userid: False, + check_totp_value=lambda userid, totp_value: True, + get_password_timestamp=lambda userid: 0, + needs_tos_flash=lambda userid, revision: False, + ) + + new_session = {} + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: { + "two_factor": two_factor_token_service, + None: two_factor_token_service, + }, + IUserService: { + None: user_service, + }, + }[interface][name] + + db_request.method = "POST" + db_request.session = pretend.stub( + items=lambda: [("a", "b"), ("foo", "bar")], + update=new_session.update, + invalidate=pretend.call_recorder(lambda: None), + new_csrf_token=pretend.call_recorder(lambda: None), + get_password_timestamp=lambda userid: 0, + ) + + db_request.session.record_auth_timestamp = pretend.call_recorder( + lambda *args: None + ) + db_request.session.record_password_timestamp = lambda timestamp: None + db_request.registry.settings = {"remember_device.days": 30} + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + totp_value=pretend.stub(data="test-otp-secret"), + remember_device=pretend.stub(data=False), + ) + form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) + db_request.route_path = pretend.call_recorder( + lambda a: "/account/confirm-login" + ) + db_request.params = pretend.stub( + get=pretend.call_recorder(lambda k: query_params.get(k)) + ) + db_request.user = user + + UserUniqueLoginFactory.create( + user=user, + ip_address=db_request.remote_addr, + status=UniqueLoginStatus.PENDING, + ) + + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_unrecognized_login_email", send_email) + + result = views.two_factor_and_totp_validate(db_request, _form_class=form_class) + + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/account/confirm-login" + assert send_email.calls == [] + + def test_totp_form_invalid(self): + token_data = {"userid": 1} + token_service = pretend.stub( + loads=pretend.call_recorder( + lambda *args, **kwargs: ( + token_data, + datetime.datetime.now(datetime.UTC), + ) + ) + ) + + user_service = pretend.stub( + get_user=pretend.call_recorder( + lambda userid: pretend.stub( + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ) + ) + ), + has_totp=lambda userid: True, + has_webauthn=lambda userid: False, + has_recovery_codes=lambda userid: False, + check_totp_value=lambda userid, totp_value: False, + ) + + request = pretend.stub( + POST={}, + method="POST", + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), + identity=None, + route_path=pretend.call_recorder(lambda p: "redirect_to"), + find_service=lambda interface, **kwargs: { + ITokenService: token_service, + IUserService: user_service, + }[interface], + query_string=pretend.stub(), + registry=pretend.stub(settings={"remember_device.days": 30}), + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: False), + totp_value=pretend.stub(data="test-otp-secret"), + ) + form_class = pretend.call_recorder(lambda *a, **kw: form_obj) + + result = views.two_factor_and_totp_validate(request, _form_class=form_class) + + assert token_service.loads.calls == [ + pretend.call(request.query_string, return_timestamp=True) + ] + assert result == {"totp_form": form_obj, "remember_device_days": 30} + + def test_two_factor_token_missing_userid(self, pyramid_request): + token_service = pretend.stub( + loads=pretend.call_recorder(lambda *a, **kw: ({}, None)) ) pyramid_request.session = pretend.stub( @@ -1368,10 +1840,270 @@ def test_two_factor_token_invalid(self, pyramid_request): pretend.call("Invalid or expired two factor login.", queue="error") ] - def test_get_returns_form(self, pyramid_request): - query_params = {"userid": 1} + def test_get_returns_form(self, pyramid_request): + query_params = {"userid": 1} + + token_service = pretend.stub( + loads=pretend.call_recorder( + lambda *args, **kwargs: ( + query_params, + datetime.datetime.now(datetime.UTC), + ) + ) + ) + + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: 1), + get_user=pretend.call_recorder( + lambda userid: pretend.stub( + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ) + ) + ), + update_user=lambda *a, **k: None, + has_totp=lambda uid: True, + has_webauthn=lambda uid: False, + has_recovery_codes=lambda uid: False, + ) + + pyramid_request.find_service = lambda interface, **kwargs: { + ITokenService: token_service, + IUserService: user_service, + }[interface] + pyramid_request.query_string = pretend.stub() + + form_obj = pretend.stub() + form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) + + result = views.recovery_code(pyramid_request, _form_class=form_class) + + assert token_service.loads.calls == [ + pretend.call(pyramid_request.query_string, return_timestamp=True) + ] + assert result == {"form": form_obj} + assert form_class.calls == [ + pretend.call( + pyramid_request.POST, + request=pyramid_request, + user_id=1, + user_service=user_service, + ) + ] + + @pytest.mark.parametrize("redirect_url", ["test_redirect_url", None]) + def test_recovery_code_auth_with_confirmed_unique_login( + self, monkeypatch, db_request, redirect_url + ): + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) + + user = UserFactory.create( + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + + query_params = {"userid": str(user_id)} + if redirect_url: + query_params["redirect_to"] = redirect_url + + token_service = pretend.stub( + loads=pretend.call_recorder( + lambda *args, **kwargs: ( + query_params, + datetime.datetime.now(datetime.UTC), + ) + ) + ) + + user_service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: user_id), + get_user=pretend.call_recorder(lambda userid: user), + update_user=lambda *a, **k: None, + has_recovery_codes=lambda userid: True, + check_recovery_code=lambda userid, recovery_code_value: True, + get_password_timestamp=lambda userid: 0, + needs_tos_flash=lambda userid, revision: False, + ) + + new_session = {} + + db_request.find_service = lambda interface, **kwargs: { + ITokenService: token_service, + IUserService: user_service, + }[interface] + + db_request.method = "POST" + db_request.session = pretend.stub( + items=lambda: [("a", "b"), ("foo", "bar")], + update=new_session.update, + invalidate=pretend.call_recorder(lambda: None), + new_csrf_token=pretend.call_recorder(lambda: None), + flash=pretend.call_recorder(lambda message, queue: None), + ) + + db_request.set_property( + lambda r: str(uuid.uuid4()), name="unauthenticated_userid" + ) + db_request.session.record_auth_timestamp = pretend.call_recorder( + lambda *args: None + ) + db_request.session.record_password_timestamp = lambda timestamp: None + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + recovery_code_value=pretend.stub(data="recovery-code"), + ) + form_class = pretend.call_recorder(lambda d, **kw: form_obj) + db_request.route_path = pretend.call_recorder(lambda a: "/account/two-factor") + db_request.params = pretend.stub( + get=pretend.call_recorder(lambda k: query_params.get(k)) + ) + + UserUniqueLoginFactory.create( + user=user, + ip_address=db_request.remote_addr, + status=UniqueLoginStatus.CONFIRMED, + ) + + result = views.recovery_code(db_request, _form_class=form_class) + + token_expected_data = {"userid": str(user_id)} + if redirect_url: + token_expected_data["redirect_to"] = redirect_url + + assert isinstance(result, HTTPSeeOther) + assert result.headers["Set-Cookie"].startswith("user_id__insecure=") + + assert remember.calls == [pretend.call(db_request, str(user_id))] + assert db_request.session.invalidate.calls == [pretend.call()] + assert db_request.session.new_csrf_token.calls == [pretend.call()] + assert user.record_event.calls == [ + pretend.call( + tag=EventTag.Account.LoginSuccess, + request=db_request, + additional={ + "two_factor_method": "recovery-code", + "two_factor_label": None, + }, + ), + pretend.call( + tag=EventTag.Account.RecoveryCodesUsed, + request=db_request, + ), + ] + assert db_request.session.flash.calls == [ + pretend.call( + "Recovery code accepted. The supplied code cannot be used again.", + queue="success", + ) + ] + assert db_request.session.record_auth_timestamp.calls == [pretend.call()] + + def test_recovery_code_auth_no_unique_login( + self, monkeypatch, db_request, make_email_renderers + ): + make_email_renderers("unrecognized-login") + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) + + user = UserFactory.create( + with_verified_primary_email=True, + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + query_params = {"userid": str(user_id)} + + confirm_login_token_service = pretend.stub( + dumps=pretend.call_recorder(lambda d: "fake_token") + ) + two_factor_token_service = pretend.stub( + loads=pretend.call_recorder( + lambda *args, **kwargs: ( + query_params, + datetime.datetime.now(datetime.UTC), + ) + ) + ) + user_service = pretend.stub( + get_user=pretend.call_recorder(lambda userid: user), + check_recovery_code=lambda userid, recovery_code_value: True, + ) + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: { + "confirm_login": confirm_login_token_service, + "two_factor": two_factor_token_service, + None: two_factor_token_service, + }, + IUserService: { + None: user_service, + }, + }[interface][name] + + db_request.method = "POST" + db_request.headers["User-Agent"] = ( + "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 " + "Firefox/15.0.1" + ) + + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + recovery_code_value=pretend.stub(data="recovery-code"), + ) + form_class = pretend.call_recorder(lambda d, **kw: form_obj) + db_request.route_path = pretend.call_recorder( + lambda a: "/account/confirm-login" + ) + db_request.params = pretend.stub( + get=pretend.call_recorder(lambda k: query_params.get(k)) + ) + + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_unrecognized_login_email", send_email) + + result = views.recovery_code(db_request, _form_class=form_class) + + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/account/confirm-login" + assert send_email.calls == [ + pretend.call( + db_request, + user, + ip_address=db_request.remote_addr, + user_agent="Firefox (Ubuntu)", + token="fake_token", + ) + ] + + @pytest.mark.parametrize("ua_string", [None, "no bueno", "Python-urllib/3.7"]) + def test_recovery_code_auth_no_unique_login_bad_user_agent( + self, monkeypatch, db_request, make_email_renderers, ua_string + ): + make_email_renderers("unrecognized-login") + remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) + monkeypatch.setattr(views, "remember", remember) - token_service = pretend.stub( + user = UserFactory.create( + with_verified_primary_email=True, + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + query_params = {"userid": str(user_id)} + + confirm_login_token_service = pretend.stub( + dumps=pretend.call_recorder(lambda d: "fake_token") + ) + two_factor_token_service = pretend.stub( loads=pretend.call_recorder( lambda *args, **kwargs: ( query_params, @@ -1379,56 +2111,73 @@ def test_get_returns_form(self, pyramid_request): ) ) ) - user_service = pretend.stub( - find_userid=pretend.call_recorder(lambda username: 1), - get_user=pretend.call_recorder( - lambda userid: pretend.stub( - last_login=( - datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) - ) - ) - ), - update_user=lambda *a, **k: None, - has_totp=lambda uid: True, - has_webauthn=lambda uid: False, - has_recovery_codes=lambda uid: False, + get_user=pretend.call_recorder(lambda userid: user), + check_recovery_code=lambda userid, recovery_code_value: True, ) - pyramid_request.find_service = lambda interface, **kwargs: { - ITokenService: token_service, - IUserService: user_service, - }[interface] - pyramid_request.query_string = pretend.stub() + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: { + "confirm_login": confirm_login_token_service, + "two_factor": two_factor_token_service, + None: two_factor_token_service, + }, + IUserService: { + None: user_service, + }, + }[interface][name] - form_obj = pretend.stub() - form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) + db_request.method = "POST" + if ua_string: + db_request.headers["User-Agent"] = ua_string - result = views.recovery_code(pyramid_request, _form_class=form_class) + form_obj = pretend.stub( + validate=pretend.call_recorder(lambda: True), + recovery_code_value=pretend.stub(data="recovery-code"), + ) + form_class = pretend.call_recorder(lambda d, **kw: form_obj) + db_request.route_path = pretend.call_recorder( + lambda a: "/account/confirm-login" + ) + db_request.params = pretend.stub( + get=pretend.call_recorder(lambda k: query_params.get(k)) + ) - assert token_service.loads.calls == [ - pretend.call(pyramid_request.query_string, return_timestamp=True) - ] - assert result == {"form": form_obj} - assert form_class.calls == [ + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_unrecognized_login_email", send_email) + + result = views.recovery_code(db_request, _form_class=form_class) + + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/account/confirm-login" + assert send_email.calls == [ pretend.call( - pyramid_request.POST, - request=pyramid_request, - user_id=1, - user_service=user_service, + db_request, + user, + ip_address=db_request.remote_addr, + user_agent=ua_string or "Unknown User-Agent", + token="fake_token", ) ] - @pytest.mark.parametrize("redirect_url", ["test_redirect_url", None]) - def test_recovery_code_auth(self, monkeypatch, pyramid_request, redirect_url): + def test_recovery_code_auth_with_pending_unique_login( + self, monkeypatch, db_request, make_email_renderers + ): + make_email_renderers("unrecognized-login") remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")]) monkeypatch.setattr(views, "remember", remember) - query_params = {"userid": str(1)} - if redirect_url: - query_params["redirect_to"] = redirect_url + user = UserFactory.create( + with_verified_primary_email=True, + last_login=( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) + ), + ) + user.record_event = pretend.call_recorder(lambda *a, **kw: None) + user_id = user.id + query_params = {"userid": str(user_id)} - token_service = pretend.stub( + two_factor_token_service = pretend.stub( loads=pretend.call_recorder( lambda *args, **kwargs: ( query_params, @@ -1436,91 +2185,49 @@ def test_recovery_code_auth(self, monkeypatch, pyramid_request, redirect_url): ) ) ) - - user = pretend.stub( - last_login=( - datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=1) - ), - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) user_service = pretend.stub( - find_userid=pretend.call_recorder(lambda username: 1), get_user=pretend.call_recorder(lambda userid: user), - update_user=lambda *a, **k: None, - has_recovery_codes=lambda userid: True, check_recovery_code=lambda userid, recovery_code_value: True, - get_password_timestamp=lambda userid: 0, - needs_tos_flash=lambda userid, revision: False, ) - new_session = {} - - pyramid_request.find_service = lambda interface, **kwargs: { - ITokenService: token_service, - IUserService: user_service, - }[interface] - - pyramid_request.method = "POST" - pyramid_request.session = pretend.stub( - items=lambda: [("a", "b"), ("foo", "bar")], - update=new_session.update, - invalidate=pretend.call_recorder(lambda: None), - new_csrf_token=pretend.call_recorder(lambda: None), - flash=pretend.call_recorder(lambda message, queue: None), - ) + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: { + "two_factor": two_factor_token_service, + None: two_factor_token_service, + }, + IUserService: { + None: user_service, + }, + }[interface][name] - pyramid_request.set_property( - lambda r: str(uuid.uuid4()), name="unauthenticated_userid" - ) - pyramid_request.session.record_auth_timestamp = pretend.call_recorder( - lambda *args: None - ) - pyramid_request.session.record_password_timestamp = lambda timestamp: None + db_request.method = "POST" form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), recovery_code_value=pretend.stub(data="recovery-code"), ) - form_class = pretend.call_recorder(lambda d, user_service, **kw: form_obj) - pyramid_request.route_path = pretend.call_recorder( - lambda a: "/account/two-factor" + form_class = pretend.call_recorder(lambda d, **kw: form_obj) + db_request.route_path = pretend.call_recorder( + lambda a: "/account/confirm-login" ) - pyramid_request.params = pretend.stub( + db_request.params = pretend.stub( get=pretend.call_recorder(lambda k: query_params.get(k)) ) - result = views.recovery_code(pyramid_request, _form_class=form_class) - token_expected_data = {"userid": str(1)} - if redirect_url: - token_expected_data["redirect_to"] = redirect_url + UserUniqueLoginFactory.create( + user=user, + ip_address=db_request.remote_addr, + status=UniqueLoginStatus.PENDING, + ) - assert isinstance(result, HTTPSeeOther) - assert result.headers["Set-Cookie"].startswith("user_id__insecure=") + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_unrecognized_login_email", send_email) - assert remember.calls == [pretend.call(pyramid_request, str(1))] - assert pyramid_request.session.invalidate.calls == [pretend.call()] - assert pyramid_request.session.new_csrf_token.calls == [pretend.call()] - assert user.record_event.calls == [ - pretend.call( - tag=EventTag.Account.LoginSuccess, - request=pyramid_request, - additional={ - "two_factor_method": "recovery-code", - "two_factor_label": None, - }, - ), - pretend.call( - tag=EventTag.Account.RecoveryCodesUsed, - request=pyramid_request, - ), - ] - assert pyramid_request.session.flash.calls == [ - pretend.call( - "Recovery code accepted. The supplied code cannot be used again.", - queue="success", - ) - ] - assert pyramid_request.session.record_auth_timestamp.calls == [pretend.call()] + result = views.recovery_code(db_request, _form_class=form_class) + + assert isinstance(result, HTTPSeeOther) + assert result.headers["Location"] == "/account/confirm-login" + assert send_email.calls == [] def test_recovery_code_form_invalid(self): token_data = {"userid": 1} @@ -1677,23 +2384,23 @@ def test_get(self, db_request): result = views.register(db_request, _form_class=form) assert result["form"] is form_inst - def test_redirect_authenticated_user(self): - pyramid_request = pretend.stub(user=pretend.stub()) + def test_redirect_authenticated_user(self, pyramid_request): + pyramid_request.user = pretend.stub() pyramid_request.route_path = pretend.call_recorder(lambda a: "/the-redirect") result = views.register(pyramid_request) assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == "/the-redirect" - def test_register_honeypot(self, pyramid_request, monkeypatch): - pyramid_request.method = "POST" + def test_register_honeypot(self, db_request, monkeypatch): + db_request.method = "POST" create_user = pretend.call_recorder(lambda *args, **kwargs: None) add_email = pretend.call_recorder(lambda *args, **kwargs: None) - pyramid_request.route_path = pretend.call_recorder(lambda name: "/") - pyramid_request.POST = {"confirm_form": "fuzzywuzzy@bears.com"} + db_request.route_path = pretend.call_recorder(lambda name: "/") + db_request.POST = {"confirm_form": "fuzzywuzzy@bears.com"} send_email = pretend.call_recorder(lambda *a: None) monkeypatch.setattr(views, "send_email_verification_email", send_email) - result = views.register(pyramid_request) + result = views.register(db_request) assert isinstance(result, HTTPSeeOther) assert result.headers["Location"] == "/" @@ -1705,10 +2412,8 @@ def test_register_redirect(self, db_request, monkeypatch): db_request.method = "POST" record_event = pretend.call_recorder(lambda *a, **kw: None) - user = pretend.stub( - id=pretend.stub(), - record_event=record_event, - ) + user = UserFactory.create() + user.record_event = record_event email = pretend.stub() create_user = pretend.call_recorder(lambda *args, **kwargs: user) add_email = pretend.call_recorder(lambda *args, **kwargs: email) @@ -5102,3 +5807,257 @@ def test_delete_pending_oidc_publisher( ) ] assert db_request.db.query(publisher_class).all() == [] + + +class TestConfirmLogin: + def test_already_logged_in(self, pyramid_request): + pyramid_request.user = UserFactory.create() + pyramid_request.route_path = pretend.call_recorder(lambda route: f"/{route}") + result = views.confirm_login(pyramid_request) + assert isinstance(result, HTTPSeeOther) + assert result.location == "/index" + assert pyramid_request.route_path.calls == [pretend.call("index")] + + def test_no_token(self, pyramid_request): + pyramid_request.user = None + pyramid_request.params = {} + result = views.confirm_login(pyramid_request) + assert result == {} + + @pytest.mark.parametrize( + ("exception", "message"), + [ + (TokenInvalid, "Invalid token: please try to login again"), + (TokenExpired, "Expired token: please try to login again"), + (TokenMissing, "Invalid token: no token supplied"), + ], + ) + def test_token_error(self, pyramid_request, exception, message): + pyramid_request.user = None + pyramid_request.params = {"token": "foo"} + token_service = pretend.stub(loads=pretend.raiser(exception)) + user_service = pretend.stub() + pyramid_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + pyramid_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + pyramid_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(pyramid_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/accounts.login" + assert pyramid_request.session.flash.calls == [ + pretend.call(message, queue="error") + ] + + def test_invalid_action(self, pyramid_request): + pyramid_request.user = None + pyramid_request.params = {"token": "foo"} + token_data = {"action": "wrong-action"} + token_service = pretend.stub(loads=pretend.call_recorder(lambda t: token_data)) + user_service = pretend.stub() + pyramid_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + pyramid_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + pyramid_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(pyramid_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/accounts.login" + assert pyramid_request.session.flash.calls == [ + pretend.call("Invalid token: not a login confirmation token", queue="error") + ] + + def test_user_not_found(self, pyramid_request): + pyramid_request.user = None + pyramid_request.params = {"token": "foo"} + token_data = { + "action": "login-confirmation", + "user.id": str(uuid.uuid4()), + } + token_service = pretend.stub(loads=pretend.call_recorder(lambda t: token_data)) + user_service = pretend.stub(get_user=pretend.call_recorder(lambda uid: None)) + + pyramid_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + pyramid_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + pyramid_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(pyramid_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/accounts.login" + assert pyramid_request.session.flash.calls == [ + pretend.call("Invalid token: user not found", queue="error") + ] + + def test_user_logged_in_since_naive_datetime(self, db_request): + user = UserFactory.create(last_login=datetime.datetime.now(datetime.UTC)) + db_request.user = None + db_request.params = {"token": "foo"} + token_data = { + "action": "login-confirmation", + "user.id": str(user.id), + "user.last_login": (user.last_login - datetime.timedelta(seconds=1)) + .replace(tzinfo=None) + .isoformat(), + } + token_service = pretend.stub(loads=pretend.call_recorder(lambda t: token_data)) + user_service = pretend.stub(get_user=pretend.call_recorder(lambda uid: user)) + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + db_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(db_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/accounts.login" + assert db_request.session.flash.calls == [ + pretend.call( + "Invalid token: user has logged in since this token was requested", + queue="error", + ) + ] + + def test_user_logged_in_since(self, db_request): + user = UserFactory.create(last_login=datetime.datetime.now(datetime.UTC)) + db_request.user = None + db_request.params = {"token": "foo"} + token_data = { + "action": "login-confirmation", + "user.id": str(user.id), + "user.last_login": ( + user.last_login - datetime.timedelta(seconds=1) + ).isoformat(), + } + token_service = pretend.stub(loads=pretend.call_recorder(lambda t: token_data)) + user_service = pretend.stub(get_user=pretend.call_recorder(lambda uid: user)) + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + db_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(db_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/accounts.login" + assert db_request.session.flash.calls == [ + pretend.call( + "Invalid token: user has logged in since this token was requested", + queue="error", + ) + ] + + def test_unique_login_not_found(self, db_request): + user = UserFactory.create(last_login=datetime.datetime.now(datetime.UTC)) + db_request.user = None + db_request.params = {"token": "foo"} + token_data = { + "action": "login-confirmation", + "user.id": str(user.id), + "user.last_login": user.last_login.isoformat(), + "unique_login_id": str(uuid.uuid4()), + } + token_service = pretend.stub(loads=pretend.call_recorder(lambda t: token_data)) + user_service = pretend.stub(get_user=pretend.call_recorder(lambda uid: user)) + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + db_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(db_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/accounts.login" + assert db_request.session.flash.calls == [ + pretend.call("Invalid login attempt.", queue="error") + ] + + def test_ip_address_mismatch(self, db_request): + user = UserFactory.create(last_login=datetime.datetime.now(datetime.UTC)) + unique_login = UserUniqueLoginFactory.create(user=user, ip_address="1.1.1.1") + db_request.user = None + db_request.params = {"token": "foo"} + token_data = { + "action": "login-confirmation", + "user.id": str(user.id), + "user.last_login": user.last_login.isoformat(), + "unique_login_id": unique_login.id, + } + token_service = pretend.stub(loads=pretend.call_recorder(lambda t: token_data)) + user_service = pretend.stub(get_user=pretend.call_recorder(lambda uid: user)) + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + db_request.session.flash = pretend.call_recorder(lambda *a, **kw: None) + db_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(db_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/accounts.login" + assert db_request.session.flash.calls == [ + pretend.call( + "Device details didn't match, please try again from the device you " + "originally used to log in.", + queue="error", + ) + ] + + def test_success(self, monkeypatch, db_request): + user = UserFactory.create(last_login=datetime.datetime.now(datetime.UTC)) + unique_login = UserUniqueLoginFactory.create( + user=user, ip_address=db_request.remote_addr + ) + db_request.user = None + db_request.params = {"token": "foo"} + + token_data = { + "action": "login-confirmation", + "user.id": str(user.id), + "user.last_login": user.last_login.isoformat(), + "unique_login_id": str(unique_login.id), + } + token_service = pretend.stub(loads=pretend.call_recorder(lambda t: token_data)) + user_service = pretend.stub(get_user=pretend.call_recorder(lambda uid: user)) + + db_request.find_service = lambda interface, name=None, **kwargs: { + ITokenService: {"confirm_login": token_service}, + IUserService: {None: user_service}, + }[interface][name] + + _login_user = pretend.call_recorder(lambda request, userid: [("foo", "bar")]) + monkeypatch.setattr(views, "_login_user", _login_user) + _set_userid_insecure_cookie = pretend.call_recorder(lambda resp, userid: None) + monkeypatch.setattr( + views, "_set_userid_insecure_cookie", _set_userid_insecure_cookie + ) + + db_request.route_path = pretend.call_recorder(lambda r: f"/{r}") + + result = views.confirm_login(db_request) + + assert isinstance(result, HTTPSeeOther) + assert result.location == "/manage.projects" + assert unique_login.status == UniqueLoginStatus.CONFIRMED + assert _login_user.calls == [pretend.call(db_request, user.id)] + assert _set_userid_insecure_cookie.calls == [pretend.call(result, user.id)] diff --git a/tests/unit/email/test_init.py b/tests/unit/email/test_init.py index 8c9fc824ade3..98b2270509ee 100644 --- a/tests/unit/email/test_init.py +++ b/tests/unit/email/test_init.py @@ -6254,3 +6254,104 @@ def test_user_terms_of_service_updated( }, ) ] + + +class TestSendUnrecognizedLoginEmail: + def test_send_unrecognized_login_email( + self, + pyramid_request, + pyramid_config, + monkeypatch, + ): + stub_user = pretend.stub( + id="id", + username="username", + name="", + email="email@example.com", + primary_email=pretend.stub(email="email@example.com", verified=True), + ) + ip_address = "127.0.0.1" + user_agent = "Test Browser" + token = "test-token" + + subject_renderer = pyramid_config.testing_add_renderer( + "email/unrecognized-login/subject.txt" + ) + subject_renderer.string_response = "Email Subject" + body_renderer = pyramid_config.testing_add_renderer( + "email/unrecognized-login/body.txt" + ) + body_renderer.string_response = "Email Body" + html_renderer = pyramid_config.testing_add_renderer( + "email/unrecognized-login/body.html" + ) + html_renderer.string_response = "Email HTML Body" + + send_email = pretend.stub( + delay=pretend.call_recorder(lambda *args, **kwargs: None) + ) + pyramid_request.task = pretend.call_recorder(lambda *args, **kwargs: send_email) + monkeypatch.setattr(email, "send_email", send_email) + + pyramid_request.db = pretend.stub( + query=lambda a: pretend.stub( + filter=lambda *a: pretend.stub( + one=lambda: pretend.stub(user_id=stub_user.id) + ) + ), + ) + pyramid_request.user = stub_user + pyramid_request.registry.settings = {"mail.sender": "noreply@example.com"} + + result = email.send_unrecognized_login_email( + pyramid_request, + stub_user, + ip_address=ip_address, + user_agent=user_agent, + token=token, + ) + + assert result == { + "username": stub_user.username, + "ip_address": ip_address, + "user_agent": user_agent, + "token": token, + } + subject_renderer.assert_() + body_renderer.assert_( + username=stub_user.username, + ip_address=ip_address, + user_agent=user_agent, + token=token, + ) + html_renderer.assert_( + username=stub_user.username, + ip_address=ip_address, + user_agent=user_agent, + token=token, + ) + assert pyramid_request.task.calls == [pretend.call(send_email)] + assert send_email.delay.calls == [ + pretend.call( + f"{stub_user.username} <{stub_user.email}>", + { + "sender": None, + "subject": "Email Subject", + "body_text": "Email Body", + "body_html": ( + "\n\n" + "

Email HTML Body

\n\n" + ), + }, + { + "tag": "account:email:sent", + "user_id": stub_user.id, + "additional": { + "from_": "noreply@example.com", + "to": stub_user.email, + "subject": "Email Subject", + "redact_ip": False, + }, + }, + ) + ] diff --git a/tests/unit/test_routes.py b/tests/unit/test_routes.py index 480fe8f2660e..f5d46c909d3a 100644 --- a/tests/unit/test_routes.py +++ b/tests/unit/test_routes.py @@ -189,6 +189,9 @@ def add_redirect_rule(*args, **kwargs): pretend.call( "accounts.reset-password", "/account/reset-password/", domain=warehouse ), + pretend.call( + "accounts.confirm-login", "/account/confirm-login/", domain=warehouse + ), pretend.call( "accounts.verify-email", "/account/verify-email/", domain=warehouse ), diff --git a/warehouse/accounts/__init__.py b/warehouse/accounts/__init__.py index 0765f3bb9058..ff9b74477012 100644 --- a/warehouse/accounts/__init__.py +++ b/warehouse/accounts/__init__.py @@ -103,6 +103,9 @@ def includeme(config): config.register_service_factory( TokenServiceFactory(name="two_factor"), ITokenService, name="two_factor" ) + config.register_service_factory( + TokenServiceFactory(name="confirm_login"), ITokenService, name="confirm_login" + ) config.register_service_factory( TokenServiceFactory(name="remember_device"), ITokenService, diff --git a/warehouse/accounts/models.py b/warehouse/accounts/models.py index e762d64d28c6..0834d8c8ba1d 100644 --- a/warehouse/accounts/models.py +++ b/warehouse/accounts/models.py @@ -11,6 +11,7 @@ from pyramid.authorization import Allow, Authenticated from sqlalchemy import ( CheckConstraint, + Enum, ForeignKey, Index, LargeBinary, @@ -20,7 +21,7 @@ select, sql, ) -from sqlalchemy.dialects.postgresql import ARRAY, CITEXT, UUID as PG_UUID +from sqlalchemy.dialects.postgresql import ARRAY, CITEXT, JSONB, UUID as PG_UUID from sqlalchemy.exc import NoResultFound from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import Mapped, mapped_column @@ -120,6 +121,10 @@ class User(SitemapMixin, HasObservers, HasObservations, HasEvents, db.Model): order_by="Macaroon.created.desc()", ) + unique_logins: Mapped[list[UserUniqueLogin]] = orm.relationship( + back_populates="user", cascade="all, delete-orphan", lazy=True + ) + role_invitations: Mapped[list[RoleInvitation]] = orm.relationship( "RoleInvitation", back_populates="user", @@ -475,3 +480,49 @@ class ProhibitedUserName(db.Model): ) prohibited_by: Mapped[User] = orm.relationship(User) comment: Mapped[str] = mapped_column(server_default="") + + +class UniqueLoginStatus(str, enum.Enum): + PENDING = "pending" + CONFIRMED = "confirmed" + + +class UserUniqueLogin(db.Model): + __tablename__ = "user_unique_logins" + __table_args__ = ( + UniqueConstraint( + "user_id", "ip_address", name="_user_unique_logins_user_id_ip_address_uc" + ), + Index( + "user_unique_logins_user_id_ip_address_idx", + "user_id", + "ip_address", + unique=True, + ), + ) + + user_id: Mapped[UUID] = mapped_column( + PG_UUID(as_uuid=True), + ForeignKey("users.id", onupdate="CASCADE", ondelete="CASCADE"), + nullable=False, + index=True, + ) + user: Mapped[User] = orm.relationship(back_populates="unique_logins") + + ip_address: Mapped[str] = mapped_column(String, nullable=False) + created: Mapped[datetime_now] + last_used: Mapped[datetime_now] + device_information: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + status: Mapped[UniqueLoginStatus] = mapped_column( + Enum(UniqueLoginStatus, values_callable=lambda x: [e.value for e in x]), + nullable=False, + default=UniqueLoginStatus.PENDING, + server_default=UniqueLoginStatus.PENDING.value, + ) + + def __repr__(self): + return ( + f"" + ) diff --git a/warehouse/accounts/views.py b/warehouse/accounts/views.py index 736a6ee88ecb..f942e587d946 100644 --- a/warehouse/accounts/views.py +++ b/warehouse/accounts/views.py @@ -8,6 +8,7 @@ import humanize import pytz +from linehaul.ua import parser as linehaul_user_agent_parser from more_itertools import first_true from pyramid.httpexceptions import ( HTTPBadRequest, @@ -22,6 +23,7 @@ from pyramid.view import view_config, view_defaults from sqlalchemy import and_, func, select from sqlalchemy.exc import IntegrityError, NoResultFound +from ua_parser import user_agent_parser from webauthn.helpers import bytes_to_base64url from webob.multidict import MultiDict @@ -49,7 +51,13 @@ TooManyFailedLogins, TooManyPasswordResetRequests, ) -from warehouse.accounts.models import Email, TermsOfServiceEngagement, User +from warehouse.accounts.models import ( + Email, + TermsOfServiceEngagement, + UniqueLoginStatus, + User, + UserUniqueLogin, +) from warehouse.accounts.utils import update_email_domain_status from warehouse.admin.flags import AdminFlagValue from warehouse.authnz import Permissions @@ -67,7 +75,9 @@ send_password_reset_email, send_password_reset_unverified_email, send_recovery_code_reminder_email, + send_unrecognized_login_email, ) +from warehouse.events.models import UserAgentInfo from warehouse.events.tags import EventTag from warehouse.metrics.interfaces import IMetricsService from warehouse.oidc.forms import ( @@ -101,6 +111,7 @@ USER_ID_INSECURE_COOKIE = "user_id__insecure" REMEMBER_DEVICE_COOKIE = "remember_device" +PHISHABLE_METHODS = {"totp", "recovery-code"} @view_config(context=TooManyFailedLogins, has_translations=True) @@ -401,20 +412,105 @@ def two_factor_and_totp_validate(request, _form_class=TOTPAuthenticationForm): if request.method == "POST": form = two_factor_state["totp_form"] if form.validate(): - two_factor_method = "totp" - _login_user(request, userid, two_factor_method, two_factor_label="totp") - user_service.update_user(userid, last_totp_value=form.totp_value.data) + user = user_service.get_user(userid) - resp = HTTPSeeOther(redirect_to) - _set_userid_insecure_cookie(resp, userid) + unique_login = ( + request.db.query(UserUniqueLogin) + .filter( + UserUniqueLogin.user_id == userid, + UserUniqueLogin.ip_address == request.remote_addr, + ) + .one_or_none() + ) - if not two_factor_state.get("has_recovery_codes", False): - send_recovery_code_reminder_email(request, request.user) + if unique_login: + if unique_login.status == UniqueLoginStatus.CONFIRMED: + # We've seen this device before for this user and they've + # confirmed it, log in the user + two_factor_method = "totp" + _login_user( + request, userid, two_factor_method, two_factor_label="totp" + ) + user_service.update_user( + userid, last_totp_value=form.totp_value.data + ) + + resp = HTTPSeeOther(redirect_to) + _set_userid_insecure_cookie(resp, userid) + + if not two_factor_state.get("has_recovery_codes", False): + send_recovery_code_reminder_email(request, request.user) + + if form.remember_device.data: + _remember_device(request, resp, userid, two_factor_method) + + return resp + else: + # We've seen this device before for this user but they haven't + # confirmed it, don't send another email, just send them to + # the generic page + return HTTPSeeOther(request.route_path("accounts.confirm-login")) - if form.remember_device.data: - _remember_device(request, resp, userid, two_factor_method) + else: + # We haven't seen this device before from this user or they + # haven't confirmed it, make them confirm it + unique_login = UserUniqueLogin( + user_id=userid, + ip_address=request.remote_addr, + status=UniqueLoginStatus.PENDING, + ) + request.db.add(unique_login) + request.db.flush() # To get the ID for the token - return resp + token_service = request.find_service( + ITokenService, name="confirm_login" + ) + token = token_service.dumps( + { + "action": "login-confirmation", + "user.id": str(user.id), + "user.last_login": str( + user.last_login + or datetime.datetime.min.replace(tzinfo=pytz.UTC) + ), + "unique_login_id": unique_login.id, + } + ) + + # Get User Agent Information + user_agent_info_data = {} + if user_agent_str := request.headers.get("User-Agent"): + user_agent_info_data = { + # A hack to get it to fall back to the raw user agent + "installer": user_agent_str, + } + try: + parsed = linehaul_user_agent_parser.parse(user_agent_str) + if ( + parsed + and parsed.installer + and parsed.installer.name == "Browser" + ): + parsed_ua = user_agent_parser.Parse(user_agent_str) + user_agent_info_data = { + "installer": "Browser", + "device": parsed_ua["device"]["family"], + "os": parsed_ua["os"]["family"], + "user_agent": parsed_ua["user_agent"]["family"], + } + except linehaul_user_agent_parser.UnknownUserAgentError: + pass # Fallback to raw user-agent string + + user_agent_info = UserAgentInfo(**user_agent_info_data) + + send_unrecognized_login_email( + request, + user, + ip_address=request.remote_addr, + user_agent=user_agent_info.display(), + token=token, + ) + return HTTPSeeOther(request.route_path("accounts.confirm-login")) else: form.totp_value.data = "" @@ -590,6 +686,7 @@ def recovery_code(request, _form_class=RecoveryCodeAuthenticationForm): return HTTPSeeOther(request.route_path("accounts.login")) userid = two_factor_data.get("userid") + redirect_to = two_factor_data.get("redirect_to") user_service = request.find_service(IUserService, context=None) @@ -599,25 +696,105 @@ def recovery_code(request, _form_class=RecoveryCodeAuthenticationForm): if request.method == "POST": if form.validate(): - _login_user(request, userid, two_factor_method="recovery-code") - - resp = HTTPSeeOther(request.route_path("manage.account")) - _set_userid_insecure_cookie(resp, userid) - user = user_service.get_user(userid) - user.record_event( - tag=EventTag.Account.RecoveryCodesUsed, - request=request, - ) - request.session.flash( - request._( - "Recovery code accepted. The supplied code cannot be used again." - ), - queue="success", + unique_login = ( + request.db.query(UserUniqueLogin) + .filter( + UserUniqueLogin.user_id == userid, + UserUniqueLogin.ip_address == request.remote_addr, + ) + .one_or_none() ) - return resp + if unique_login: + if unique_login.status == UniqueLoginStatus.CONFIRMED: + # We've seen this device before for this user and they've + # confirmed it, log in the user + _login_user(request, userid, two_factor_method="recovery-code") + + user.record_event( + tag=EventTag.Account.RecoveryCodesUsed, + request=request, + ) + + request.session.flash( + request._( + "Recovery code accepted. " + "The supplied code cannot be used again." + ), + queue="success", + ) + + resp = HTTPSeeOther(redirect_to) + _set_userid_insecure_cookie(resp, userid) + + return resp + else: + # We've seen this device before for this user but they haven't + # confirmed it, don't send another email, just send them to + # the generic page + return HTTPSeeOther(request.route_path("accounts.confirm-login")) + else: + # We haven't seen this device before from this user or they + # haven't confirmed it, make them confirm it + unique_login = UserUniqueLogin( + user_id=userid, + ip_address=request.remote_addr, + status=UniqueLoginStatus.PENDING, + ) + request.db.add(unique_login) + request.db.flush() # To get the ID for the token + + token_service = request.find_service( + ITokenService, name="confirm_login" + ) + token = token_service.dumps( + { + "action": "login-confirmation", + "user.id": str(user.id), + "user.last_login": str( + user.last_login + or datetime.datetime.min.replace(tzinfo=pytz.UTC) + ), + "unique_login_id": unique_login.id, + } + ) + + # Get User Agent Information + user_agent_info_data = {} + if user_agent_str := request.headers.get("User-Agent"): + user_agent_info_data = { + # A hack to get it to fall back to the raw user agent + "installer": user_agent_str, + } + try: + parsed = linehaul_user_agent_parser.parse(user_agent_str) + if ( + parsed + and parsed.installer + and parsed.installer.name == "Browser" + ): + parsed_ua = user_agent_parser.Parse(user_agent_str) + user_agent_info_data = { + "installer": "Browser", + "device": parsed_ua["device"]["family"], + "os": parsed_ua["os"]["family"], + "user_agent": parsed_ua["user_agent"]["family"], + } + except linehaul_user_agent_parser.UnknownUserAgentError: + pass # Fallback to raw user-agent string + + user_agent_info = UserAgentInfo(**user_agent_info_data) + + send_unrecognized_login_email( + request, + user, + ip_address=request.remote_addr, + user_agent=user_agent_info.display(), + token=token, + ) + return HTTPSeeOther(request.route_path("accounts.confirm-login")) else: form.recovery_code_value.data = "" @@ -955,6 +1132,92 @@ def _error(message): return {"form": form} +@view_config( + route_name="accounts.confirm-login", + renderer="warehouse:templates/accounts/unrecognized-device.html", + uses_session=True, + require_csrf=True, + require_methods=False, + has_translations=True, +) +def confirm_login(request): + if request.user is not None: + return HTTPSeeOther(request.route_path("index")) + + if not request.params.get("token"): + # Show a generic page for when a non-logged-in user lands here without a token + return {} + + user_service = request.find_service(IUserService, context=None) + token_service = request.find_service(ITokenService, name="confirm_login") + + def _error(message): + request.session.flash(message, queue="error") + return HTTPSeeOther(request.route_path("accounts.login")) + + try: + token = request.params.get("token") + data = token_service.loads(token) + except TokenExpired: + return _error(request._("Expired token: please try to login again")) + except TokenInvalid: + return _error(request._("Invalid token: please try to login again")) + except TokenMissing: + return _error(request._("Invalid token: no token supplied")) + + # Check whether this token is being used correctly + if data.get("action") != "login-confirmation": + return _error(request._("Invalid token: not a login confirmation token")) + + # Check whether a user with the given user ID exists + user = user_service.get_user(uuid.UUID(data.get("user.id"))) + if user is None: + return _error(request._("Invalid token: user not found")) + + # Check whether the user has logged in since the token was created + last_login = datetime.datetime.fromisoformat(data.get("user.last_login")) + # Before updating itsdangerous to 2.x the last_login was naive, + # now it's localized to UTC + if not last_login.tzinfo: + last_login = pytz.UTC.localize(last_login) + if user.last_login and user.last_login > last_login: + return _error( + request._( + "Invalid token: user has logged in since this token was requested" + ) + ) + + unique_login_id = data.get("unique_login_id") + unique_login = ( + request.db.query(UserUniqueLogin) + .filter(UserUniqueLogin.id == unique_login_id) + .one_or_none() + ) + + if unique_login is None: + return _error(request._("Invalid login attempt.")) + + if unique_login.ip_address != request.remote_addr: + return _error( + request._( + "Device details didn't match, please try again from the device " + "you originally used to log in." + ) + ) + + unique_login.status = UniqueLoginStatus.CONFIRMED + + headers = _login_user(request, user.id) + resp = HTTPSeeOther(request.route_path("manage.projects"), headers=dict(headers)) + _set_userid_insecure_cookie(resp, user.id) + request.session.flash( + request._("Your login has been confirmed and this device is now recognized."), + queue="success", + ) + + return resp + + @view_config( route_name="accounts.verify-email", uses_session=True, @@ -1432,7 +1695,8 @@ def _login_user(request, userid, two_factor_method=None, two_factor_label=None): # Whenever we log in the user, we want to update their user so that it # records when the last login was. user_service = request.find_service(IUserService, context=None) - user_service.update_user(userid, last_login=datetime.datetime.now(datetime.UTC)) + now = datetime.datetime.now(datetime.UTC) + user_service.update_user(userid, last_login=now) user = user_service.get_user(userid) user.record_event( tag=EventTag.Account.LoginSuccess, @@ -1442,6 +1706,37 @@ def _login_user(request, userid, two_factor_method=None, two_factor_label=None): "two_factor_label": two_factor_label, }, ) + + # Create a new UserUniqueLogin if one doesn't already exist for this IP + unique_login = ( + request.db.query(UserUniqueLogin) + .filter( + UserUniqueLogin.user_id == userid, + UserUniqueLogin.ip_address == request.remote_addr, + ) + .one_or_none() + ) + if unique_login: + unique_login.last_used = now + + if unique_login is None and two_factor_method not in PHISHABLE_METHODS: + # We haven't seen this login before. Create a new one and mark it as confirmed + # if this is non-phishable. + unique_login = UserUniqueLogin( + user_id=userid, + ip_address=request.remote_addr, + status=UniqueLoginStatus.CONFIRMED, + ) + request.db.add(unique_login) + if ( + unique_login is not None + and unique_login.status == UniqueLoginStatus.PENDING + and two_factor_method not in PHISHABLE_METHODS + ): + # The user had a pending login, but has since logged in with a non-phishable + # method, so mark it as confirmed. + unique_login.status = UniqueLoginStatus.CONFIRMED + request.session.record_auth_timestamp() request.session.record_password_timestamp( user_service.get_password_timestamp(userid) diff --git a/warehouse/admin/templates/admin/users/detail.html b/warehouse/admin/templates/admin/users/detail.html index a13e40673350..08afbf23cfc3 100644 --- a/warehouse/admin/templates/admin/users/detail.html +++ b/warehouse/admin/templates/admin/users/detail.html @@ -988,6 +988,41 @@

Pending OpenID Connect Publishers

+
+
+

Unique logins

+
+ +
+ {% if user.unique_logins %} +
+ + + + + + + + + + + {% for login in user.unique_logins %} + + + + + + + {% endfor %} + +
CreatedIP addressStatusDevice Information
{{ login.created }}{{ login.ip_address }}{{ login.status.value }}{{ login.device_information }}
+
+ {% else %} + No known logins. + {% endif %} +
+
+

Account activity

diff --git a/warehouse/config.py b/warehouse/config.py index da2923e96ac1..aa677803f04a 100644 --- a/warehouse/config.py +++ b/warehouse/config.py @@ -410,6 +410,7 @@ def configure(settings=None): maybe_set(settings, "token.email.secret", "TOKEN_EMAIL_SECRET") maybe_set(settings, "token.two_factor.secret", "TOKEN_TWO_FACTOR_SECRET") maybe_set(settings, "token.remember_device.secret", "TOKEN_REMEMBER_DEVICE_SECRET") + maybe_set(settings, "token.confirm_login.secret", "TOKEN_CONFIRM_LOGIN_SECRET") maybe_set_redis(settings, "warehouse.xmlrpc.cache.url", "REDIS_URL", db=4) maybe_set( settings, diff --git a/warehouse/email/__init__.py b/warehouse/email/__init__.py index 43dea91c09a5..12cc9a6daafc 100644 --- a/warehouse/email/__init__.py +++ b/warehouse/email/__init__.py @@ -983,6 +983,16 @@ def send_recovery_code_reminder_email(request, user): return {"username": user.username} +@_email("unrecognized-login") +def send_unrecognized_login_email(request, user, *, ip_address, user_agent, token): + return { + "username": user.username, + "ip_address": ip_address, + "user_agent": user_agent, + "token": token, + } + + @_email("trusted-publisher-added") def send_trusted_publisher_added_email(request, user, project_name, publisher): # We use the request's user, since they're the one triggering the action. diff --git a/warehouse/locale/messages.pot b/warehouse/locale/messages.pot index 9f21b9d24ca4..cfe3a4c32171 100644 --- a/warehouse/locale/messages.pot +++ b/warehouse/locale/messages.pot @@ -122,21 +122,21 @@ msgstr "" msgid "The username isn't valid. Try again." msgstr "" -#: warehouse/accounts/views.py:110 +#: warehouse/accounts/views.py:121 #, python-brace-format msgid "" "There have been too many unsuccessful login attempts. You have been " "locked out for {}. Please try again later." msgstr "" -#: warehouse/accounts/views.py:131 +#: warehouse/accounts/views.py:142 #, python-brace-format msgid "" "Too many emails have been added to this account without verifying them. " "Check your inbox and follow the verification links. (IP: ${ip})" msgstr "" -#: warehouse/accounts/views.py:143 +#: warehouse/accounts/views.py:154 #, python-brace-format msgid "" "Too many password resets have been requested for this account without " @@ -144,185 +144,212 @@ msgid "" " ${ip})" msgstr "" -#: warehouse/accounts/views.py:375 warehouse/accounts/views.py:439 -#: warehouse/accounts/views.py:441 warehouse/accounts/views.py:470 -#: warehouse/accounts/views.py:472 warehouse/accounts/views.py:588 +#: warehouse/accounts/views.py:386 warehouse/accounts/views.py:535 +#: warehouse/accounts/views.py:537 warehouse/accounts/views.py:566 +#: warehouse/accounts/views.py:568 warehouse/accounts/views.py:684 msgid "Invalid or expired two factor login." msgstr "" -#: warehouse/accounts/views.py:433 +#: warehouse/accounts/views.py:529 msgid "Already authenticated" msgstr "" -#: warehouse/accounts/views.py:507 +#: warehouse/accounts/views.py:603 msgid "Successful WebAuthn assertion" msgstr "" -#: warehouse/accounts/views.py:615 warehouse/manage/views/__init__.py:871 +#: warehouse/accounts/views.py:723 warehouse/manage/views/__init__.py:871 msgid "Recovery code accepted. The supplied code cannot be used again." msgstr "" -#: warehouse/accounts/views.py:707 +#: warehouse/accounts/views.py:884 msgid "" "New user registration temporarily disabled. See https://pypi.org/help" "#admin-intervention for details." msgstr "" -#: warehouse/accounts/views.py:876 +#: warehouse/accounts/views.py:1053 msgid "Expired token: request a new password reset link" msgstr "" -#: warehouse/accounts/views.py:878 +#: warehouse/accounts/views.py:1055 msgid "Invalid token: request a new password reset link" msgstr "" -#: warehouse/accounts/views.py:880 warehouse/accounts/views.py:981 -#: warehouse/accounts/views.py:1087 warehouse/accounts/views.py:1256 +#: warehouse/accounts/views.py:1057 warehouse/accounts/views.py:1166 +#: warehouse/accounts/views.py:1244 warehouse/accounts/views.py:1350 +#: warehouse/accounts/views.py:1519 msgid "Invalid token: no token supplied" msgstr "" -#: warehouse/accounts/views.py:884 +#: warehouse/accounts/views.py:1061 msgid "Invalid token: not a password reset token" msgstr "" -#: warehouse/accounts/views.py:889 +#: warehouse/accounts/views.py:1066 warehouse/accounts/views.py:1175 msgid "Invalid token: user not found" msgstr "" -#: warehouse/accounts/views.py:900 +#: warehouse/accounts/views.py:1077 warehouse/accounts/views.py:1186 msgid "Invalid token: user has logged in since this token was requested" msgstr "" -#: warehouse/accounts/views.py:918 +#: warehouse/accounts/views.py:1095 msgid "" "Invalid token: password has already been changed since this token was " "requested" msgstr "" -#: warehouse/accounts/views.py:949 +#: warehouse/accounts/views.py:1126 msgid "You have reset your password" msgstr "" -#: warehouse/accounts/views.py:977 +#: warehouse/accounts/views.py:1162 +msgid "Expired token: please try to login again" +msgstr "" + +#: warehouse/accounts/views.py:1164 +msgid "Invalid token: please try to login again" +msgstr "" + +#: warehouse/accounts/views.py:1170 +msgid "Invalid token: not a login confirmation token" +msgstr "" + +#: warehouse/accounts/views.py:1198 +msgid "Invalid login attempt." +msgstr "" + +#: warehouse/accounts/views.py:1203 +msgid "" +"Device details didn't match, please try again from the device you " +"originally used to log in." +msgstr "" + +#: warehouse/accounts/views.py:1214 +msgid "Your login has been confirmed and this device is now recognized." +msgstr "" + +#: warehouse/accounts/views.py:1240 msgid "Expired token: request a new email verification link" msgstr "" -#: warehouse/accounts/views.py:979 +#: warehouse/accounts/views.py:1242 msgid "Invalid token: request a new email verification link" msgstr "" -#: warehouse/accounts/views.py:985 +#: warehouse/accounts/views.py:1248 msgid "Invalid token: not an email verification token" msgstr "" -#: warehouse/accounts/views.py:994 +#: warehouse/accounts/views.py:1257 msgid "Email not found" msgstr "" -#: warehouse/accounts/views.py:997 +#: warehouse/accounts/views.py:1260 msgid "Email already verified" msgstr "" -#: warehouse/accounts/views.py:1017 +#: warehouse/accounts/views.py:1280 msgid "You can now set this email as your primary address" msgstr "" -#: warehouse/accounts/views.py:1020 +#: warehouse/accounts/views.py:1283 msgid "This is your primary address" msgstr "" -#: warehouse/accounts/views.py:1026 +#: warehouse/accounts/views.py:1289 #, python-brace-format msgid "Email address ${email_address} verified. ${confirm_message}." msgstr "" -#: warehouse/accounts/views.py:1083 +#: warehouse/accounts/views.py:1346 msgid "Expired token: request a new organization invitation" msgstr "" -#: warehouse/accounts/views.py:1085 +#: warehouse/accounts/views.py:1348 msgid "Invalid token: request a new organization invitation" msgstr "" -#: warehouse/accounts/views.py:1091 +#: warehouse/accounts/views.py:1354 msgid "Invalid token: not an organization invitation token" msgstr "" -#: warehouse/accounts/views.py:1095 +#: warehouse/accounts/views.py:1358 msgid "Organization invitation is not valid." msgstr "" -#: warehouse/accounts/views.py:1104 +#: warehouse/accounts/views.py:1367 msgid "Organization invitation no longer exists." msgstr "" -#: warehouse/accounts/views.py:1156 +#: warehouse/accounts/views.py:1419 #, python-brace-format msgid "Invitation for '${organization_name}' is declined." msgstr "" -#: warehouse/accounts/views.py:1219 +#: warehouse/accounts/views.py:1482 #, python-brace-format msgid "You are now ${role} of the '${organization_name}' organization." msgstr "" -#: warehouse/accounts/views.py:1252 +#: warehouse/accounts/views.py:1515 msgid "Expired token: request a new project role invitation" msgstr "" -#: warehouse/accounts/views.py:1254 +#: warehouse/accounts/views.py:1517 msgid "Invalid token: request a new project role invitation" msgstr "" -#: warehouse/accounts/views.py:1260 +#: warehouse/accounts/views.py:1523 msgid "Invalid token: not a collaboration invitation token" msgstr "" -#: warehouse/accounts/views.py:1264 +#: warehouse/accounts/views.py:1527 msgid "Role invitation is not valid." msgstr "" -#: warehouse/accounts/views.py:1279 +#: warehouse/accounts/views.py:1542 msgid "Role invitation no longer exists." msgstr "" -#: warehouse/accounts/views.py:1311 +#: warehouse/accounts/views.py:1574 #, python-brace-format msgid "Invitation for '${project_name}' is declined." msgstr "" -#: warehouse/accounts/views.py:1377 +#: warehouse/accounts/views.py:1640 #, python-brace-format msgid "You are now ${role} of the '${project_name}' project." msgstr "" -#: warehouse/accounts/views.py:1457 +#: warehouse/accounts/views.py:1752 #, python-brace-format msgid "Please review our updated Terms of Service." msgstr "" -#: warehouse/accounts/views.py:1669 warehouse/accounts/views.py:1922 +#: warehouse/accounts/views.py:1964 warehouse/accounts/views.py:2217 #: warehouse/manage/views/__init__.py:1409 msgid "" "Trusted publishing is temporarily disabled. See https://pypi.org/help" "#admin-intervention for details." msgstr "" -#: warehouse/accounts/views.py:1690 +#: warehouse/accounts/views.py:1985 msgid "disabled. See https://pypi.org/help#admin-intervention for details." msgstr "" -#: warehouse/accounts/views.py:1706 +#: warehouse/accounts/views.py:2001 msgid "" "You must have a verified email in order to register a pending trusted " "publisher. See https://pypi.org/help#openid-connect for details." msgstr "" -#: warehouse/accounts/views.py:1719 +#: warehouse/accounts/views.py:2014 msgid "You can't register more than 3 pending trusted publishers at once." msgstr "" -#: warehouse/accounts/views.py:1734 warehouse/manage/views/__init__.py:1590 +#: warehouse/accounts/views.py:2029 warehouse/manage/views/__init__.py:1590 #: warehouse/manage/views/__init__.py:1705 #: warehouse/manage/views/__init__.py:1819 #: warehouse/manage/views/__init__.py:1931 @@ -331,29 +358,29 @@ msgid "" "again later." msgstr "" -#: warehouse/accounts/views.py:1744 warehouse/manage/views/__init__.py:1603 +#: warehouse/accounts/views.py:2039 warehouse/manage/views/__init__.py:1603 #: warehouse/manage/views/__init__.py:1718 #: warehouse/manage/views/__init__.py:1832 #: warehouse/manage/views/__init__.py:1944 msgid "The trusted publisher could not be registered" msgstr "" -#: warehouse/accounts/views.py:1759 +#: warehouse/accounts/views.py:2054 msgid "" "This trusted publisher has already been registered. Please contact PyPI's" " admins if this wasn't intentional." msgstr "" -#: warehouse/accounts/views.py:1793 +#: warehouse/accounts/views.py:2088 msgid "Registered a new pending publisher to create " msgstr "" -#: warehouse/accounts/views.py:1935 warehouse/accounts/views.py:1948 -#: warehouse/accounts/views.py:1955 +#: warehouse/accounts/views.py:2230 warehouse/accounts/views.py:2243 +#: warehouse/accounts/views.py:2250 msgid "Invalid publisher ID" msgstr "" -#: warehouse/accounts/views.py:1962 +#: warehouse/accounts/views.py:2257 msgid "Removed trusted publisher for project " msgstr "" diff --git a/warehouse/migrations/versions/4c20f2342bba_add_useruniquelogin.py b/warehouse/migrations/versions/4c20f2342bba_add_useruniquelogin.py new file mode 100755 index 000000000000..e3bbc56d0a02 --- /dev/null +++ b/warehouse/migrations/versions/4c20f2342bba_add_useruniquelogin.py @@ -0,0 +1,81 @@ +# SPDX-License-Identifier: Apache-2.0 +""" +Refactor UserUniqueLogin to use mapped_column + +Revision ID: 4c20f2342bba +Revises: a6994b8bed95 +Create Date: 2025-07-29 00:55:39.682180 +""" + +import sqlalchemy as sa + +from alembic import op +from sqlalchemy.dialects import postgresql + +revision = "4c20f2342bba" +down_revision = "a6994b8bed95" + + +def upgrade(): + sa.Enum("pending", "confirmed", name="uniqueloginstatus").create(op.get_bind()) + op.create_table( + "user_unique_logins", + sa.Column("user_id", sa.UUID(), nullable=False), + sa.Column("ip_address", sa.String(), nullable=False), + sa.Column( + "created", sa.DateTime(), server_default=sa.text("now()"), nullable=False + ), + sa.Column( + "last_used", + sa.DateTime(), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "device_information", postgresql.JSONB(astext_type=sa.Text()), nullable=True + ), + sa.Column( + "status", + postgresql.ENUM( + "pending", + "confirmed", + name="uniqueloginstatus", + create_type=False, + ), + server_default="pending", + nullable=False, + ), + sa.Column( + "id", sa.UUID(), server_default=sa.text("gen_random_uuid()"), nullable=False + ), + sa.ForeignKeyConstraint( + ["user_id"], ["users.id"], onupdate="CASCADE", ondelete="CASCADE" + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "user_id", "ip_address", name="_user_unique_logins_user_id_ip_address_uc" + ), + ) + op.create_index( + op.f("ix_user_unique_logins_user_id"), + "user_unique_logins", + ["user_id"], + unique=False, + ) + op.create_index( + "user_unique_logins_user_id_ip_address_idx", + "user_unique_logins", + ["user_id", "ip_address"], + unique=True, + ) + + +def downgrade(): + op.drop_index( + op.f("ix_user_unique_logins_user_id"), table_name="user_unique_logins" + ) + op.drop_index( + "user_unique_logins_user_id_ip_address_idx", table_name="user_unique_logins" + ) + op.drop_table("user_unique_logins") + sa.Enum("pending", "confirmed", name="uniqueloginstatus").drop(op.get_bind()) diff --git a/warehouse/routes.py b/warehouse/routes.py index b4e79bc27dee..da5e5d2dce13 100644 --- a/warehouse/routes.py +++ b/warehouse/routes.py @@ -201,6 +201,9 @@ def includeme(config): config.add_route( "accounts.reset-password", "/account/reset-password/", domain=warehouse ) + config.add_route( + "accounts.confirm-login", "/account/confirm-login/", domain=warehouse + ) config.add_route( "accounts.verify-email", "/account/verify-email/", domain=warehouse ) diff --git a/warehouse/templates/accounts/unrecognized-device.html b/warehouse/templates/accounts/unrecognized-device.html new file mode 100644 index 000000000000..51817e33ea0c --- /dev/null +++ b/warehouse/templates/accounts/unrecognized-device.html @@ -0,0 +1,17 @@ +{# SPDX-License-Identifier: Apache-2.0 -#} +{% extends "base.html" %} +{% block title %}Unrecognized device{% endblock %} +{% block content %} +
+
+

Unrecognized device

+

We did not recognize this device. Please check your email for a login confirmation link.

+

+ You should have received an email from noreply@pypi.org with the subject line "Unrecognized login to your PyPI account". +

+

+ If you did not make this change, you can email admin@pypi.org to communicate with the PyPI administrators. +

+
+
+{% endblock %} diff --git a/warehouse/templates/email/unrecognized-login/body.html b/warehouse/templates/email/unrecognized-login/body.html new file mode 100644 index 000000000000..34de5d12acce --- /dev/null +++ b/warehouse/templates/email/unrecognized-login/body.html @@ -0,0 +1,26 @@ +{# SPDX-License-Identifier: Apache-2.0 -#} +{% extends "email/_base/body.html" %} +{% block content %} +

A login attempt was made from an unrecognized device.

+

To complete your login, please visit the following link:

+ {% set link = request.route_url('accounts.confirm-login', _query={'token': token}) %} +

+ {{ link }} +

+

This login attempt was made from:

+
    +
  • + IP address: {{ ip_address }} +
  • +
  • + Location: {{ ip_address.geo_ip or "Unknown" }} +
  • +
  • + User agent: {{ user_agent }} +
  • +
+

+ If you did not make this change, you can email admin@pypi.org to + communicate with the PyPI administrators. +

+{% endblock %} diff --git a/warehouse/templates/email/unrecognized-login/body.txt b/warehouse/templates/email/unrecognized-login/body.txt new file mode 100644 index 000000000000..47e8634369fa --- /dev/null +++ b/warehouse/templates/email/unrecognized-login/body.txt @@ -0,0 +1,14 @@ +{# SPDX-License-Identifier: Apache-2.0 -#} +A login attempt was made from an unrecognized device. + +To complete your login, please visit the following link: + +{{ request.route_url('accounts.confirm-login', _query={'token': token}) }} + +This login attempt was made from: +- IP address: {{ ip_address }} +- Location: {{ ip_address.geo_ip or "Unknown"}} +- User agent: {{ user_agent }} + +If you did not make this change, you can email admin@pypi.org to +communicate with the PyPI administrators. diff --git a/warehouse/templates/email/unrecognized-login/subject.txt b/warehouse/templates/email/unrecognized-login/subject.txt new file mode 100644 index 000000000000..fbbeafbf55b7 --- /dev/null +++ b/warehouse/templates/email/unrecognized-login/subject.txt @@ -0,0 +1,5 @@ +{# SPDX-License-Identifier: Apache-2.0 -#} + +{% extends "email/_base/subject.txt" %} + +{% block subject %}Unrecognized login to your PyPI account{% endblock %}