
Conversation

@karthikrab
Contributor

No description provided.

  - Clean up dapr workflow data

   Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <[email protected]>
@karthikrab karthikrab requested a review from dittops October 23, 2025 19:24
@gemini-code-assist
Contributor

Summary of Changes

Hello @karthikrab, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a Dapr workflow cleanup feature designed to keep the Dapr state store from growing without bound. It establishes a weekly scheduled process that automatically purges old completed or failed workflow instances according to configurable retention policies. It also provides new API endpoints for administrators to inspect workflows eligible for cleanup and to manually initiate cleanup operations, with options for dry runs and batch processing.

Highlights

  • Automated Workflow Cleanup: Implemented a scheduled task to automatically purge old completed or failed Dapr workflows from the state store, preventing unbounded storage growth.
  • Configurable Retention: Introduced new configuration settings (WORKFLOW_RETENTION_DAYS, WORKFLOW_CLEANUP_BATCH_SIZE) to control how long workflows are retained and the batch size for cleanup operations.
  • Manual Cleanup APIs: Added new API endpoints (/cleanup/old-workflows and /cleanup/trigger) to list workflows eligible for cleanup and to manually trigger the cleanup process, including a dry-run option (see the usage sketch after this list).
  • Dedicated Cleanup Module: A new Python module (workflow_ops/cleanup.py) was created to encapsulate the core logic for identifying and purging Dapr workflow states.
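
To make the new surface concrete, here is a minimal usage sketch. Only the endpoint paths, the dry-run option, and the WORKFLOW_RETENTION_DAYS / WORKFLOW_CLEANUP_BATCH_SIZE settings come from this pull request; the base URL, any route prefix, and the exact request/response shapes are assumptions for illustration.

    # Minimal sketch (assumptions noted above); uses the requests library.
    import os

    import requests

    BASE_URL = os.environ.get("SERVICE_BASE_URL", "http://localhost:8000")  # hypothetical

    # The server reads retention settings from configuration, e.g.:
    #   WORKFLOW_RETENTION_DAYS=30
    #   WORKFLOW_CLEANUP_BATCH_SIZE=100

    # List workflows eligible for cleanup (the endpoint paginates with page/limit).
    resp = requests.get(f"{BASE_URL}/cleanup/old-workflows", params={"page": 1, "limit": 50})
    resp.raise_for_status()
    print(resp.json())

    # Dry run first: report what would be purged without deleting anything.
    resp = requests.post(f"{BASE_URL}/cleanup/trigger", json={"dry_run": True})
    resp.raise_for_status()
    print(resp.json())

    # Then trigger the real cleanup in batches.
    resp = requests.post(f"{BASE_URL}/cleanup/trigger", json={"dry_run": False})
    resp.raise_for_status()
    print(resp.json())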

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a valuable feature for cleaning up old Dapr workflows, which is crucial for managing storage growth. The implementation includes configuration, a background scheduler, and new API endpoints for manual control. The code is generally well-structured.

However, there are several significant performance issues in how old workflows are queried. The current implementation fetches large amounts of data into memory and filters it there; that filtering should happen at the database level. I've provided suggestions to optimize these queries. There are also opportunities to improve code organization and to simplify some of the scheduling logic for better maintainability.

Comment on lines +45 to +93
    async def get_old_workflows(
        session,
        cutoff_date: datetime,
        batch_size: int = 100,
    ) -> List[Workflow]:
        """Get workflows older than cutoff date that are in terminal states.

        Args:
            session: Database session
            cutoff_date: Only return workflows updated before this date
            batch_size: Maximum number of workflows to return

        Returns:
            List[Workflow]: List of workflows eligible for purging
        """
        # Query workflows in COMPLETED or FAILED state
        # We need to check both statuses separately since filters doesn't support list values
        workflows_completed, count_completed = await WorkflowDataManager(session).get_all_workflows(
            offset=0,
            limit=batch_size,
            filters={"status": WorkflowStatusEnum.COMPLETED},
        )

        workflows_failed, count_failed = await WorkflowDataManager(session).get_all_workflows(
            offset=0,
            limit=batch_size,
            filters={"status": WorkflowStatusEnum.FAILED},
        )

        # Combine results
        all_workflows = workflows_completed + workflows_failed

        # Filter by modified_at to get only old workflows
        old_workflows = [w for w in all_workflows if w.modified_at and w.modified_at < cutoff_date]

        # Sort by modified_at and limit to batch_size
        old_workflows = sorted(old_workflows, key=lambda w: w.modified_at or datetime.min.replace(tzinfo=UTC))
        old_workflows = old_workflows[:batch_size]

        logger.debug(
            "Found %s workflows in terminal states (completed=%s, failed=%s), %s older than %s",
            count_completed + count_failed,
            count_completed,
            count_failed,
            len(old_workflows),
            cutoff_date,
        )

        return old_workflows

Severity: high

This method is inefficient as it fetches workflows from the database and then filters them by date in memory. This can lead to fetching many unnecessary records and poor performance. The filtering by status and date, as well as sorting and limiting, should all be done at the database level.

The two separate queries for COMPLETED and FAILED statuses can also be combined into a single, more efficient query.

    async def get_old_workflows(
        session,
        cutoff_date: datetime,
        batch_size: int = 100,
    ) -> List[Workflow]:
        """Get workflows older than cutoff date that are in terminal states.

        Args:
            session: Database session
            cutoff_date: Only return workflows updated before this date
            batch_size: Maximum number of workflows to return

        Returns:
            List[Workflow]: List of workflows eligible for purging
        """
        from sqlalchemy import or_, select

        # This query is more efficient as it filters and limits in the database.
        stmt = (
            select(Workflow)
            .where(
                or_(Workflow.status == WorkflowStatusEnum.COMPLETED, Workflow.status == WorkflowStatusEnum.FAILED),
                Workflow.modified_at < cutoff_date,
            )
            .order_by(Workflow.modified_at.asc())
            .limit(batch_size)
        )

        # Using a direct SQLAlchemy query is more performant here.
        old_workflows = (await session.scalars(stmt)).all()

        logger.debug(
            "Found %d workflows older than %s to be cleaned up in this batch.",
            len(old_workflows),
            cutoff_date,
        )

        return old_workflows

Comment on lines +727 to +792
    async def list_old_workflows(
        self,
        retention_days: int,
        page: int = 1,
        limit: int = 50,
    ) -> Tuple[List[Dict[str, Any]], int]:
        """List workflows older than retention period.

        Args:
            retention_days: List workflows older than this many days
            page: Page number for pagination
            limit: Number of items per page

        Returns:
            Tuple of (workflows list, total count)
        """
        from datetime import UTC, datetime, timedelta

        cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
        offset = (page - 1) * limit

        # Get workflows in terminal states
        workflows_completed, count_completed = await WorkflowDataManager(self.session).get_all_workflows(
            offset=0,
            limit=10000,  # Get all to filter by date
            filters={"status": WorkflowStatusEnum.COMPLETED},
        )

        workflows_failed, count_failed = await WorkflowDataManager(self.session).get_all_workflows(
            offset=0,
            limit=10000,
            filters={"status": WorkflowStatusEnum.FAILED},
        )

        # Combine and filter by date
        all_workflows = workflows_completed + workflows_failed
        old_workflows = [w for w in all_workflows if w.modified_at and w.modified_at < cutoff_date]

        # Sort by modified_at (oldest first)
        old_workflows = sorted(old_workflows, key=lambda w: w.modified_at or datetime.min.replace(tzinfo=UTC))

        # Calculate age in days for each workflow
        now = datetime.now(UTC)
        workflow_items = []
        for workflow in old_workflows:
            age_days = (now - workflow.modified_at).days if workflow.modified_at else 0
            workflow_items.append(
                {
                    "id": workflow.id,
                    "workflow_type": workflow.workflow_type,
                    "title": workflow.title,
                    "status": workflow.status,
                    "current_step": workflow.current_step,
                    "total_steps": workflow.total_steps,
                    "created_at": workflow.created_at,
                    "modified_at": workflow.modified_at,
                    "reason": workflow.reason,
                    "age_days": age_days,
                }
            )

        # Apply pagination
        total_count = len(workflow_items)
        paginated_items = workflow_items[offset : offset + limit]

        return paginated_items, total_count

Severity: high

This method is highly inefficient. It fetches up to 20,000 workflow records from the database (10,000 completed and 10,000 failed) into memory, and then performs filtering, sorting, and pagination in Python. This can lead to significant memory usage and slow performance, especially as the number of workflows grows. The hardcoded limit of 10,000 is also a major concern.

All these operations (filtering by date, status, sorting, and pagination) should be performed at the database level for better performance.

    async def list_old_workflows(
        self,
        retention_days: int,
        page: int = 1,
        limit: int = 50,
    ) -> Tuple[List[Dict[str, Any]], int]:
        """List workflows older than retention period.

        Args:
            retention_days: List workflows older than this many days
            page: Page number for pagination
            limit: Number of items per page

        Returns:
            Tuple of (workflows list, total count)
        """
        from datetime import UTC, datetime, timedelta
        from sqlalchemy import func, or_, select

        cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
        offset = (page - 1) * limit

        # Base query for old workflows in terminal states
        base_query = select(WorkflowModel).where(
            or_(
                WorkflowModel.status == WorkflowStatusEnum.COMPLETED,
                WorkflowModel.status == WorkflowStatusEnum.FAILED,
            ),
            WorkflowModel.modified_at < cutoff_date,
        )

        # Get total count for pagination
        count_stmt = select(func.count()).select_from(base_query.subquery())
        total_count = (await self.session.execute(count_stmt)).scalar_one() or 0

        # Get paginated results
        stmt = base_query.order_by(WorkflowModel.modified_at.asc()).offset(offset).limit(limit)
        old_workflows = (await self.session.scalars(stmt)).all()

        # Calculate age in days for each workflow
        now = datetime.now(UTC)
        workflow_items = []
        for workflow in old_workflows:
            age_days = (now - workflow.modified_at).days if workflow.modified_at else 0
            workflow_items.append(
                {
                    "id": workflow.id,
                    "workflow_type": workflow.workflow_type,
                    "title": workflow.title,
                    "status": workflow.status,
                    "current_step": workflow.current_step,
                    "total_steps": workflow.total_steps,
                    "created_at": workflow.created_at,
                    "modified_at": workflow.modified_at,
                    "reason": workflow.reason,
                    "age_days": age_days,
                }
            )

        return workflow_items, total_count

Comment on lines +234 to +247
    days_until_sunday = (6 - now.weekday()) % 7
    if days_until_sunday == 0:
        # Today is Sunday - check if it's before 5 AM
        target_time = now.replace(hour=5, minute=0, second=0, microsecond=0)
        if now >= target_time:
            # Already past 5 AM today, schedule for next Sunday
            days_until_sunday = 7
    else:
        # Calculate target Sunday
        days_until_sunday = days_until_sunday if days_until_sunday > 0 else 7

    next_sunday = now + timedelta(days=days_until_sunday)
    next_run = next_sunday.replace(hour=5, minute=0, second=0, microsecond=0)
    seconds_until_run = (next_run - now).total_seconds()

Severity: medium

The logic to calculate the seconds until the next run is unnecessarily complex and contains a redundant else block. This can be simplified to improve readability and maintainability.

Suggested change

    days_until_sunday = (6 - now.weekday()) % 7
    if days_until_sunday == 0:
        # Today is Sunday - check if it's after 5 AM
        target_time = now.replace(hour=5, minute=0, second=0, microsecond=0)
        if now >= target_time:
            # Already past 5 AM today, schedule for next Sunday
            days_until_sunday = 7
    next_sunday = now + timedelta(days=days_until_sunday)
    next_run = next_sunday.replace(hour=5, minute=0, second=0, microsecond=0)
    seconds_until_run = (next_run - now).total_seconds()
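
For example, if now is a Wednesday (weekday() == 2), days_until_sunday = (6 - 2) % 7 = 4, so next_run lands on Sunday at 05:00 four days out; on a Sunday at 06:00, the inner branch pushes the run out a full week.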


        return db_workflow_step

    async def list_old_workflows(

Severity: medium

The new methods list_old_workflows and trigger_manual_cleanup are related to Workflow management, but they have been added to WorkflowStepService. For better code organization and to follow the Single Responsibility Principle, these methods should be moved to WorkflowService or a new dedicated service for workflow cleanup operations.
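
One way to apply this is a thin dedicated service; the sketch below is illustrative only (the class name, constructor shape, and method placement are assumptions, not part of this pull request):

    # Hypothetical skeleton for a dedicated cleanup service.
    class WorkflowCleanupService:
        """Encapsulates listing and purging of old Dapr workflows."""

        def __init__(self, session) -> None:
            self.session = session

        async def list_old_workflows(self, retention_days: int, page: int = 1, limit: int = 50):
            # Move the listing logic here from WorkflowStepService.
            ...

        async def trigger_manual_cleanup(self, retention_days: int, batch_size: int, dry_run: bool = False):
            # Move the manual-cleanup logic here from WorkflowStepService.
            ...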

Comment on lines +820 to +839
    if dry_run:
        # Simulate cleanup - just count workflows
        cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)

        workflows_completed, _ = await WorkflowDataManager(self.session).get_all_workflows(
            offset=0,
            limit=batch_size,
            filters={"status": WorkflowStatusEnum.COMPLETED},
        )

        workflows_failed, _ = await WorkflowDataManager(self.session).get_all_workflows(
            offset=0,
            limit=batch_size,
            filters={"status": WorkflowStatusEnum.FAILED},
        )

        all_workflows = workflows_completed + workflows_failed
        old_workflows = [w for w in all_workflows if w.modified_at and w.modified_at < cutoff_date]
        old_workflows = old_workflows[:batch_size]

Severity: medium

The logic for finding old workflows in dry_run mode is duplicated from list_old_workflows and get_old_workflows in cleanup.py. This should be avoided to follow the DRY (Don't Repeat Yourself) principle.

Consider creating a shared utility function to fetch old workflows from the database. This function could be used by list_old_workflows, trigger_manual_cleanup (for dry runs), and WorkflowCleanupScheduler to make the code more maintainable.
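
A minimal sketch of such a helper, assuming the single database-side query suggested above (the function name and module placement are illustrative; Workflow and WorkflowStatusEnum are the project's existing model and enum from the diffs above):

    # Hypothetical shared helper reused by list_old_workflows, trigger_manual_cleanup
    # (dry runs), and WorkflowCleanupScheduler.
    from datetime import datetime
    from typing import List

    from sqlalchemy import or_, select

    async def query_old_workflows(
        session,
        cutoff_date: datetime,
        limit: int,
        offset: int = 0,
    ) -> List[Workflow]:
        """Return terminal-state workflows last modified before cutoff_date."""
        stmt = (
            select(Workflow)
            .where(
                or_(
                    Workflow.status == WorkflowStatusEnum.COMPLETED,
                    Workflow.status == WorkflowStatusEnum.FAILED,
                ),
                Workflow.modified_at < cutoff_date,
            )
            .order_by(Workflow.modified_at.asc())
            .offset(offset)
            .limit(limit)
        )
        return (await session.scalars(stmt)).all()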

    )

    # Estimate storage size (rough estimate: 10KB per workflow in Redis)
    estimated_size_kb = total_count * 10

Severity: medium

The number 10 used to estimate the storage size of a workflow in Redis is a "magic number". It would be better to define this as a named constant with a comment explaining how this estimate was derived. This improves readability and makes it easier to adjust in the future.

Suggested change

    # Rough estimate: average workflow state size in Redis is ~10KB
    WORKFLOW_SIZE_ESTIMATE_KB = 10
    estimated_size_kb = total_count * WORKFLOW_SIZE_ESTIMATE_KB
