Skip to content

Commit 987271c

Browse files
muhammad-ali-e, claude, and chandrasekharan-zipstack
authored
UN-3011 [FEAT] Limit maximum file execution count with file history viewer (#1676)
* UN-3011 [FEAT] Limit maximum file execution count with file history viewer - Add file history management UI with bulk operations - Add execution count tracking and limits - Add permissions for workflow-scoped access 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> * added file path search and addressed coderabbit comments * removed cache clear now. will address it later * single to bulk delete * minor changes in validations * addressed commenets * addressed commenets * addressed commenets * removed unwanted console * addressed ux design suggestions --------- Co-authored-by: Claude <[email protected]> Co-authored-by: Chandrasekharan M <[email protected]>
1 parent 3d4415d commit 987271c

File tree

23 files changed

+1596
-98
lines changed

23 files changed

+1596
-98
lines changed

backend/backend/settings/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str |
200200
MAX_PARALLEL_FILE_BATCHES_MAX_VALUE = int(
201201
os.environ.get("MAX_PARALLEL_FILE_BATCHES_MAX_VALUE", 100)
202202
)
203+
# Maximum number of times a file can be executed in a workflow
204+
MAX_FILE_EXECUTION_COUNT = int(os.environ.get("MAX_FILE_EXECUTION_COUNT", 3))
203205

204206
CELERY_RESULT_CHORD_RETRY_INTERVAL = float(
205207
os.environ.get("CELERY_RESULT_CHORD_RETRY_INTERVAL", "3")

backend/sample.env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,10 @@ FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000
217217
# File execution tracker completed TTL in seconds (10 minutes)
218218
FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600
219219

220+
# Maximum number of times a file can be executed in ETL/TASK workflows
221+
# Default: 3 (file is permanently skipped after 3 execution attempts)
222+
MAX_FILE_EXECUTION_COUNT=3
223+
220224
# Runner polling timeout (3 hours)
221225
MAX_RUNNER_POLLING_WAIT_SECONDS=10800
222226
# Runner polling interval (2 seconds)

backend/workflow_manager/internal_views.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@ def update_status(self, request, id=None):
502502
from workflow_manager.workflow_v2.enums import ExecutionStatus
503503

504504
status_enum = ExecutionStatus(validated_data["status"])
505+
logger.info(f"Updating status for execution {id} to {status_enum}")
505506
execution.update_execution(
506507
status=status_enum,
507508
error=error_message,
@@ -2305,6 +2306,9 @@ def post(self, request):
23052306
file_history_queryset, request, "workflow__organization"
23062307
)
23072308

2309+
# Get max execution count for this workflow (for worker decision making)
2310+
max_execution_count = workflow.get_max_execution_count()
2311+
23082312
# Get full file history details for cached results
23092313
file_histories = file_history_queryset.values(
23102314
"cache_key",
@@ -2313,6 +2317,8 @@ def post(self, request):
23132317
"error",
23142318
"file_path",
23152319
"provider_file_uuid",
2320+
"execution_count",
2321+
"status",
23162322
)
23172323

23182324
# Build response with both processed hashes (for compatibility) and full details
@@ -2328,16 +2334,21 @@ def post(self, request):
23282334
"error": fh["error"],
23292335
"file_path": fh["file_path"],
23302336
"provider_file_uuid": fh["provider_file_uuid"],
2337+
"execution_count": fh["execution_count"],
2338+
"status": fh["status"],
2339+
"max_execution_count": max_execution_count, # Include for worker logic
23312340
}
23322341

23332342
logger.info(
2334-
f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed"
2343+
f"File history batch check: {len(processed_file_hashes)}/{len(file_hashes)} files already processed "
2344+
f"(max_execution_count: {max_execution_count})"
23352345
)
23362346

23372347
return Response(
23382348
{
23392349
"processed_file_hashes": processed_file_hashes, # For backward compatibility
23402350
"file_history_details": file_history_details, # Full details for cached results
2351+
"max_execution_count": max_execution_count, # Global max for this workflow
23412352
}
23422353
)
23432354

@@ -2470,10 +2481,17 @@ def post(self, request):
24702481
)
24712482

24722483
logger.info(
2473-
f"Created file history entry {file_history.id} for file {file_name}"
2484+
f"Created/updated file history entry {file_history.id} for file {file_name} "
2485+
f"(execution_count: {file_history.execution_count})"
24742486
)
24752487

2476-
return Response({"created": True, "file_history_id": str(file_history.id)})
2488+
return Response(
2489+
{
2490+
"created": True,
2491+
"file_history_id": str(file_history.id),
2492+
"execution_count": file_history.execution_count,
2493+
}
2494+
)
24772495

24782496
except Exception as e:
24792497
logger.error(f"File history creation failed: {str(e)}")

backend/workflow_manager/workflow_v2/file_history_helper.py

Lines changed: 108 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Any
44

55
from django.conf import settings
6-
from django.db.models import Q
6+
from django.db.models import F, Q
77
from django.db.utils import IntegrityError
88
from django.utils import timezone
99
from utils.cache_service import CacheService
@@ -238,6 +238,61 @@ def _get_reprocessing_interval_from_config(
238238
workflow_log.log_error(logger=logger, message=error_msg)
239239
return None
240240

241+
@staticmethod
242+
def _safe_str(value: Any) -> str:
243+
"""Convert value to string, return empty string if None.
244+
245+
Args:
246+
value: Value to convert
247+
248+
Returns:
249+
str: String representation or empty string
250+
"""
251+
return "" if value is None else str(value)
252+
253+
@staticmethod
254+
def _truncate_hash(file_hash: str | None) -> str:
255+
"""Truncate hash for logging purposes.
256+
257+
Args:
258+
file_hash: Hash string to truncate
259+
260+
Returns:
261+
str: Truncated hash (first 16 chars) or 'None' if missing
262+
"""
263+
return file_hash[:16] if file_hash else "None"
264+
265+
@staticmethod
266+
def _increment_file_history(
267+
file_history: FileHistory,
268+
status: ExecutionStatus,
269+
result: Any,
270+
metadata: str | None,
271+
error: str | None,
272+
) -> FileHistory:
273+
"""Update existing file history with incremented execution count.
274+
275+
Args:
276+
file_history: FileHistory instance to update
277+
status: New execution status
278+
result: Execution result
279+
metadata: Execution metadata
280+
error: Error message if any
281+
282+
Returns:
283+
FileHistory: Updated file history instance
284+
"""
285+
FileHistory.objects.filter(id=file_history.id).update(
286+
execution_count=F("execution_count") + 1,
287+
status=status,
288+
result=str(result),
289+
metadata=FileHistoryHelper._safe_str(metadata),
290+
error=FileHistoryHelper._safe_str(error),
291+
)
292+
# Refresh from DB to get updated values
293+
file_history.refresh_from_db()
294+
return file_history
295+
241296
@staticmethod
242297
def create_file_history(
243298
file_hash: FileHash,
@@ -248,7 +303,11 @@ def create_file_history(
248303
error: str | None = None,
249304
is_api: bool = False,
250305
) -> FileHistory:
251-
"""Create a new file history record or return existing one.
306+
"""Create a new file history record or increment existing one's execution count.
307+
308+
This method implements execution count tracking:
309+
- If file history exists: increments execution_count atomically
310+
- If file history is new: creates with execution_count=1
252311
253312
Args:
254313
file_hash (FileHash): The file hash for the file.
@@ -260,44 +319,64 @@ def create_file_history(
260319
is_api (bool): Whether this is for API workflow (affects file_path handling).
261320
262321
Returns:
263-
FileHistory: Either newly created or existing file history record.
322+
FileHistory: Either newly created or updated file history record.
264323
"""
265324
file_path = file_hash.file_path if not is_api else None
266325

267-
# Prepare data for creation
326+
# Check if file history already exists
327+
existing_history = FileHistoryHelper.get_file_history(
328+
workflow=workflow,
329+
cache_key=file_hash.file_hash,
330+
provider_file_uuid=file_hash.provider_file_uuid,
331+
file_path=file_path,
332+
)
333+
334+
if existing_history:
335+
# File history exists - increment execution count atomically
336+
updated_history = FileHistoryHelper._increment_file_history(
337+
existing_history, status, result, metadata, error
338+
)
339+
logger.info(
340+
f"Updated FileHistory record (execution_count: {updated_history.execution_count}) - "
341+
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
342+
f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', "
343+
f"workflow={workflow}"
344+
)
345+
return updated_history
346+
347+
# File history doesn't exist - create new record with execution_count=1
268348
create_data = {
269349
"workflow": workflow,
270350
"cache_key": file_hash.file_hash,
271351
"provider_file_uuid": file_hash.provider_file_uuid,
272352
"status": status,
273353
"result": str(result),
274-
"metadata": str(metadata) if metadata else "",
275-
"error": str(error) if error else "",
354+
"metadata": FileHistoryHelper._safe_str(metadata),
355+
"error": FileHistoryHelper._safe_str(error),
276356
"file_path": file_path,
357+
"execution_count": 1,
277358
}
278359

279360
try:
280-
# Try to create the file history record
281361
file_history = FileHistory.objects.create(**create_data)
282362
logger.info(
283-
f"Created new FileHistory record - "
363+
f"Created new FileHistory record (execution_count: 1) - "
284364
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
285-
f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', "
365+
f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', "
286366
f"workflow={workflow}"
287367
)
288368
return file_history
289369

290370
except IntegrityError as e:
291-
# Race condition detected - another worker created the record
292-
# Try to retrieve the existing record
371+
# Race condition: another worker created the record between our check and create
293372
logger.info(
294-
f"FileHistory constraint violation (expected in concurrent environment) - "
373+
f"FileHistory constraint violation (race condition) - "
295374
f"file_name='{file_hash.file_name}', file_path='{file_hash.file_path}', "
296-
f"file_hash='{file_hash.file_hash[:16] if file_hash.file_hash else 'None'}', "
297-
f"workflow={workflow}. Error: {str(e)}"
375+
f"file_hash='{FileHistoryHelper._truncate_hash(file_hash.file_hash)}', "
376+
f"workflow={workflow}. Error: {e!s}"
298377
)
299378

300-
# Use the existing get_file_history method to retrieve the record
379+
# Retrieve the record created by another worker and increment it
301380
existing_record = FileHistoryHelper.get_file_history(
302381
workflow=workflow,
303382
cache_key=file_hash.file_hash,
@@ -306,18 +385,22 @@ def create_file_history(
306385
)
307386

308387
if existing_record:
309-
logger.info(
310-
f"Retrieved existing FileHistory record after constraint violation - "
311-
f"ID: {existing_record.id}, workflow={workflow}"
388+
# Increment the existing record
389+
updated_record = FileHistoryHelper._increment_file_history(
390+
existing_record, status, result, metadata, error
312391
)
313-
return existing_record
314-
else:
315-
# This should rarely happen, but if we can't find the existing record,
316-
# log the issue and re-raise the original exception
317-
logger.error(
318-
f"Failed to retrieve existing FileHistory record after constraint violation - "
319-
f"file_name='{file_hash.file_name}', workflow={workflow}"
392+
logger.info(
393+
f"Retrieved and updated existing FileHistory record (execution_count: {updated_record.execution_count}) - "
394+
f"ID: {updated_record.id}, workflow={workflow}"
320395
)
396+
return updated_record
397+
398+
# This should rarely happen - existing record not found after IntegrityError
399+
logger.exception(
400+
f"Failed to retrieve existing FileHistory record after constraint violation - "
401+
f"file_name='{file_hash.file_name}', workflow={workflow}"
402+
)
403+
raise
321404

322405
@staticmethod
323406
def clear_history_for_workflow(

0 commit comments

Comments (0)