
Commit 3a51545

Merge pull request #535 from MerginMaps/create_checkpoint_recursively
Create checkpoint recursively
2 parents f01986e + 2b872e5 commit 3a51545

File tree

6 files changed: +353 -105 lines changed


server/.test.env

Lines changed: 1 addition & 0 deletions
@@ -24,3 +24,4 @@ SECURITY_BEARER_SALT='bearer'
 SECURITY_EMAIL_SALT='email'
 SECURITY_PASSWORD_SALT='password'
 DIAGNOSTIC_LOGS_DIR=/tmp/diagnostic_logs
+GEVENT_WORKER=0

server/mergin/sync/commands.py

Lines changed: 47 additions & 0 deletions
@@ -127,3 +127,50 @@ def remove(project_name):
     project.removed_by = None
     db.session.commit()
     click.secho("Project removed", fg="green")
+
+@project.command()
+@click.argument("project-name", callback=normalize_input(lowercase=False))
+@click.option("--since", type=int, required=False)
+@click.option("--to", type=int, required=False)
+def create_checkpoint(project_name, since=None, to=None):
+    """Create project delta checkpoint, corresponding lower checkpoints and merged diffs for project"""
+    ws, name = split_project_path(project_name)
+    workspace = current_app.ws_handler.get_by_name(ws)
+    if not workspace:
+        click.secho("ERROR: Workspace does not exist", fg="red", err=True)
+        sys.exit(1)
+    project = (
+        Project.query.filter_by(workspace_id=workspace.id, name=name)
+        .filter(Project.storage_params.isnot(None))
+        .first()
+    )
+    if not project:
+        click.secho("ERROR: Project does not exist", fg="red", err=True)
+        sys.exit(1)
+
+    since = since if since is not None else 0
+    to = to if to is not None else project.latest_version
+    if since < 0 or to < 1:
+        click.secho(
+            "ERROR: Invalid version number, minimum version for 'since' is 0 and minimum version for 'to' is 1",
+            fg="red",
+            err=True,
+        )
+        sys.exit(1)
+
+    if to > project.latest_version:
+        click.secho(
+            "ERROR: 'to' version exceeds latest project version", fg="red", err=True
+        )
+        sys.exit(1)
+
+    if since >= to:
+        click.secho(
+            "ERROR: 'since' version must be less than 'to' version",
+            fg="red",
+            err=True,
+        )
+        sys.exit(1)
+
+    project.get_delta_changes(since, to)
+    click.secho("Project checkpoint(s) created", fg="green")

server/mergin/sync/models.py

Lines changed: 119 additions & 61 deletions
@@ -399,9 +399,6 @@ def get_delta_changes(
         )
         existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}
 
-        # Cache all individual (rank 0) delta rows in the required range.
-        individual_deltas: List[ProjectVersionDelta] = []
-
         result: List[DeltaChange] = []
         for checkpoint in expected_checkpoints:
             existing_delta = existing_delta_map.get((checkpoint.rank, checkpoint.end))
@@ -411,29 +408,10 @@ def get_delta_changes(
                 result.extend(DeltaChangeSchema(many=True).load(existing_delta.changes))
                 continue
 
-            # If higher rank delta checkopoint does not exists, we are using rank=0 deltas to create checkopoint
+            # If higher rank delta checkpoint does not exist, we need to create it
             if checkpoint.rank > 0:
-                individual_deltas = (
-                    ProjectVersionDelta.query.filter(
-                        ProjectVersionDelta.project_id == project_id,
-                        ProjectVersionDelta.version >= since,
-                        ProjectVersionDelta.version <= to,
-                        ProjectVersionDelta.rank == 0,
-                    )
-                    .order_by(ProjectVersionDelta.version)
-                    .all()
-                    if not individual_deltas
-                    else individual_deltas
-                )
-
-                if not individual_deltas:
-                    logging.error(
-                        f"No individual deltas found for project {project_id} in range {since} / {to} to create checkpoint."
-                    )
-                    return
-
                 new_checkpoint = ProjectVersionDelta.create_checkpoint(
-                    project_id, checkpoint, individual_deltas
+                    project_id, checkpoint
                 )
                 if new_checkpoint:
                     result.extend(
@@ -443,6 +421,7 @@ def get_delta_changes(
                     logging.error(
                         f"Not possible to create checkpoint for project {project_id} in range {checkpoint.start}-{checkpoint.end}"
                     )
+                    return
 
         return ProjectVersionDelta.merge_changes(result)

@@ -520,6 +499,10 @@ def __init__(self, project_id, path):
         self.project_id = project_id
         self.path = path
 
+    def generate_diff_name(self):
+        """Generate unique diff file name for server generated diff"""
+        return mergin_secure_filename(f"{self.path}-diff-{uuid.uuid4()}")
+
 
 class LatestProjectFiles(db.Model):
     """Store project latest version files history ids"""
@@ -775,7 +758,7 @@ def diffs_chain(
                 ),
                 None,
             )
-            if diff:
+            if diff and os.path.exists(diff.abs_path):
                 diffs.append(diff)
             elif item.rank > 0:
                 # fallback if checkpoint does not exist: replace merged diff with individual diffs
@@ -876,16 +859,58 @@ def abs_path(self) -> str:
         """
         return os.path.join(self.file.project.storage.project_dir, self.location)
 
+    @staticmethod
+    def can_create_checkpoint(file_path_id: int, checkpoint: Checkpoint) -> bool:
+        """Check if it makes sense to create a diff file for a checkpoint, e.g. there were relevant changes within the range without breaking changes"""
+
+        basefile = FileHistory.get_basefile(file_path_id, checkpoint.end)
+        if not basefile:
+            return False
+
+        file_was_deleted = (
+            FileHistory.query.filter_by(file_path_id=file_path_id)
+            .filter(
+                FileHistory.project_version_name
+                >= max(basefile.project_version_name, checkpoint.start),
+                FileHistory.project_version_name <= checkpoint.end,
+                FileHistory.change == PushChangeType.DELETE.value,
+            )
+            .count()
+            > 0
+        )
+        if file_was_deleted:
+            return False
+
+        query = FileDiff.query.filter_by(basefile_id=basefile.id).filter(
+            FileDiff.rank == 0
+        )
+
+        # rank 0 is a special case we only verify it exists
+        if checkpoint.rank == 0:
+            query = query.filter(FileDiff.version == checkpoint.end)
+        # for higher ranks we need to check if there were diff updates in that range
+        else:
+            query = query.filter(
+                FileDiff.version >= checkpoint.start,
+                FileDiff.version <= checkpoint.end,
+            )
+
+        return query.count() > 0
+
     def construct_checkpoint(self) -> bool:
         """Create a diff file checkpoint (aka. merged diff).
         Find all smaller diffs which are needed to create the final diff file and merge them.
-        In case of missing some lower rank checkpoint, use individual diffs instead.
+        In case of missing some lower rank checkpoints, create them recursively.
 
         Once checkpoint is created, size and checksum are updated in the database.
 
         Returns:
             bool: True if checkpoint was successfully created or already present
         """
+        logging.debug(
+            f"Construct checkpoint for file {self.path} v{self.version} of rank {self.rank}"
+        )
+
         if os.path.exists(self.abs_path):
             return True

@@ -914,7 +939,7 @@ def construct_checkpoint(self) -> bool:
             return False
 
         diffs_paths = []
-        # let's confirm we have all intermediate diffs needed, if not, we need to use individual diffs instead
+        # let's confirm we have all intermediate diffs needed, if not, we need to create them (recursively) first
         cached_items = Checkpoint.get_checkpoints(
             cache_level.start, cache_level.end - 1
         )
@@ -936,6 +961,7 @@ def construct_checkpoint(self) -> bool:
                 continue
 
             # find diff in table and on disk
+            # diffs might not exist because they were not created yet or there were no changes (e.g. for zeroth rank diffs)
             diff = next(
                 (
                     d
@@ -944,27 +970,34 @@ def construct_checkpoint(self) -> bool:
                 ),
                 None,
             )
-            if diff and os.path.exists(diff.abs_path):
+
+            if not diff:
+                # lower rank diff not even in DB yet - create it and try to construct merged file
+                if item.rank > 0 and FileDiff.can_create_checkpoint(
+                    self.file_path_id, item
+                ):
+                    diff = FileDiff(
+                        basefile=basefile,
+                        version=item.end,
+                        rank=item.rank,
+                        path=basefile.file.generate_diff_name(),
+                        size=None,
+                        checksum=None,
+                    )
+                    db.session.add(diff)
+                    db.session.commit()
+                else:
+                    # such diff is not expected to exist
+                    continue
+
+            diff_exists = diff.construct_checkpoint()
+            if diff_exists:
                 diffs_paths.append(diff.abs_path)
             else:
-                individual_diffs = (
-                    FileDiff.query.filter_by(
-                        basefile_id=basefile.id,
-                        rank=0,
-                    )
-                    .filter(
-                        FileDiff.version >= item.start, FileDiff.version <= item.end
-                    )
-                    .order_by(FileDiff.version)
-                    .all()
+                logging.error(
+                    f"Unable to create checkpoint diff for {item} for file {self.file_path_id}"
                 )
-                if individual_diffs:
-                    diffs_paths.extend([i.abs_path for i in individual_diffs])
-                else:
-                    logging.error(
-                        f"Unable to find diffs for {item} for file {self.file_path_id}"
-                    )
-                    return False
+                return False
 
             # we apply latest change (if any) on previous version
             end_diff = FileDiff.query.filter_by(
@@ -1186,34 +1219,59 @@ def create_checkpoint(
         cls,
         project_id: str,
         checkpoint: Checkpoint,
-        from_deltas: List[ProjectVersionDelta] = [],
     ) -> Optional[ProjectVersionDelta]:
         """
-        Creates and caches new checkpoint and any required FileDiff checkpoints.
-        Use from_deltas to create checkpoint from existing individual deltas.
-        Returns created ProjectVersionDelta object with checkpoint.
+        Creates and caches new checkpoint and any required FileDiff checkpoints recursively if needed.
         """
-        delta_range = [
-            change
-            for change in from_deltas
-            if checkpoint.start <= change.version <= checkpoint.end
-        ]
+        delta_range = []
+        # our new checkpoint will be created by adding last individual delta to previous checkpoints
+        expected_checkpoints = Checkpoint.get_checkpoints(
+            checkpoint.start, checkpoint.end - 1
+        )
+        expected_checkpoints.append(Checkpoint(rank=0, index=checkpoint.end))
+
+        expected_deltas = (
+            ProjectVersionDelta.query.filter(
+                ProjectVersionDelta.project_id == project_id,
+                tuple_(ProjectVersionDelta.rank, ProjectVersionDelta.version).in_(
+                    [(item.rank, item.end) for item in expected_checkpoints]
+                ),
+            )
+            .order_by(ProjectVersionDelta.version)
+            .all()
+        )
+
+        existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}
+        # make sure we have all components, if not, create them (recursively)
+        for item in expected_checkpoints:
+            existing_delta = existing_delta_map.get((item.rank, item.end))
+            if not existing_delta:
+                existing_delta = cls.create_checkpoint(project_id, item)
+
+            if existing_delta:
+                delta_range.append(existing_delta)
+            else:
+                logging.error(
+                    f"Missing project delta endpoint for {project_id} v{item.end} rank {item.rank} which could not be recreated"
+                )
+                return
 
         if not delta_range:
             logging.warning(
-                f"No individual changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint."
+                f"No changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint."
             )
             return None
 
         # dump changes lists from database and flatten list for merging
+        delta_range = sorted(delta_range, key=lambda x: x.version)
         changes = []
         for delta in delta_range:
             changes.extend(DeltaChangeSchema(many=True).load(delta.changes))
         merged_delta_items: List[DeltaChange] = [
             d.to_data_delta() for d in cls.merge_changes(changes)
         ]
 
-        # Pre-fetch data for all versioned files to create FileDiff checkpoints
+        # Pre-fetch data for all versioned files to create FileDiff checkpoints where it makes sense
         versioned_delta_items = [
             item
             for item in merged_delta_items
@@ -1246,17 +1304,17 @@ def create_checkpoint(
             if not base_file:
                 continue
 
-            diff_path = mergin_secure_filename(
-                f"{item.path}-diff-{uuid.uuid4()}"
-            )
+            if not FileDiff.can_create_checkpoint(file_path_id, checkpoint):
+                continue
+
             checkpoint_diff = FileDiff(
                 basefile=base_file,
-                path=diff_path,
+                path=base_file.file.generate_diff_name(),
                 rank=checkpoint.rank,
                 version=checkpoint.end,
             )
             # Patch the delta with the path to the new diff checkpoint
-            item.diff = diff_path
+            item.diff = checkpoint_diff.path
             db.session.add(checkpoint_diff)
 
         checkpoint_delta = ProjectVersionDelta(
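
Note: the core of the change is the recursive composition in ProjectVersionDelta.create_checkpoint: a checkpoint over (start, end) is assembled from the checkpoints covering (start, end - 1) plus the individual rank-0 delta at end, and any missing piece is built the same way first. The toy sketch below illustrates only that idea; the power-of-two range layout, the dict cache and the set-union merge are stand-ins, not the server's actual Checkpoint.get_checkpoints layout or merge_changes semantics.

    # Toy, self-contained illustration of recursive checkpoint composition (not server code).
    # Hypothetical layout: a rank-r piece covers 2**r versions ending on an aligned boundary;
    # a "delta" is just a set of changed file paths.

    def pieces(start, end):
        """Greedily cover versions start..end with the largest aligned pieces."""
        out, v = [], start
        while v <= end:
            size = 1
            while (v - 1) % (size * 2) == 0 and v + size * 2 - 1 <= end:
                size *= 2
            out.append((size.bit_length() - 1, v + size - 1))  # (rank, end version)
            v += size
        return out

    def build(cache, start, end):
        """Return merged changes for start..end, creating missing pieces recursively."""
        rank = (end - start + 1).bit_length() - 1
        cached = cache.get((rank, end))
        if cached is not None:
            return cached
        if rank == 0:
            raise KeyError(f"individual delta for v{end} is missing")
        merged = set()
        for r, e in pieces(start, end - 1) + [(0, end)]:
            piece = cache.get((r, e))
            if piece is None:
                piece = build(cache, e - 2**r + 1, e)  # recurse into the missing piece
            merged |= piece
        cache[(rank, end)] = merged  # cache the newly built checkpoint
        return merged

    # Only rank-0 deltas exist; asking for v1-v4 builds ranks 1 and 2 on the fly.
    cache = {(0, v): {f"file_{v}.gpkg"} for v in range(1, 5)}
    print(sorted(build(cache, 1, 4)))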
