Skip to content

Commit b0a0bc2

Browse files
committed
Add migration to look for unreplicated files and replicate them
This is preferable to simply retrying older failed replication background jobs, as it's possible that the objects they correlate to have been deleted or changed since and so those old background jobs would no longer be applicable.
1 parent 3eb5b4e commit b0a0bc2

File tree

2 files changed

+122
-1
lines changed

2 files changed

+122
-1
lines changed

backend/btrixcloud/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
) = object
4545

4646

47-
CURR_DB_VERSION = "0056"
47+
CURR_DB_VERSION = "0057"
4848

4949
MIN_DB_VERSION = 7.0
5050

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
"""
2+
Migration 0057 - Replicate any unreplicated crawl and profile files
3+
"""
4+
5+
from btrixcloud.migrations import BaseMigration
6+
from btrixcloud.models import BaseCrawl, Profile, BgJobType
7+
8+
9+
# Version string handed to BaseMigration as ``migration_version`` by Migration.__init__.
MIGRATION_VERSION = "0057"
10+
11+
12+
class Migration(BaseMigration):
    """Migration 0057: start replica jobs for files that have no replicas.

    Looks for archived-item files and profile resources whose ``replicas``
    field is missing or empty, in orgs that have at least one replica
    storage configured, and starts a new replica job for each one unless an
    in-progress job for that same file already exists.
    """

    # pylint: disable=unused-argument
    def __init__(self, mdb, **kwargs):
        super().__init__(mdb, migration_version=MIGRATION_VERSION)

        # Optional dependency: without it migrate_up() logs and returns early.
        self.background_job_ops = kwargs.get("background_job_ops")

    async def _has_in_progress_replica_job(self, object_id, object_type, file_path):
        """Return True if a started-but-unfinished replica job exists for the file.

        Uses ``find_one`` (awaitable, resolves to a document or ``None``).
        ``find`` returns a cursor, which cannot be awaited and would be truthy
        regardless of whether any job matched.
        """
        matching_job = await self.mdb["jobs"].find_one(
            {
                "type": BgJobType.CREATE_REPLICA.value,
                "object_id": object_id,
                "object_type": object_type,
                "file_path": file_path,
                "started": {"$ne": None},
                "finished": None,
                "success": None,
            }
        )
        return matching_job is not None

    # pylint: disable=too-many-arguments
    async def _replicate_if_needed(self, oid, file_, object_id, object_type, label):
        """Start replica jobs for ``file_`` unless one is already in progress.

        ``label`` ("item" or "profile") is only used in the error message.
        Errors raised while creating the jobs are logged and swallowed so that
        one bad file does not abort the rest of the migration.
        """
        if await self._has_in_progress_replica_job(
            object_id, object_type, file_.filename
        ):
            return

        try:
            await self.background_job_ops.create_replica_jobs(
                oid, file_, object_id, object_type
            )
        # pylint: disable=broad-exception-caught
        except Exception as err:
            print(
                f"Error replicating unreplicated file for {label} {object_id}: {err}",
                flush=True,
            )

    async def migrate_up(self):
        """Perform migration up.

        Identify files from archived items and profiles that should have been
        replicated but weren't, and start new background jobs to re-replicate
        the files if there isn't already an in-progress job to do the same.
        """
        if self.background_job_ops is None:
            print(
                "Unable to replicate unreplicated files, missing required ops",
                flush=True,
            )
            return

        orgs_mdb = self.mdb["organizations"]
        crawls_mdb = self.mdb["crawls"]
        profiles_mdb = self.mdb["profiles"]

        # Future-proof in anticipation of custom storage - do not attempt to
        # replicate files for orgs that don't have a replica location configured
        orgs_with_replicas = [
            org["_id"]
            async for org in orgs_mdb.find(
                {"storageReplicas.0": {"$exists": True}}, projection=["_id"]
            )
        ]

        # Archived items: match any item with at least one file whose
        # replicas field is null/missing or an empty list.
        crawls_match_query = {
            "oid": {"$in": orgs_with_replicas},
            "files": {"$elemMatch": {"replicas": {"$in": [None, []]}}},
        }
        async for crawl_raw in crawls_mdb.find(crawls_match_query):
            crawl = BaseCrawl.from_dict(crawl_raw)
            for file_ in crawl.files:
                # The query matches whole items, so files that already have
                # replicas must still be filtered out here.
                if file_.replicas:
                    continue
                await self._replicate_if_needed(
                    crawl.oid, file_, crawl.id, crawl.type, "item"
                )

        # Profiles: a profile has a single resource file.
        profiles_match_query = {
            "oid": {"$in": orgs_with_replicas},
            "resource.replicas": {"$in": [None, []]},
        }
        async for profile_raw in profiles_mdb.find(profiles_match_query):
            profile = Profile.from_dict(profile_raw)

            # A profile without a resource has nothing to replicate.
            if not profile.resource:
                continue

            await self._replicate_if_needed(
                profile.oid, profile.resource, profile.id, "profile", "profile"
            )

0 commit comments

Comments
 (0)