Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/autotests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ jobs:

- name: Run tests
run: |
pytest --cov=mergin mergin/test/
pytest --cov=mergin --cov-report=lcov mergin/test/

- name: Submit coverage to Coveralls
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
coveralls --service=github
uses: coverallsapp/github-action@v2
with:
format: lcov
120 changes: 119 additions & 1 deletion mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import typing
import warnings

from .common import ClientError, LoginError, InvalidProject, ErrorCode
from typing import List

from .common import ClientError, LoginError, WorkspaceRole, ProjectRole
from .merginproject import MerginProject
from .client_pull import (
download_file_finalize,
Expand All @@ -36,6 +38,7 @@
from .version import __version__

this_dir = os.path.dirname(os.path.realpath(__file__))
json_headers = {"Content-Type": "application/json"}


class TokenError(Exception):
Expand Down Expand Up @@ -244,6 +247,11 @@ def patch(self, path, data=None, headers={}):
request = urllib.request.Request(url, data, headers, method="PATCH")
return self._do_request(request)

def delete(self, path):
url = urllib.parse.urljoin(self.url, urllib.parse.quote(path))
request = urllib.request.Request(url, method="DELETE")
return self._do_request(request)

def login(self, login, password):
"""
Authenticate login credentials and store session token
Expand Down Expand Up @@ -796,6 +804,12 @@ def add_user_permissions_to_project(self, project_path, usernames, permission_le
if permission_level in ("writer", "owner", "editor", "reader"):
access.get("readersnames").append(name)
self.set_project_access(project_path, access)
warnings.warn(
"This method will be deprecated in the next major release (1.0.0)"
"Use `add_project_collaborator` to create a project permission and "
"`update_project_collaborator` to change it instead.",
category=DeprecationWarning,
)

def remove_user_permissions_from_project(self, project_path, usernames):
"""
Expand All @@ -815,6 +829,11 @@ def remove_user_permissions_from_project(self, project_path, usernames):
if name in access.get("readersnames", []):
access.get("readersnames").remove(name)
self.set_project_access(project_path, access)
warnings.warn(
"This method will be deprecated in the next major release (1.0.0)"
"Use `remove_project_collaborator` instead.",
category=DeprecationWarning,
)

def project_user_permissions(self, project_path):
"""
Expand Down Expand Up @@ -1228,3 +1247,102 @@ def has_editor_support(self):
Returns whether the server version is acceptable for editor support.
"""
return is_version_acceptable(self.server_version(), "2024.4.0")

def create_user(
self,
email: str,
password: str,
workspace_id: int,
workspace_role: WorkspaceRole,
username: str = None,
notify_user: bool = False,
) -> dict:
"""
Create a new user in a workspace. The username is generated from the email address.

param email: email of the new user - must be unique
param password: password - must meet the requirements
param workspace_id: id of the workspace user is created in
param workspace_role: workspace role of the user
param username: username - will be autogenerated from the email if not provided
param notify_user: flag for email notifications - confirmation email will be sent
"""
params = {
"email": email,
"password": password,
"workspace_id": workspace_id,
"role": workspace_role.value,
"notify_user": notify_user,
}
if username:
params["username"] = username
user_info = self.post("v2/users", params, json_headers)
return json.load(user_info)

def get_workspace_member(self, workspace_id: int, user_id: int) -> dict:
"""
Get a workspace member detail
"""
resp = self.get(f"v2/workspaces/{workspace_id}/members/{user_id}")
return json.load(resp)

def list_workspace_members(self, workspace_id: int) -> List[dict]:
"""
Get a list of workspace members
"""
resp = self.get(f"v2/workspaces/{workspace_id}/members")
return json.load(resp)

def update_workspace_member(
self, workspace_id: int, user_id: int, workspace_role: WorkspaceRole, reset_projects_roles: bool = False
) -> dict:
"""
Update workspace role of a workspace member, optionally resets the projects role

param reset_projects_roles: all project specific roles will be removed
"""
params = {
"reset_projects_roles": reset_projects_roles,
"workspace_role": workspace_role.value,
}
workspace_member = self.patch(f"v2/workspaces/{workspace_id}/members/{user_id}", params, json_headers)
return json.load(workspace_member)

def remove_workspace_member(self, workspace_id: int, user_id: int):
"""
Remove a user from workspace members
"""
self.delete(f"v2/workspaces/{workspace_id}/members/{user_id}")

def list_project_collaborators(self, project_id: int) -> List[dict]:
"""
Get a list of project collaborators
"""
project_collaborators = self.get(f"v2/projects/{project_id}/collaborators")
return json.load(project_collaborators)

def add_project_collaborator(self, project_id: int, user: str, project_role: ProjectRole) -> dict:
"""
Add a user to project collaborators and grant them a project role.
Fails if user is already a member of the project.

param user: login (username or email) of the user
"""
params = {"role": project_role.value, "user": user}
project_collaborator = self.post(f"v2/projects/{project_id}/collaborators", params, json_headers)
return json.load(project_collaborator)

def update_project_collaborator(self, project_id: int, user_id: int, project_role: ProjectRole) -> dict:
"""
Update project role of the existing project collaborator.
Fails if user is not a member of the project yet.
"""
params = {"role": project_role.value}
project_collaborator = self.patch(f"v2/projects/{project_id}/collaborators/{user_id}", params, json_headers)
return json.load(project_collaborator)

def remove_project_collaborator(self, project_id: int, user_id: int):
"""
Remove a user from project collaborators
"""
self.delete(f"v2/projects/{project_id}/collaborators/{user_id}")
24 changes: 24 additions & 0 deletions mergin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,27 @@ class InvalidProject(Exception):

import dateutil.parser
from dateutil.tz import tzlocal


class WorkspaceRole(Enum):
"""
Workspace roles
"""

GUEST = "guest"
READER = "reader"
EDITOR = "editor"
WRITER = "writer"
ADMIN = "admin"
OWNER = "owner"


class ProjectRole(Enum):
"""
Project roles
"""

READER = "reader"
EDITOR = "editor"
WRITER = "writer"
OWNER = "owner"
6 changes: 3 additions & 3 deletions mergin/editor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from itertools import filterfalse
from typing import Callable
from typing import Callable, Dict, List

from .utils import is_mergin_config, is_qgis_file, is_versioned_file

Expand All @@ -24,7 +24,7 @@ def is_editor_enabled(mc, project_info: dict) -> bool:
return server_support and project_role == EDITOR_ROLE_NAME


def _apply_editor_filters(changes: dict[str, list[dict]]) -> dict[str, list[dict]]:
def _apply_editor_filters(changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
"""
Applies editor-specific filters to the changes dictionary, removing any changes to files that are not in the editor's list of allowed files.

Expand All @@ -40,7 +40,7 @@ def _apply_editor_filters(changes: dict[str, list[dict]]) -> dict[str, list[dict
return changes


def filter_changes(mc, project_info: dict, changes: dict[str, list[dict]]) -> dict[str, list[dict]]:
def filter_changes(mc, project_info: dict, changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
"""
Filters the given changes dictionary based on the editor's enabled state.

Expand Down
108 changes: 96 additions & 12 deletions mergin/test/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
import random
import tempfile
import subprocess
import shutil
Expand All @@ -19,7 +20,6 @@
decode_token_data,
TokenError,
ServerType,
ErrorCode,
)
from ..client_push import push_project_async, push_project_cancel
from ..client_pull import (
Expand All @@ -39,7 +39,7 @@
from ..merginproject import pygeodiff
from ..report import create_report
from ..editor import EDITOR_ROLE_NAME, filter_changes, is_editor_enabled

from ..common import ErrorCode, WorkspaceRole, ProjectRole

SERVER_URL = os.environ.get("TEST_MERGIN_URL")
API_USER = os.environ.get("TEST_API_USERNAME")
Expand All @@ -51,6 +51,8 @@
CHANGED_SCHEMA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "modified_schema")
STORAGE_WORKSPACE = os.environ.get("TEST_STORAGE_WORKSPACE", "testpluginstorage")

json_headers = {"Content-Type": "application/json"}


def get_limit_overrides(storage: int):
return {"storage": storage, "projects": 2, "api_allowed": True}
Expand Down Expand Up @@ -2597,17 +2599,17 @@ def test_editor_push(mc: MerginClient, mc2: MerginClient):
"""Test push with editor"""
if not mc.has_editor_support():
return
test_project = "test_editor_push"
test_project_fullname = API_USER + "/" + test_project
project = API_USER + "/" + test_project
project_dir = os.path.join(TMP_DIR, test_project)
project_dir2 = os.path.join(TMP_DIR, test_project + "_2")
cleanup(mc, project, [project_dir, project_dir2])
test_project_name = "test_editor_push"
test_project_fullname = API_USER + "/" + test_project_name
project_dir = os.path.join(TMP_DIR, test_project_name)
project_dir2 = os.path.join(TMP_DIR, test_project_name + "_2")
cleanup(mc, test_project_fullname, [project_dir, project_dir2])

# create new (empty) project on server
# TODO: return project_info from create project, don't use project_full name for project info, instead returned id of project
mc.create_project(test_project)
mc.add_user_permissions_to_project(project, [API_USER2], "editor")
mc.create_project(test_project_name)
project_info = get_project_info(mc, API_USER, test_project_name)
mc.add_project_collaborator(project_info["id"], mc2.username(), ProjectRole.EDITOR)
# download empty project
mc2.download_project(test_project_fullname, project_dir)

Expand Down Expand Up @@ -2649,12 +2651,11 @@ def test_editor_push(mc: MerginClient, mc2: MerginClient):
# editor is trying to update qgis file
with open(os.path.join(project_dir, qgs_file_name), "a") as f:
f.write("Editor is here!")
project_info = mc2.project_info(test_project_fullname)
pull_changes, push_changes, push_changes_summary = mc.project_status(project_dir)
# ggs is still waiting to push
assert any(file["path"] == qgs_file_name for file in push_changes.get("updated")) is True

# push as owner do cleanup local changes and preparation to conflicited copy simulate
# push as owner do cleanup local changes and preparation to conflicted copy simulate
mc.push_project(project_dir)

# simulate conflicting copy of qgis file
Expand Down Expand Up @@ -2742,3 +2743,86 @@ def test_workspace_requests(mc2: MerginClient):
assert service["plan"]["product_id"] == None
assert service["plan"]["type"] == "custom"
assert service["subscription"] == None


def test_access_management(mc: MerginClient, mc2: MerginClient):
# create a user in the workspace
workspace_id = next((w["id"] for w in mc.workspaces_list() if w["name"] == mc.username()))
email = "create_user" + str(random.randint(1000, 9999)) + "@client.py"
password = "Il0vemergin"
ws_role = WorkspaceRole.WRITER
user_info = mc.create_user(email, password, workspace_id, ws_role)
assert user_info["email"] == email
assert user_info["receive_notifications"] is False
# list workspace members
workspace_members = mc.list_workspace_members(workspace_id)
new_user = next((m for m in workspace_members if m["email"] == email))
assert new_user
assert new_user["workspace_role"] == ws_role.value
# get workspace member
ws_member = mc.get_workspace_member(workspace_id, new_user["id"])
assert ws_member["email"] == email
assert ws_member["workspace_role"] == ws_role.value
updated_role = WorkspaceRole.ADMIN
# update workspace member
mc.update_workspace_member(workspace_id, new_user["id"], updated_role)
updated_user = mc.get_workspace_member(workspace_id, new_user["id"])
assert updated_user["workspace_role"] == updated_role.value
# test permissions - a different client cannot update the role
with pytest.raises(ClientError, match=f"You do not have admin permissions to workspace"):
mc2.update_workspace_member(workspace_id, new_user["id"], ws_role)
# remove workspace member
mc.remove_workspace_member(workspace_id, new_user["id"])
workspace_members = mc.list_workspace_members(workspace_id)
assert not any(m["id"] == new_user["id"] for m in workspace_members)
# duplicated call
with pytest.raises(ClientError) as exc_info:
mc.remove_workspace_member(workspace_id, new_user["id"])
assert exc_info.value.http_error == 404
# add project
test_project_name = "test_collaborators"
test_project_fullname = API_USER + "/" + test_project_name
project_dir = os.path.join(TMP_DIR, test_project_name, API_USER)
cleanup(mc, test_project_fullname, [project_dir])
mc.create_project(test_project_name)
project_info = get_project_info(mc, API_USER, test_project_name)
test_project_id = project_info["id"]
project_role = ProjectRole.READER
# user must be added to project collaborators before updating project role
updated_role = ProjectRole.OWNER
with pytest.raises(ClientError) as exc_info2:
mc.update_project_collaborator(test_project_id, new_user["id"], updated_role)
assert exc_info2.value.http_error == 404
# add project collaborator
mc.add_project_collaborator(test_project_id, new_user["email"], project_role)
collaborators = mc.list_project_collaborators(test_project_id)
new_collaborator = next((c for c in collaborators if c["id"] == new_user["id"]))
assert new_collaborator
assert new_collaborator["project_role"] == project_role.value
# update project collaborator
mc.update_project_collaborator(test_project_id, new_user["id"], updated_role)
collaborators = mc.list_project_collaborators(test_project_id)
updated_collaborator = next((c for c in collaborators if c["id"] == new_user["id"]))
assert updated_collaborator["project_role"] == updated_role.value
# remove project collaborator
mc.remove_project_collaborator(test_project_id, new_user["id"])
collaborators = mc.list_project_collaborators(test_project_id)
assert not any(c["id"] == new_user["id"] for c in collaborators)
# try to assign new editor when editors limit is reached
ws_usage = mc.workspace_usage(workspace_id)
editors_usage = ws_usage["editors"]["editors_count"] + ws_usage["editors"]["invitations_count"]
mc.patch(
f"/v1/tests/workspaces/{workspace_id}",
{"limits_override": {"editors": editors_usage}},
json_headers,
)
editor_role = ProjectRole.EDITOR
with pytest.raises(ClientError, match="Maximum number of editors in this workspace is reached."):
mc.add_project_collaborator(test_project_id, new_user["email"], editor_role)
# set limits to the original state
orig_projects_limit = ws_usage["projects"]["quota"]
mc.patch(
f"/v1/tests/workspaces/{workspace_id}",
{"limits_override": {"projects": orig_projects_limit}},
json_headers,
)