Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions server/mergin/sync/public_api_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1028,14 +1028,6 @@ def push_finish(transaction_id):
# let's move uploaded files where they are expected to be
os.renames(files_dir, version_dir)

# remove used chunks
for file in upload.changes["added"] + upload.changes["updated"]:
file_chunks = file.get("chunks", [])
for chunk_id in file_chunks:
chunk_file = os.path.join(upload.upload_dir, "chunks", chunk_id)
if os.path.exists(chunk_file):
move_to_tmp(chunk_file)

logging.info(
f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
)
Expand Down
11 changes: 6 additions & 5 deletions server/mergin/sync/public_api_v2_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from marshmallow import ValidationError
from sqlalchemy.exc import IntegrityError

from mergin.sync.tasks import remove_transaction_chunks

from .schemas_v2 import ProjectSchema as ProjectSchemaV2
from ..app import db
from ..auth import auth_required
Expand Down Expand Up @@ -319,12 +321,12 @@ def create_project_version(id):
os.renames(temp_files_dir, version_dir)

# remove used chunks
# get chunks from added and updated files
chunks_ids = []
for file in to_be_added_files + to_be_updated_files:
file_chunks = file.get("chunks", [])
for chunk_id in file_chunks:
chunk_file = get_chunk_location(chunk_id)
if os.path.exists(chunk_file):
move_to_tmp(chunk_file)
chunks_ids.extend(file_chunks)
remove_transaction_chunks.delay(chunks_ids)

logging.info(
f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}."
Expand Down Expand Up @@ -377,7 +379,6 @@ def upload_chunk(id: str):
# we could have used request.data here, but it could eventually cause OOM issue
save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"])
except IOError:
move_to_tmp(dest_file, chunk_id)
return BigChunkError().response(413)
except Exception as e:
return UploadError(error="Error saving chunk").response(400)
Expand Down
11 changes: 10 additions & 1 deletion server/mergin/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .models import Project, ProjectVersion, FileHistory
from .storages.disk import move_to_tmp
from .config import Configuration
from .utils import remove_outdated_files
from .utils import get_chunk_location, remove_outdated_files
from ..celery import celery
from ..app import db

Expand Down Expand Up @@ -169,3 +169,12 @@ def remove_unused_chunks():
if not os.path.isdir(dir):
continue
remove_outdated_files(dir, time_delta)


@celery.task
def remove_transaction_chunks(chunks=None):
    """Remove uploaded chunk files tied to a finished sync transaction.

    Runs asynchronously (celery task) after a project version is created so
    that the request handler does not block on filesystem cleanup.

    :param chunks: list of chunk ids to delete; ``None``/empty means no-op.
        (A ``None`` sentinel is used instead of a mutable ``[]`` default to
        avoid the shared-mutable-default pitfall.)
    """
    for chunk in chunks or []:
        chunk_path = get_chunk_location(chunk)
        # EAFP: the file may vanish between any existence check and removal
        # (e.g. concurrent cleanup job), so just ignore a missing file.
        try:
            os.remove(chunk_path)
        except FileNotFoundError:
            pass
14 changes: 13 additions & 1 deletion server/mergin/tests/test_public_api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial

from mergin.sync.tasks import remove_transaction_chunks, remove_unused_chunks
from . import DEFAULT_USER
from .utils import (
add_user,
Expand All @@ -22,6 +23,7 @@

from mergin.app import db
from mergin.config import Configuration
from mergin.sync.config import Configuration as SyncConfiguration
from mergin.sync.errors import (
BigChunkError,
ProjectLocked,
Expand Down Expand Up @@ -356,6 +358,7 @@ def test_create_version(client, data, expected, err_code):
assert project.latest_version == 1

chunks = []
chunk_ids = []
if expected == 201:
# mimic chunks were uploaded
for f in data["changes"]["added"] + data["changes"]["updated"]:
Expand All @@ -372,12 +375,21 @@ def test_create_version(client, data, expected, err_code):
out_file.write(in_file.read(CHUNK_SIZE))

chunks.append(chunk_location)
chunk_ids.append(chunk)

response = client.post(f"v2/projects/{project.id}/versions", json=data)
with patch(
"mergin.sync.public_api_v2_controller.remove_transaction_chunks.delay"
) as mock_remove:
response = client.post(f"v2/projects/{project.id}/versions", json=data)
assert response.status_code == expected
if expected == 201:
assert response.json["version"] == "v2"
assert project.latest_version == 2
# chunks exists after upload, cleanup job did not remove them
assert all(os.path.exists(chunk) for chunk in chunks)
if chunk_ids:
assert mock_remove.called_once_with(chunk_ids)
remove_transaction_chunks(chunk_ids)
assert all(not os.path.exists(chunk) for chunk in chunks)
else:
assert project.latest_version == 1
Expand Down
Loading