
Commit df6a80e

gurusainath and sainath authored
chore: add sync jobs for issue_version and issue_description_version tables (#6199)
* chore: added fields in issue_version and profile tables and created a new sticky table
* chore: removed point in issue version
* chore: add imports in init
* chore: added sync jobs for issue_version and issue_description_version
* chore: removed logs
* chore: updated logging

Co-authored-by: sainath <sainath@sainaths-MacBook-Pro.local>
1 parent 6ff258c commit df6a80e

4 files changed: +421 -0 lines changed
Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
# Python imports
from typing import Optional
import logging

# Django imports
from django.utils import timezone
from django.db import transaction

# Third party imports
from celery import shared_task

# Module imports
from plane.db.models import Issue, IssueDescriptionVersion, ProjectMember
from plane.utils.exception_logger import log_exception


def get_owner_id(issue: Issue) -> Optional[int]:
    """Get the owner ID of the issue"""

    if issue.updated_by_id:
        return issue.updated_by_id

    if issue.created_by_id:
        return issue.created_by_id

    # Find project admin as fallback
    project_member = ProjectMember.objects.filter(
        project_id=issue.project_id,
        role=20,  # Admin role
    ).first()

    return project_member.member_id if project_member else None


@shared_task
def sync_issue_description_version(batch_size=5000, offset=0, countdown=300):
    """Task to create IssueDescriptionVersion records for existing Issues in batches"""
    try:
        with transaction.atomic():
            base_query = Issue.objects
            total_issues_count = base_query.count()

            if total_issues_count == 0:
                return

            # Calculate batch range
            end_offset = min(offset + batch_size, total_issues_count)

            # Fetch issues with related data
            issues_batch = (
                base_query.order_by("created_at")
                .select_related("workspace", "project")
                .only(
                    "id",
                    "workspace_id",
                    "project_id",
                    "created_by_id",
                    "updated_by_id",
                    "description_binary",
                    "description_html",
                    "description_stripped",
                    "description",
                )[offset:end_offset]
            )

            if not issues_batch:
                return

            version_objects = []
            for issue in issues_batch:
                # Validate required fields
                if not issue.workspace_id or not issue.project_id:
                    logging.warning(
                        f"Skipping issue {issue.id} - missing workspace_id or project_id"
                    )
                    continue

                # Determine owned_by_id
                owned_by_id = get_owner_id(issue)
                if owned_by_id is None:
                    logging.warning(f"Skipping issue {issue.id} - missing owned_by")
                    continue

                # Create version object
                version_objects.append(
                    IssueDescriptionVersion(
                        workspace_id=issue.workspace_id,
                        project_id=issue.project_id,
                        created_by_id=issue.created_by_id,
                        updated_by_id=issue.updated_by_id,
                        owned_by_id=owned_by_id,
                        last_saved_at=timezone.now(),
                        issue_id=issue.id,
                        description_binary=issue.description_binary,
                        description_html=issue.description_html,
                        description_stripped=issue.description_stripped,
                        description_json=issue.description,
                    )
                )

            # Bulk create version objects
            if version_objects:
                IssueDescriptionVersion.objects.bulk_create(version_objects)

            # Schedule next batch if needed
            if end_offset < total_issues_count:
                sync_issue_description_version.apply_async(
                    kwargs={
                        "batch_size": batch_size,
                        "offset": end_offset,
                        "countdown": countdown,
                    },
                    countdown=countdown,
                )
        return
    except Exception as e:
        log_exception(e)
        return


@shared_task
def schedule_issue_description_version(batch_size=5000, countdown=300):
    sync_issue_description_version.delay(
        batch_size=int(batch_size), countdown=countdown
    )
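The backfill is self-chaining: each run processes one slice of batch_size issues ordered by created_at, then re-enqueues itself with offset advanced to end_offset until the total issue count is covered. Below is a minimal sketch of kicking it off, assuming a running Celery worker and broker; the import path is an assumption (this view omits the file names) and the batch values are illustrative.

    # Hypothetical module path -- the commit page does not show file names.
    from plane.bgtasks.issue_description_version_sync import (
        schedule_issue_description_version,
        sync_issue_description_version,
    )

    # Enqueue the entry point; it delegates to the batched sync task.
    schedule_issue_description_version.delay(batch_size=5000, countdown=300)

    # Equivalent direct call. With 12,000 issues and batch_size=5000 the
    # task runs for offsets 0, 5000, and 10000, waiting `countdown`
    # seconds between batches before finishing at end_offset == 12000.
    sync_issue_description_version.delay(batch_size=5000, offset=0, countdown=300)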
Lines changed: 254 additions & 0 deletions
@@ -0,0 +1,254 @@
# Python imports
import json
from typing import Optional, List, Dict
from uuid import UUID
from itertools import groupby
import logging

# Django imports
from django.utils import timezone
from django.db import transaction

# Third party imports
from celery import shared_task

# Module imports
from plane.db.models import (
    Issue,
    IssueVersion,
    ProjectMember,
    CycleIssue,
    ModuleIssue,
    IssueActivity,
    IssueAssignee,
    IssueLabel,
)
from plane.utils.exception_logger import log_exception


@shared_task
def issue_task(updated_issue, issue_id, user_id):
    try:
        current_issue = json.loads(updated_issue) if updated_issue else {}
        issue = Issue.objects.get(id=issue_id)

        updated_current_issue = {}
        for key, value in current_issue.items():
            if getattr(issue, key) != value:
                updated_current_issue[key] = value

        if updated_current_issue:
            issue_version = (
                IssueVersion.objects.filter(issue_id=issue_id)
                .order_by("-last_saved_at")
                .first()
            )

            if (
                issue_version
                and str(issue_version.owned_by_id) == str(user_id)
                and (timezone.now() - issue_version.last_saved_at).total_seconds()
                <= 600
            ):
                for key, value in updated_current_issue.items():
                    setattr(issue_version, key, value)
                issue_version.last_saved_at = timezone.now()
                issue_version.save(
                    update_fields=list(updated_current_issue.keys()) + ["last_saved_at"]
                )
            else:
                IssueVersion.log_issue_version(issue, user_id)

        return
    except Issue.DoesNotExist:
        return
    except Exception as e:
        log_exception(e)
        return


def get_owner_id(issue: Issue) -> Optional[int]:
    """Get the owner ID of the issue"""

    if issue.updated_by_id:
        return issue.updated_by_id

    if issue.created_by_id:
        return issue.created_by_id

    # Find project admin as fallback
    project_member = ProjectMember.objects.filter(
        project_id=issue.project_id,
        role=20,  # Admin role
    ).first()

    return project_member.member_id if project_member else None


def get_related_data(issue_ids: List[UUID]) -> Dict:
    """Get related data for the given issue IDs"""

    cycle_issues = {
        ci.issue_id: ci.cycle_id
        for ci in CycleIssue.objects.filter(issue_id__in=issue_ids)
    }

    # Get assignees with proper grouping
    assignee_records = list(
        IssueAssignee.objects.filter(issue_id__in=issue_ids)
        .values_list("issue_id", "assignee_id")
        .order_by("issue_id")
    )
    assignees = {}
    for issue_id, group in groupby(assignee_records, key=lambda x: x[0]):
        assignees[issue_id] = [str(g[1]) for g in group]

    # Get labels with proper grouping
    label_records = list(
        IssueLabel.objects.filter(issue_id__in=issue_ids)
        .values_list("issue_id", "label_id")
        .order_by("issue_id")
    )
    labels = {}
    for issue_id, group in groupby(label_records, key=lambda x: x[0]):
        labels[issue_id] = [str(g[1]) for g in group]

    # Get modules with proper grouping
    module_records = list(
        ModuleIssue.objects.filter(issue_id__in=issue_ids)
        .values_list("issue_id", "module_id")
        .order_by("issue_id")
    )
    modules = {}
    for issue_id, group in groupby(module_records, key=lambda x: x[0]):
        modules[issue_id] = [str(g[1]) for g in group]

    # Get latest activities
    latest_activities = {}
    activities = IssueActivity.objects.filter(issue_id__in=issue_ids).order_by(
        "issue_id", "-created_at"
    )
    for issue_id, activities_group in groupby(activities, key=lambda x: x.issue_id):
        first_activity = next(activities_group, None)
        if first_activity:
            latest_activities[issue_id] = first_activity.id

    return {
        "cycle_issues": cycle_issues,
        "assignees": assignees,
        "labels": labels,
        "modules": modules,
        "activities": latest_activities,
    }


def create_issue_version(issue: Issue, related_data: Dict) -> Optional[IssueVersion]:
    """Create IssueVersion object from the given issue and related data"""

    try:
        if not issue.workspace_id or not issue.project_id:
            logging.warning(
                f"Skipping issue {issue.id} - missing workspace_id or project_id"
            )
            return None

        owned_by_id = get_owner_id(issue)
        if owned_by_id is None:
            logging.warning(f"Skipping issue {issue.id} - missing owned_by")
            return None

        return IssueVersion(
            workspace_id=issue.workspace_id,
            project_id=issue.project_id,
            created_by_id=issue.created_by_id,
            updated_by_id=issue.updated_by_id,
            owned_by_id=owned_by_id,
            last_saved_at=timezone.now(),
            activity_id=related_data["activities"].get(issue.id),
            properties=getattr(issue, "properties", {}),
            meta=getattr(issue, "meta", {}),
            issue_id=issue.id,
            parent=issue.parent_id,
            state=issue.state_id,
            estimate_point=issue.estimate_point_id,
            name=issue.name,
            priority=issue.priority,
            start_date=issue.start_date,
            target_date=issue.target_date,
            assignees=related_data["assignees"].get(issue.id, []),
            sequence_id=issue.sequence_id,
            labels=related_data["labels"].get(issue.id, []),
            sort_order=issue.sort_order,
            completed_at=issue.completed_at,
            archived_at=issue.archived_at,
            is_draft=issue.is_draft,
            external_source=issue.external_source,
            external_id=issue.external_id,
            type=issue.type_id,
            cycle=related_data["cycle_issues"].get(issue.id),
            modules=related_data["modules"].get(issue.id, []),
        )
    except Exception as e:
        log_exception(e)
        return None


@shared_task
def sync_issue_version(batch_size=5000, offset=0, countdown=300):
    """Task to create IssueVersion records for existing Issues in batches"""

    try:
        with transaction.atomic():
            base_query = Issue.objects
            total_issues_count = base_query.count()

            if total_issues_count == 0:
                return

            end_offset = min(offset + batch_size, total_issues_count)

            # Get issues batch with optimized queries
            issues_batch = list(
                base_query.order_by("created_at")
                .select_related("workspace", "project")
                .all()[offset:end_offset]
            )

            if not issues_batch:
                return

            # Get all related data in bulk
            issue_ids = [issue.id for issue in issues_batch]
            related_data = get_related_data(issue_ids)

            issue_versions = []
            for issue in issues_batch:
                version = create_issue_version(issue, related_data)
                if version:
                    issue_versions.append(version)

            # Bulk create versions
            if issue_versions:
                IssueVersion.objects.bulk_create(issue_versions, batch_size=1000)

            # Schedule the next batch if there are more issues to process
            if end_offset < total_issues_count:
                sync_issue_version.apply_async(
                    kwargs={
                        "batch_size": batch_size,
                        "offset": end_offset,
                        "countdown": countdown,
                    },
                    countdown=countdown,
                )

        logging.info(f"Processed Issues: {end_offset}")
        return
    except Exception as e:
        log_exception(e)
        return


@shared_task
def schedule_issue_version(batch_size=5000, countdown=300):
    sync_issue_version.delay(batch_size=int(batch_size), countdown=countdown)

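issue_task complements the backfill at edit time: it diffs the serialized payload against the stored Issue and either amends the most recent IssueVersion or logs a fresh one. A minimal sketch of a call site follows, again assuming a hypothetical module path; the identifiers and payload fields are illustrative.

    import json

    # Hypothetical module path -- the commit page does not show file names.
    from plane.bgtasks.issue_version_sync import issue_task

    # Illustrative identifiers; in practice these come from the request.
    issue_id = "0a1b2c3d-0000-0000-0000-000000000000"
    user_id = "9f8e7d6c-0000-0000-0000-000000000000"

    # Only fields that differ from the stored Issue are applied.
    payload = json.dumps({"name": "Renamed issue", "priority": "high"})

    # Amends the latest IssueVersion if it is owned by the same user and
    # was saved within the last 600 seconds; otherwise a new version row
    # is written via IssueVersion.log_issue_version(issue, user_id).
    issue_task.delay(updated_issue=payload, issue_id=issue_id, user_id=user_id)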