Skip to content
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ jobs:
pip install -U pip wheel setuptools
pip install -U -r requirements-test.txt
pip install -U -e .
pip install -UI "openwisp-users @ https://github.com/openwisp/openwisp-users/tarball/issues/238-view-shared-objects" "cryptography~=43.0.3"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upgrading openwisp-users was installing the latest version of cryptography which is incompatible with django-x509. Thus, pinning the dependency here fixed the CI build.

pip install ${{ matrix.django-version }}

- name: Start postgres and redis
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from django.db import migrations

from . import assign_permissions_to_groups


class Migration(migrations.Migration):
dependencies = [
("config", "0060_cleanup_api_task_notification_types"),
]

operations = [
migrations.RunPython(
assign_permissions_to_groups, reverse_code=migrations.RunPython.noop
)
]
2 changes: 1 addition & 1 deletion openwisp_controller/config/migrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def assign_permissions_to_groups(apps, schema_editor):
create_default_permissions(apps, schema_editor)
operators_and_admins_can_change = ["device", "config", "template"]
operators_read_only_admins_manage = ["vpn"]
manage_operations = ["add", "change", "delete"]
manage_operations = ["add", "change", "view", "delete"]
Group = get_swapped_model(apps, "openwisp_users", "Group")

try:
Expand Down
84 changes: 82 additions & 2 deletions openwisp_controller/config/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,16 +506,63 @@ def test_template_vpn_fk_autocomplete_view(self):
hidden=[data["vpn2"].name, data["vpn_inactive"].name],
)

def test_org_admin_create_template_with_shared_vpn(self):
vpn = self._create_vpn(organization=None)
org = self._create_org()
administrator = self._create_administrator(organizations=[org])
path = reverse(f"admin:{self.app_label}_template_add")
payload = {
"organization": "",
"name": "Test",
"type": "vpn",
"vpn": str(vpn.id),
"backend": "netjsonconfig.OpenWrt",
"config": "",
"default_values": "",
"tags": "",
}
self.assertEqual(Template.objects.count(), 2)

with self.subTest("Should not allow creating shared template"):
self._test_org_admin_create_shareable_object(
path=path,
payload=payload,
model=Template,
expected_count=2,
user=administrator,
)

with self.subTest("Should allow creating non-shared template"):
payload["organization"] = str(org.pk)
self._test_org_admin_create_shareable_object(
path=path,
payload=payload,
model=Template,
expected_count=3,
user=administrator,
raises_error=False,
)

def test_org_admin_view_shared_template(self):
vpn = self._create_vpn(organization=None)
template = self._create_template(type="vpn", vpn=vpn)
self._test_org_admin_view_shareable_object(
path=reverse(f"admin:{self.app_label}_template_change", args=[template.pk]),
)

def test_vpn_queryset(self):
data = self._create_multitenancy_test_env(vpn=True)
self._test_multitenant_admin(
url=reverse(f"admin:{self.app_label}_vpn_changelist"),
visible=[data["org1"].name, data["vpn1"].name],
visible=[
data["org1"].name,
data["vpn1"].name,
data["vpn_shared"].name,
],
hidden=[
data["org2"].name,
data["inactive"],
data["vpn2"].name,
data["vpn_shared"].name,
data["vpn_inactive"].name,
],
)
Expand Down Expand Up @@ -561,6 +608,39 @@ def test_recoverlist_operator_403(self):
self._test_recoverlist_operator_403(self.app_label, "template")
self._test_recoverlist_operator_403(self.app_label, "vpn")

def test_org_admin_create_shared_vpn(self):
org = self._get_org()
ca = self._create_ca(organization=org)
self._test_org_admin_create_shareable_object(
path=reverse(f"admin:{self.app_label}_vpn_add"),
model=Vpn,
payload={
"organization": "",
"name": "Test",
"host": "vpn1.test.com",
"key": "vZFUV5FqYt4WW9nerc23BofQH51gHNNy",
"backend": "openwisp_controller.vpn_backends.OpenVPN",
"ca": ca.pk,
"config": {
"openvpn": [
{
"server_bridge": "10.8.0.0 255.255.255.0",
"name": "tun0",
"mode": "server",
"proto": "udp",
"dev": "tun0",
}
]
},
},
)

def test_org_admin_view_shared_vpn(self):
vpn = self._create_vpn(organization=None)
self._test_org_admin_view_shareable_object(
path=reverse(f"admin:{self.app_label}_vpn_change", args=[vpn.pk]),
)

def test_device_template_filter(self):
org = self._get_org(org_name="test-org")
t = self._create_template(name="test-template", organization=org)
Expand Down
121 changes: 86 additions & 35 deletions openwisp_controller/config/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from openwisp_controller.config.api.serializers import BaseConfigSerializer
from openwisp_controller.tests.utils import TestAdminMixin
from openwisp_users.tests.test_api import AuthenticationMixin
from openwisp_users.tests.test_api import AuthenticationMixin, TestMultitenantApiMixin
from openwisp_utils.tests import capture_any_output, catch_signal

from .. import settings as app_settings
Expand All @@ -33,7 +33,7 @@
OrganizationUser = load_model("openwisp_users", "OrganizationUser")


class ApiTestMixin:
class ApiTestMixin(AuthenticationMixin, TestMultitenantApiMixin):
@property
def _template_data(self):
return {
Expand Down Expand Up @@ -103,7 +103,6 @@ class TestConfigApi(
CreateConfigTemplateMixin,
TestVpnX509Mixin,
CreateDeviceGroupMixin,
AuthenticationMixin,
TestCase,
):
def setUp(self):
Expand Down Expand Up @@ -671,29 +670,50 @@ def test_template_create_of_vpn_type(self):
self.assertEqual(Template.objects.count(), 1)
self.assertEqual(r.status_code, 201)

def test_template_create_with_shared_vpn(self):
org1 = self._get_org()
test_user = self._create_operator(organizations=[org1])
self.client.force_login(test_user)
vpn1 = self._create_vpn(name="vpn1", organization=None)
path = reverse("config_api:template_list")
data = self._template_data
data["type"] = "vpn"
data["vpn"] = vpn1.id
data["organization"] = org1.pk
r = self.client.post(path, data, content_type="application/json")
self.assertEqual(r.status_code, 201)
self.assertEqual(Template.objects.count(), 1)
self.assertEqual(r.data["vpn"], vpn1.id)

def test_template_creation_with_no_org_by_operator(self):
path = reverse("config_api:template_list")
data = self._template_data
def test_operator_access_shared_template(self):
test_user = self._create_operator(organizations=[self._get_org()])
self.client.force_login(test_user)
r = self.client.post(path, data, content_type="application/json")
self.assertEqual(r.status_code, 400)
self.assertIn("This field may not be null.", str(r.content))
token = self._obtain_auth_token(test_user)
self._create_template(organization=None)
self._test_org_user_access_shared_object(
listview_name="config_api:template_list",
detailview_name="config_api:template_detail",
create_payload={"name": "test", "organization": ""},
update_payload={"name": "updated-test"},
expected_count=1,
token=token,
)

def test_org_admin_create_template_with_shared_vpn(self):
org = self._get_org()
vpn = self._create_vpn(organization=None)
create_payload = self._template_data
create_payload.update(
{
"organization": org.pk,
"type": "vpn",
"vpn": vpn.pk,
}
)
update_payload = create_payload.copy()
update_payload["name"] = "updated-test"
test_user = self._create_operator(organizations=[org])
self._test_org_user_access_shared_object(
listview_name="config_api:template_list",
detailview_name="config_api:template_detail",
create_payload=create_payload,
update_payload=update_payload,
expected_count=1,
token=self._obtain_auth_token(test_user),
expected_status_codes={
"create": 201,
"list": 200,
"retrieve": 200,
"update": 200,
"delete": 204,
"head": 200,
"option": 200,
},
)

def test_template_create_with_empty_config(self):
path = reverse("config_api:template_list")
Expand Down Expand Up @@ -855,19 +875,50 @@ def test_vpn_create_api(self):
self.assertEqual(r.status_code, 201)
self.assertEqual(Vpn.objects.count(), 1)

def test_vpn_create_with_shared_objects(self):
org1 = self._get_org()
def test_org_admin_access_vpn_with_shared_objects(self):
org = self._get_org()
shared_ca = self._create_ca(name="shared_ca", organization=None)
create_payload = self._vpn_data
create_payload.update(
{
"organization": org.pk,
"ca": shared_ca.pk,
}
)
update_payload = create_payload.copy()
update_payload["name"] = "updated-test-vpn"
administrator = self._create_administrator(organizations=[org])
self._test_access_shared_object(
listview_name="config_api:vpn_list",
detailview_name="config_api:vpn_detail",
create_payload=create_payload,
update_payload=update_payload,
expected_count=1,
expected_status_codes={
"create": 201,
"list": 200,
"retrieve": 200,
"update": 200,
"delete": 204,
"head": 200,
"option": 200,
},
token=self._obtain_auth_token(administrator),
)

def test_org_admin_create_shared_vpn(self):
shared_ca = self._create_ca(name="shared_ca", organization=None)
test_user = self._create_administrator(organizations=[org1])
self.client.force_login(test_user)
data = self._vpn_data
data["organization"] = org1.pk
data["ca"] = shared_ca.pk
path = reverse("config_api:vpn_list")
r = self.client.post(path, data, content_type="application/json")
self.assertEqual(Vpn.objects.count(), 1)
self.assertEqual(r.status_code, 201)
self.assertEqual(r.data["ca"], shared_ca.pk)
# API does not allow creating shared VPN by org admin,
# therefore we create an object to test the detail view.
self._create_vpn(organization=None, ca=shared_ca)
self._test_org_user_access_shared_object(
listview_name="config_api:vpn_list",
detailview_name="config_api:vpn_detail",
create_payload=data,
expected_count=1,
)

def test_vpn_list_api(self):
org = self._get_org()
Expand Down
1 change: 1 addition & 0 deletions openwisp_controller/connection/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class AbstractCredentials(ConnectorMixin, ShareableOrgMixinUniqueName, BaseModel
# Controls the number of objects which can be stored in memory
# before commiting them to database during bulk auto add operation.
chunk_size = 1000
sensitive_fields = ["params"]

connector = models.CharField(
_("connection type"),
Expand Down
43 changes: 43 additions & 0 deletions openwisp_controller/connection/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,49 @@ def test_admin_menu_groups(self):
url = reverse(f"admin:{self.app_label}_{model}_changelist")
self.assertContains(response, f' class="mg-link" href="{url}"')

def test_org_admin_create_shared_credentials(self):
self._test_org_admin_create_shareable_object(
path=reverse(f"admin:{self.app_label}_credentials_add"),
model=Credentials,
payload={
"name": "Shared Credentials",
"organization": "",
"connector": "openwisp_controller.connection.connectors.ssh.Ssh",
"params": {
"username": "root",
"password": "password",
"port": 22,
},
},
)

def test_org_admin_view_shared_credentials(self):
credentials = self._create_credentials(organization=None)
self._test_org_admin_view_shareable_object(
path=reverse(
f"admin:{self.app_label}_credentials_change", args=[credentials.pk]
),
)

def test_credential_admin_sensitive_fields(self):
"""
Sensitive fields for shared credentials should be hidden for non-superusers.
"""
org = self._get_org()
shared_credentials = self._create_credentials(organization=None)
org_credentials = self._create_credentials(organization=org)
self._test_sensitive_fields_visibility_on_shared_and_org_objects(
sensitive_fields=["params"],
shared_obj_path=reverse(
f"admin:{self.app_label}_credentials_change",
args=(shared_credentials.id,),
),
org_obj_path=reverse(
f"admin:{self.app_label}_credentials_change", args=(org_credentials.id,)
),
organization=org,
)


class TestCommandInlines(TestAdminMixin, CreateConnectionsMixin, TestCase):
config_app_label = "config"
Expand Down
Loading
Loading