Skip to content

Commit d19d5d1

Browse files
authored
Merge pull request #537 from MerginMaps/sync-remove-chunks
Sync bottleneck: prevent remove chunks
2 parents 58fca7c + 05fb028 commit d19d5d1

File tree

4 files changed

+32
-15
lines changed

4 files changed

+32
-15
lines changed

server/mergin/sync/public_api_controller.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,14 +1028,6 @@ def push_finish(transaction_id):
10281028
# let's move uploaded files where they are expected to be
10291029
os.renames(files_dir, version_dir)
10301030

1031-
# remove used chunks
1032-
for file in upload.changes["added"] + upload.changes["updated"]:
1033-
file_chunks = file.get("chunks", [])
1034-
for chunk_id in file_chunks:
1035-
chunk_file = os.path.join(upload.upload_dir, "chunks", chunk_id)
1036-
if os.path.exists(chunk_file):
1037-
move_to_tmp(chunk_file)
1038-
10391031
logging.info(
10401032
f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
10411033
)

server/mergin/sync/public_api_v2_controller.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
from marshmallow import ValidationError
1515
from sqlalchemy.exc import IntegrityError
1616

17+
from mergin.sync.tasks import remove_transaction_chunks
18+
1719
from .schemas_v2 import ProjectSchema as ProjectSchemaV2
1820
from ..app import db
1921
from ..auth import auth_required
@@ -319,12 +321,12 @@ def create_project_version(id):
319321
os.renames(temp_files_dir, version_dir)
320322

321323
# remove used chunks
324+
# get chunks from added and updated files
325+
chunks_ids = []
322326
for file in to_be_added_files + to_be_updated_files:
323327
file_chunks = file.get("chunks", [])
324-
for chunk_id in file_chunks:
325-
chunk_file = get_chunk_location(chunk_id)
326-
if os.path.exists(chunk_file):
327-
move_to_tmp(chunk_file)
328+
chunks_ids.extend(file_chunks)
329+
remove_transaction_chunks.delay(chunks_ids)
328330

329331
logging.info(
330332
f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}."
@@ -377,7 +379,6 @@ def upload_chunk(id: str):
377379
# we could have used request.data here, but it could eventually cause OOM issue
378380
save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"])
379381
except IOError:
380-
move_to_tmp(dest_file, chunk_id)
381382
return BigChunkError().response(413)
382383
except Exception as e:
383384
return UploadError(error="Error saving chunk").response(400)

server/mergin/sync/tasks.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
import os
88
import time
99
from datetime import datetime, timedelta, timezone
10+
from typing import List, Optional
1011
from zipfile import ZIP_DEFLATED, ZipFile
1112
from flask import current_app
1213

1314
from .models import Project, ProjectVersion, FileHistory
1415
from .storages.disk import move_to_tmp
1516
from .config import Configuration
16-
from .utils import remove_outdated_files
17+
from .utils import get_chunk_location, remove_outdated_files
1718
from ..celery import celery
1819
from ..app import db
1920

@@ -169,3 +170,14 @@ def remove_unused_chunks():
169170
if not os.path.isdir(dir):
170171
continue
171172
remove_outdated_files(dir, time_delta)
173+
174+
175+
@celery.task
def remove_transaction_chunks(chunks: Optional[List[str]] = None):
    """Remove chunks related to a specific sync transaction.

    :param chunks: ids of the chunks to delete; each id is resolved to a file
        path with ``get_chunk_location``. ``None`` or an empty list is a no-op.
    """
    if not chunks:
        return
    for chunk in chunks:
        chunk_path = get_chunk_location(chunk)
        # Attempt the removal directly instead of exists() + remove(): the
        # file may disappear between the check and the delete (e.g. removed
        # by another cleanup run), and one missing chunk must not abort the
        # removal of the remaining ones.
        try:
            os.remove(chunk_path)
        except FileNotFoundError:
            continue

server/mergin/tests/test_public_api_v2.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#
33
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
44

5+
from mergin.sync.tasks import remove_transaction_chunks, remove_unused_chunks
56
from . import DEFAULT_USER
67
from .utils import (
78
add_user,
@@ -22,6 +23,7 @@
2223

2324
from mergin.app import db
2425
from mergin.config import Configuration
26+
from mergin.sync.config import Configuration as SyncConfiguration
2527
from mergin.sync.errors import (
2628
BigChunkError,
2729
ProjectLocked,
@@ -356,6 +358,7 @@ def test_create_version(client, data, expected, err_code):
356358
assert project.latest_version == 1
357359

358360
chunks = []
361+
chunk_ids = []
359362
if expected == 201:
360363
# mimic chunks were uploaded
361364
for f in data["changes"]["added"] + data["changes"]["updated"]:
@@ -372,12 +375,21 @@ def test_create_version(client, data, expected, err_code):
372375
out_file.write(in_file.read(CHUNK_SIZE))
373376

374377
chunks.append(chunk_location)
378+
chunk_ids.append(chunk)
375379

376-
response = client.post(f"v2/projects/{project.id}/versions", json=data)
380+
with patch(
381+
"mergin.sync.public_api_v2_controller.remove_transaction_chunks.delay"
382+
) as mock_remove:
383+
response = client.post(f"v2/projects/{project.id}/versions", json=data)
377384
assert response.status_code == expected
378385
if expected == 201:
379386
assert response.json["version"] == "v2"
380387
assert project.latest_version == 2
388+
# chunks still exist after upload because the mocked cleanup job did not remove them
389+
assert all(os.path.exists(chunk) for chunk in chunks)
390+
if chunk_ids:
391+
assert mock_remove.called_once_with(chunk_ids)
392+
remove_transaction_chunks(chunk_ids)
381393
assert all(not os.path.exists(chunk) for chunk in chunks)
382394
else:
383395
assert project.latest_version == 1

0 commit comments

Comments
 (0)