|
7 | 7 | import jwt |
8 | 8 |
|
9 | 9 | import synapse.rest.admin |
| 10 | +from synapse.appservice import ApplicationService |
10 | 11 | from synapse.rest.client.v1 import login, logout |
11 | | -from synapse.rest.client.v2_alpha import devices |
| 12 | +from synapse.rest.client.v2_alpha import devices, register |
12 | 13 | from synapse.rest.client.v2_alpha.account import WhoamiRestServlet |
13 | 14 |
|
14 | 15 | from tests import unittest |
@@ -748,3 +749,134 @@ def test_login_jwt_invalid_signature(self): |
748 | 749 | channel.json_body["error"], |
749 | 750 | "JWT validation failed: Signature verification failed", |
750 | 751 | ) |
| 752 | + |
| 753 | + |
| 754 | +AS_USER = "as_user_alice" |
| 755 | + |
| 756 | + |
| 757 | +class AppserviceLoginRestServletTestCase(unittest.HomeserverTestCase): |
| 758 | + servlets = [ |
| 759 | + login.register_servlets, |
| 760 | + register.register_servlets, |
| 761 | + ] |
| 762 | + |
| 763 | + def register_as_user(self, username): |
| 764 | + request, channel = self.make_request( |
| 765 | + b"POST", |
| 766 | + "/_matrix/client/r0/register?access_token=%s" % (self.service.token,), |
| 767 | + {"username": username}, |
| 768 | + ) |
| 769 | + self.render(request) |
| 770 | + |
| 771 | + def make_homeserver(self, reactor, clock): |
| 772 | + self.hs = self.setup_test_homeserver() |
| 773 | + |
| 774 | + self.service = ApplicationService( |
| 775 | + id="unique_identifier", |
| 776 | + token="some_token", |
| 777 | + hostname="example.com", |
| 778 | + sender="@asbot:example.com", |
| 779 | + namespaces={ |
| 780 | + ApplicationService.NS_USERS: [ |
| 781 | + {"regex": r"@as_user.*", "exclusive": False} |
| 782 | + ], |
| 783 | + ApplicationService.NS_ROOMS: [], |
| 784 | + ApplicationService.NS_ALIASES: [], |
| 785 | + }, |
| 786 | + ) |
| 787 | + self.another_service = ApplicationService( |
| 788 | + id="another__identifier", |
| 789 | + token="another_token", |
| 790 | + hostname="example.com", |
| 791 | + sender="@as2bot:example.com", |
| 792 | + namespaces={ |
| 793 | + ApplicationService.NS_USERS: [ |
| 794 | + {"regex": r"@as2_user.*", "exclusive": False} |
| 795 | + ], |
| 796 | + ApplicationService.NS_ROOMS: [], |
| 797 | + ApplicationService.NS_ALIASES: [], |
| 798 | + }, |
| 799 | + ) |
| 800 | + |
| 801 | + self.hs.get_datastore().services_cache.append(self.service) |
| 802 | + self.hs.get_datastore().services_cache.append(self.another_service) |
| 803 | + return self.hs |
| 804 | + |
| 805 | + def test_login_appservice_user(self): |
| 806 | + """Test that an appservice user can use /login |
| 807 | + """ |
| 808 | + self.register_as_user(AS_USER) |
| 809 | + |
| 810 | + params = { |
| 811 | + "type": login.LoginRestServlet.APPSERVICE_TYPE, |
| 812 | + "identifier": {"type": "m.id.user", "user": AS_USER}, |
| 813 | + } |
| 814 | + request, channel = self.make_request( |
| 815 | + b"POST", LOGIN_URL, params, access_token=self.service.token |
| 816 | + ) |
| 817 | + |
| 818 | + self.render(request) |
| 819 | + self.assertEquals(channel.result["code"], b"200", channel.result) |
| 820 | + |
| 821 | + def test_login_appservice_user_bot(self): |
| 822 | + """Test that the appservice bot can use /login |
| 823 | + """ |
| 824 | + self.register_as_user(AS_USER) |
| 825 | + |
| 826 | + params = { |
| 827 | + "type": login.LoginRestServlet.APPSERVICE_TYPE, |
| 828 | + "identifier": {"type": "m.id.user", "user": self.service.sender}, |
| 829 | + } |
| 830 | + request, channel = self.make_request( |
| 831 | + b"POST", LOGIN_URL, params, access_token=self.service.token |
| 832 | + ) |
| 833 | + |
| 834 | + self.render(request) |
| 835 | + self.assertEquals(channel.result["code"], b"200", channel.result) |
| 836 | + |
| 837 | + def test_login_appservice_wrong_user(self): |
| 838 | + """Test that non-as users cannot login with the as token |
| 839 | + """ |
| 840 | + self.register_as_user(AS_USER) |
| 841 | + |
| 842 | + params = { |
| 843 | + "type": login.LoginRestServlet.APPSERVICE_TYPE, |
| 844 | + "identifier": {"type": "m.id.user", "user": "fibble_wibble"}, |
| 845 | + } |
| 846 | + request, channel = self.make_request( |
| 847 | + b"POST", LOGIN_URL, params, access_token=self.service.token |
| 848 | + ) |
| 849 | + |
| 850 | + self.render(request) |
| 851 | + self.assertEquals(channel.result["code"], b"403", channel.result) |
| 852 | + |
| 853 | + def test_login_appservice_wrong_as(self): |
| 854 | + """Test that as users cannot login with wrong as token |
| 855 | + """ |
| 856 | + self.register_as_user(AS_USER) |
| 857 | + |
| 858 | + params = { |
| 859 | + "type": login.LoginRestServlet.APPSERVICE_TYPE, |
| 860 | + "identifier": {"type": "m.id.user", "user": AS_USER}, |
| 861 | + } |
| 862 | + request, channel = self.make_request( |
| 863 | + b"POST", LOGIN_URL, params, access_token=self.another_service.token |
| 864 | + ) |
| 865 | + |
| 866 | + self.render(request) |
| 867 | + self.assertEquals(channel.result["code"], b"403", channel.result) |
| 868 | + |
| 869 | + def test_login_appservice_no_token(self): |
| 870 | + """Test that users must provide a token when using the appservice |
| 871 | + login method |
| 872 | + """ |
| 873 | + self.register_as_user(AS_USER) |
| 874 | + |
| 875 | + params = { |
| 876 | + "type": login.LoginRestServlet.APPSERVICE_TYPE, |
| 877 | + "identifier": {"type": "m.id.user", "user": AS_USER}, |
| 878 | + } |
| 879 | + request, channel = self.make_request(b"POST", LOGIN_URL, params) |
| 880 | + |
| 881 | + self.render(request) |
| 882 | + self.assertEquals(channel.result["code"], b"401", channel.result) |
0 commit comments