|
| 1 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 2 | +# you may not use this file except in compliance with the License. |
| 3 | +# You may obtain a copy of the License at |
| 4 | +# |
| 5 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 6 | +# |
| 7 | +# Unless required by applicable law or agreed to in writing, software |
| 8 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 9 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 10 | +# See the License for the specific language governing permissions and |
| 11 | +# limitations under the License. |
| 12 | +from http import HTTPStatus |
| 13 | +from typing import BinaryIO, Callable, Dict, List, Optional, Tuple |
| 14 | +from unittest.mock import Mock |
| 15 | + |
| 16 | +from twisted.test.proto_helpers import MemoryReactor |
| 17 | +from twisted.web.http_headers import Headers |
| 18 | + |
| 19 | +from synapse.api.errors import Codes, SynapseError |
| 20 | +from synapse.http.client import RawHeaders |
| 21 | +from synapse.server import HomeServer |
| 22 | +from synapse.util import Clock |
| 23 | + |
| 24 | +from tests import unittest |
| 25 | +from tests.test_utils import SMALL_PNG, FakeResponse |
| 26 | + |
| 27 | + |
| 28 | +class TestSSOHandler(unittest.HomeserverTestCase): |
| 29 | + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: |
| 30 | + self.http_client = Mock(spec=["get_file"]) |
| 31 | + self.http_client.get_file.side_effect = mock_get_file |
| 32 | + self.http_client.user_agent = b"Synapse Test" |
| 33 | + hs = self.setup_test_homeserver( |
| 34 | + proxied_blacklisted_http_client=self.http_client |
| 35 | + ) |
| 36 | + return hs |
| 37 | + |
| 38 | + async def test_set_avatar(self) -> None: |
| 39 | + """Tests successfully setting the avatar of a newly created user""" |
| 40 | + handler = self.hs.get_sso_handler() |
| 41 | + |
| 42 | + # Create a new user to set avatar for |
| 43 | + reg_handler = self.hs.get_registration_handler() |
| 44 | + user_id = self.get_success(reg_handler.register_user(approved=True)) |
| 45 | + |
| 46 | + self.assertTrue( |
| 47 | + self.get_success(handler.set_avatar(user_id, "http://my.server/me.png")) |
| 48 | + ) |
| 49 | + |
| 50 | + # Ensure avatar is set on this newly created user, |
| 51 | + # so no need to compare for the exact image |
| 52 | + profile_handler = self.hs.get_profile_handler() |
| 53 | + profile = self.get_success(profile_handler.get_profile(user_id)) |
| 54 | + self.assertIsNot(profile["avatar_url"], None) |
| 55 | + |
| 56 | + @unittest.override_config({"max_avatar_size": 1}) |
| 57 | + async def test_set_avatar_too_big_image(self) -> None: |
| 58 | + """Tests that saving an avatar fails when it is too big""" |
| 59 | + handler = self.hs.get_sso_handler() |
| 60 | + |
| 61 | + # any random user works since image check is supposed to fail |
| 62 | + user_id = "@sso-user:test" |
| 63 | + |
| 64 | + self.assertFalse( |
| 65 | + self.get_success(handler.set_avatar(user_id, "http://my.server/me.png")) |
| 66 | + ) |
| 67 | + |
| 68 | + @unittest.override_config({"allowed_avatar_mimetypes": ["image/jpeg"]}) |
| 69 | + async def test_set_avatar_incorrect_mime_type(self) -> None: |
| 70 | + """Tests that saving an avatar fails when its mime type is not allowed""" |
| 71 | + handler = self.hs.get_sso_handler() |
| 72 | + |
| 73 | + # any random user works since image check is supposed to fail |
| 74 | + user_id = "@sso-user:test" |
| 75 | + |
| 76 | + self.assertFalse( |
| 77 | + self.get_success(handler.set_avatar(user_id, "http://my.server/me.png")) |
| 78 | + ) |
| 79 | + |
| 80 | + async def test_skip_saving_avatar_when_not_changed(self) -> None: |
| 81 | + """Tests whether saving of avatar correctly skips if the avatar hasn't |
| 82 | + changed""" |
| 83 | + handler = self.hs.get_sso_handler() |
| 84 | + |
| 85 | + # Create a new user to set avatar for |
| 86 | + reg_handler = self.hs.get_registration_handler() |
| 87 | + user_id = self.get_success(reg_handler.register_user(approved=True)) |
| 88 | + |
| 89 | + # set avatar for the first time, should be a success |
| 90 | + self.assertTrue( |
| 91 | + self.get_success(handler.set_avatar(user_id, "http://my.server/me.png")) |
| 92 | + ) |
| 93 | + |
| 94 | + # get avatar picture for comparison after another attempt |
| 95 | + profile_handler = self.hs.get_profile_handler() |
| 96 | + profile = self.get_success(profile_handler.get_profile(user_id)) |
| 97 | + url_to_match = profile["avatar_url"] |
| 98 | + |
| 99 | + # set same avatar for the second time, should be a success |
| 100 | + self.assertTrue( |
| 101 | + self.get_success(handler.set_avatar(user_id, "http://my.server/me.png")) |
| 102 | + ) |
| 103 | + |
| 104 | + # compare avatar picture's url from previous step |
| 105 | + profile = self.get_success(profile_handler.get_profile(user_id)) |
| 106 | + self.assertEqual(profile["avatar_url"], url_to_match) |
| 107 | + |
| 108 | + |
| 109 | +async def mock_get_file( |
| 110 | + url: str, |
| 111 | + output_stream: BinaryIO, |
| 112 | + max_size: Optional[int] = None, |
| 113 | + headers: Optional[RawHeaders] = None, |
| 114 | + is_allowed_content_type: Optional[Callable[[str], bool]] = None, |
| 115 | +) -> Tuple[int, Dict[bytes, List[bytes]], str, int]: |
| 116 | + |
| 117 | + fake_response = FakeResponse(code=404) |
| 118 | + if url == "http://my.server/me.png": |
| 119 | + fake_response = FakeResponse( |
| 120 | + code=200, |
| 121 | + headers=Headers( |
| 122 | + {"Content-Type": ["image/png"], "Content-Length": [str(len(SMALL_PNG))]} |
| 123 | + ), |
| 124 | + body=SMALL_PNG, |
| 125 | + ) |
| 126 | + |
| 127 | + if max_size is not None and max_size < len(SMALL_PNG): |
| 128 | + raise SynapseError( |
| 129 | + HTTPStatus.BAD_GATEWAY, |
| 130 | + "Requested file is too large > %r bytes" % (max_size,), |
| 131 | + Codes.TOO_LARGE, |
| 132 | + ) |
| 133 | + |
| 134 | + if is_allowed_content_type and not is_allowed_content_type("image/png"): |
| 135 | + raise SynapseError( |
| 136 | + HTTPStatus.BAD_GATEWAY, |
| 137 | + ( |
| 138 | + "Requested file's content type not allowed for this operation: %s" |
| 139 | + % "image/png" |
| 140 | + ), |
| 141 | + ) |
| 142 | + |
| 143 | + output_stream.write(fake_response.body) |
| 144 | + |
| 145 | + return len(SMALL_PNG), {b"Content-Type": [b"image/png"]}, "", 200 |
0 commit comments