-
Notifications
You must be signed in to change notification settings - Fork 3.8k
chore: add sync jobs for issue_version and issue_description_version tables #6199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+421
−0
Merged
Changes from 5 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
f0bb5c1
chore: added fields in issue_version and profile tables and created a…
b308698
chore: removed point in issue version
c6f5c9f
chore: add imports in init
1e77d1d
chore: added sync jobs for issue_version and issue_description_version
gurusainath 3d8da7b
chore: removed logs
gurusainath cdc529e
Merge branch 'preview' of gurusainath:makeplane/plane into chore_issu…
gurusainath 0c6d2e2
chore: updated logginh
gurusainath File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
122 changes: 122 additions & 0 deletions
122
apiserver/plane/bgtasks/issue_description_version_sync.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| # Python imports | ||
| from typing import Optional | ||
|
|
||
| # Django imports | ||
| from django.utils import timezone | ||
| from django.db import transaction | ||
|
|
||
| # Third party imports | ||
| from celery import shared_task | ||
|
|
||
| # Module imports | ||
| from plane.db.models import Issue, IssueDescriptionVersion, ProjectMember | ||
| from plane.utils.exception_logger import log_exception | ||
|
|
||
|
|
||
| def get_owner_id(issue: Issue) -> Optional[int]: | ||
| """Get the owner ID of the issue""" | ||
|
|
||
| if issue.updated_by_id: | ||
| return issue.updated_by_id | ||
|
|
||
| if issue.created_by_id: | ||
| return issue.created_by_id | ||
|
|
||
| # Find project admin as fallback | ||
| project_member = ProjectMember.objects.filter( | ||
| project_id=issue.project_id, | ||
| role=20, # Admin role | ||
| ).first() | ||
|
|
||
| return project_member.member_id if project_member else None | ||
|
|
||
|
|
||
| @shared_task | ||
| def sync_issue_description_version(batch_size=5000, offset=0, countdown=300): | ||
| """Task to create IssueDescriptionVersion records for existing Issues in batches""" | ||
| try: | ||
| with transaction.atomic(): | ||
| base_query = Issue.objects | ||
| total_issues_count = base_query.count() | ||
|
|
||
| if total_issues_count == 0: | ||
| return | ||
|
|
||
| # Calculate batch range | ||
| end_offset = min(offset + batch_size, total_issues_count) | ||
|
|
||
| # Fetch issues with related data | ||
| issues_batch = ( | ||
| base_query.order_by("created_at") | ||
| .select_related("workspace", "project") | ||
| .only( | ||
| "id", | ||
| "workspace_id", | ||
| "project_id", | ||
| "created_by_id", | ||
| "updated_by_id", | ||
| "description_binary", | ||
| "description_html", | ||
| "description_stripped", | ||
| "description", | ||
| )[offset:end_offset] | ||
| ) | ||
|
|
||
| if not issues_batch: | ||
| return | ||
|
|
||
| version_objects = [] | ||
| for issue in issues_batch: | ||
| # Validate required fields | ||
| if not issue.workspace_id or not issue.project_id: | ||
| print(f"Skipping {issue.id} - missing workspace_id or project_id") | ||
| continue | ||
|
|
||
| # Determine owned_by_id | ||
| owned_by_id = get_owner_id(issue) | ||
| if owned_by_id is None: | ||
| print(f"Skipping issue {issue.id} - missing owned_by") | ||
| continue | ||
|
|
||
| # Create version object | ||
| version_objects.append( | ||
| IssueDescriptionVersion( | ||
| workspace_id=issue.workspace_id, | ||
| project_id=issue.project_id, | ||
| created_by_id=issue.created_by_id, | ||
| updated_by_id=issue.updated_by_id, | ||
| owned_by_id=owned_by_id, | ||
| last_saved_at=timezone.now(), | ||
| issue_id=issue.id, | ||
| description_binary=issue.description_binary, | ||
| description_html=issue.description_html, | ||
| description_stripped=issue.description_stripped, | ||
| description_json=issue.description, | ||
| ) | ||
| ) | ||
|
|
||
| # Bulk create version objects | ||
| if version_objects: | ||
| IssueDescriptionVersion.objects.bulk_create(version_objects) | ||
|
|
||
| # Schedule next batch if needed | ||
| if end_offset < total_issues_count: | ||
| sync_issue_description_version.apply_async( | ||
| kwargs={ | ||
| "batch_size": batch_size, | ||
| "offset": end_offset, | ||
| "countdown": countdown, | ||
| }, | ||
| countdown=countdown, | ||
| ) | ||
| return | ||
| except Exception as e: | ||
| log_exception(e) | ||
| return | ||
|
|
||
|
|
||
| @shared_task | ||
| def schedule_issue_description_version(batch_size=5000, countdown=300): | ||
| sync_issue_description_version.delay( | ||
| batch_size=int(batch_size), countdown=countdown | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,251 @@ | ||
| # Python imports | ||
| import json | ||
| from typing import Optional, List, Dict | ||
| from uuid import UUID | ||
| from itertools import groupby | ||
|
|
||
| # Django imports | ||
| from django.utils import timezone | ||
| from django.db import transaction | ||
|
|
||
| # Third party imports | ||
| from celery import shared_task | ||
|
|
||
| # Module imports | ||
| from plane.db.models import ( | ||
| Issue, | ||
| IssueVersion, | ||
| ProjectMember, | ||
| CycleIssue, | ||
| ModuleIssue, | ||
| IssueActivity, | ||
| IssueAssignee, | ||
| IssueLabel, | ||
| ) | ||
| from plane.utils.exception_logger import log_exception | ||
|
|
||
|
|
||
| @shared_task | ||
| def issue_task(updated_issue, issue_id, user_id): | ||
| try: | ||
| current_issue = json.loads(updated_issue) if updated_issue else {} | ||
| issue = Issue.objects.get(id=issue_id) | ||
|
|
||
| updated_current_issue = {} | ||
| for key, value in current_issue.items(): | ||
| if getattr(issue, key) != value: | ||
| updated_current_issue[key] = value | ||
|
|
||
| if updated_current_issue: | ||
| issue_version = ( | ||
| IssueVersion.objects.filter(issue_id=issue_id) | ||
| .order_by("-last_saved_at") | ||
| .first() | ||
| ) | ||
|
|
||
| if ( | ||
| issue_version | ||
| and str(issue_version.owned_by) == str(user_id) | ||
| and (timezone.now() - issue_version.last_saved_at).total_seconds() | ||
| <= 600 | ||
| ): | ||
| for key, value in updated_current_issue.items(): | ||
| setattr(issue_version, key, value) | ||
| issue_version.last_saved_at = timezone.now() | ||
| issue_version.save( | ||
| update_fields=list(updated_current_issue.keys()) + ["last_saved_at"] | ||
| ) | ||
| else: | ||
| IssueVersion.log_issue_version(issue, user_id) | ||
|
|
||
| return | ||
| except Issue.DoesNotExist: | ||
| return | ||
| except Exception as e: | ||
| log_exception(e) | ||
| return | ||
|
|
||
|
|
||
| def get_owner_id(issue: Issue) -> Optional[int]: | ||
| """Get the owner ID of the issue""" | ||
|
|
||
| if issue.updated_by_id: | ||
| return issue.updated_by_id | ||
|
|
||
| if issue.created_by_id: | ||
| return issue.created_by_id | ||
|
|
||
| # Find project admin as fallback | ||
| project_member = ProjectMember.objects.filter( | ||
| project_id=issue.project_id, | ||
| role=20, # Admin role | ||
| ).first() | ||
|
|
||
| return project_member.member_id if project_member else None | ||
|
|
||
|
|
||
| def get_related_data(issue_ids: List[UUID]) -> Dict: | ||
| """Get related data for the given issue IDs""" | ||
|
|
||
| cycle_issues = { | ||
| ci.issue_id: ci.cycle_id | ||
| for ci in CycleIssue.objects.filter(issue_id__in=issue_ids) | ||
| } | ||
|
|
||
| # Get assignees with proper grouping | ||
| assignee_records = list( | ||
| IssueAssignee.objects.filter(issue_id__in=issue_ids) | ||
| .values_list("issue_id", "assignee_id") | ||
| .order_by("issue_id") | ||
| ) | ||
| assignees = {} | ||
| for issue_id, group in groupby(assignee_records, key=lambda x: x[0]): | ||
| assignees[issue_id] = [str(g[1]) for g in group] | ||
|
|
||
| # Get labels with proper grouping | ||
| label_records = list( | ||
| IssueLabel.objects.filter(issue_id__in=issue_ids) | ||
| .values_list("issue_id", "label_id") | ||
| .order_by("issue_id") | ||
| ) | ||
| labels = {} | ||
| for issue_id, group in groupby(label_records, key=lambda x: x[0]): | ||
| labels[issue_id] = [str(g[1]) for g in group] | ||
|
|
||
| # Get modules with proper grouping | ||
| module_records = list( | ||
| ModuleIssue.objects.filter(issue_id__in=issue_ids) | ||
| .values_list("issue_id", "module_id") | ||
| .order_by("issue_id") | ||
| ) | ||
| modules = {} | ||
| for issue_id, group in groupby(module_records, key=lambda x: x[0]): | ||
| modules[issue_id] = [str(g[1]) for g in group] | ||
|
|
||
| # Get latest activities | ||
| latest_activities = {} | ||
| activities = IssueActivity.objects.filter(issue_id__in=issue_ids).order_by( | ||
| "issue_id", "-created_at" | ||
| ) | ||
| for issue_id, activities_group in groupby(activities, key=lambda x: x.issue_id): | ||
| first_activity = next(activities_group, None) | ||
| if first_activity: | ||
| latest_activities[issue_id] = first_activity.id | ||
|
|
||
| return { | ||
| "cycle_issues": cycle_issues, | ||
| "assignees": assignees, | ||
| "labels": labels, | ||
| "modules": modules, | ||
| "activities": latest_activities, | ||
| } | ||
|
|
||
|
|
||
| def create_issue_version(issue: Issue, related_data: Dict) -> Optional[IssueVersion]: | ||
| """Create IssueVersion object from the given issue and related data""" | ||
|
|
||
| try: | ||
| if not issue.workspace_id or not issue.project_id: | ||
| print(f"Skipping issue {issue.id} - missing workspace_id or project_id") | ||
| return None | ||
gurusainath marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| owned_by_id = get_owner_id(issue) | ||
| if owned_by_id is None: | ||
| print(f"Skipping issue {issue.id} - missing owned_by") | ||
| return None | ||
|
|
||
| return IssueVersion( | ||
| workspace_id=issue.workspace_id, | ||
| project_id=issue.project_id, | ||
| created_by_id=issue.created_by_id, | ||
| updated_by_id=issue.updated_by_id, | ||
| owned_by_id=owned_by_id, | ||
| last_saved_at=timezone.now(), | ||
| activity_id=related_data["activities"].get(issue.id), | ||
| properties=getattr(issue, "properties", {}), | ||
| meta=getattr(issue, "meta", {}), | ||
| issue_id=issue.id, | ||
| parent=issue.parent_id, | ||
| state=issue.state_id, | ||
| estimate_point=issue.estimate_point_id, | ||
| name=issue.name, | ||
| priority=issue.priority, | ||
| start_date=issue.start_date, | ||
| target_date=issue.target_date, | ||
| assignees=related_data["assignees"].get(issue.id, []), | ||
| sequence_id=issue.sequence_id, | ||
| labels=related_data["labels"].get(issue.id, []), | ||
| sort_order=issue.sort_order, | ||
| completed_at=issue.completed_at, | ||
| archived_at=issue.archived_at, | ||
| is_draft=issue.is_draft, | ||
| external_source=issue.external_source, | ||
| external_id=issue.external_id, | ||
| type=issue.type_id, | ||
| cycle=related_data["cycle_issues"].get(issue.id), | ||
| modules=related_data["modules"].get(issue.id, []), | ||
| ) | ||
| except Exception as e: | ||
| log_exception(e) | ||
| return None | ||
|
|
||
|
|
||
| @shared_task | ||
| def sync_issue_version(batch_size=5000, offset=0, countdown=300): | ||
| """Task to create IssueVersion records for existing Issues in batches""" | ||
|
|
||
| try: | ||
| with transaction.atomic(): | ||
| base_query = Issue.objects | ||
| total_issues_count = base_query.count() | ||
|
|
||
| if total_issues_count == 0: | ||
| return | ||
|
|
||
| end_offset = min(offset + batch_size, total_issues_count) | ||
|
|
||
| # Get issues batch with optimized queries | ||
| issues_batch = list( | ||
| base_query.order_by("created_at") | ||
| .select_related("workspace", "project") | ||
| .all()[offset:end_offset] | ||
| ) | ||
|
|
||
| if not issues_batch: | ||
| return | ||
|
|
||
| # Get all related data in bulk | ||
| issue_ids = [issue.id for issue in issues_batch] | ||
| related_data = get_related_data(issue_ids) | ||
|
|
||
| issue_versions = [] | ||
| for issue in issues_batch: | ||
| version = create_issue_version(issue, related_data) | ||
| if version: | ||
| issue_versions.append(version) | ||
|
|
||
| # Bulk create versions | ||
| if issue_versions: | ||
| IssueVersion.objects.bulk_create(issue_versions, batch_size=1000) | ||
|
|
||
| # Schedule the next batch if there are more workspaces to process | ||
| if end_offset < total_issues_count: | ||
| sync_issue_version.apply_async( | ||
| kwargs={ | ||
| "batch_size": batch_size, | ||
| "offset": end_offset, | ||
| "countdown": countdown, | ||
| }, | ||
| countdown=countdown, | ||
| ) | ||
|
|
||
| print(f"Processed Issues: {end_offset}") | ||
gurusainath marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return | ||
| except Exception as e: | ||
| log_exception(e) | ||
| return | ||
|
|
||
|
|
||
| @shared_task | ||
| def schedule_issue_version(batch_size=5000, countdown=300): | ||
| sync_issue_version.delay(batch_size=int(batch_size), countdown=countdown) | ||
23 changes: 23 additions & 0 deletions
23
apiserver/plane/db/management/commands/sync_issue_description_version.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| # Django imports | ||
| from django.core.management.base import BaseCommand | ||
|
|
||
| # Module imports | ||
| from plane.bgtasks.issue_description_version_sync import ( | ||
| schedule_issue_description_version, | ||
| ) | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Creates IssueDescriptionVersion records for existing Issues in batches" | ||
|
|
||
| def handle(self, *args, **options): | ||
| batch_size = input("Enter the batch size: ") | ||
| batch_countdown = input("Enter the batch countdown: ") | ||
|
|
||
| schedule_issue_description_version.delay( | ||
| batch_size=batch_size, countdown=int(batch_countdown) | ||
| ) | ||
|
|
||
| self.stdout.write( | ||
| self.style.SUCCESS("Successfully created issue description version task") | ||
| ) | ||
gurusainath marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.