Skip to content

Commit c509482

Browse files
fix(scans): scheduled scans duplicates (#9883)
Co-authored-by: Víctor Fernández Poyatos <victor@prowler.com>
1 parent 09ee069 commit c509482

File tree

4 files changed

+334
-69
lines changed

4 files changed

+334
-69
lines changed

api/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ All notable changes to the **Prowler API** are documented in this file.
1515
- Lazy load Neo4j driver for workers only [(#9872)](https://github.com/prowler-cloud/prowler/pull/9872)
1616
- Improve Cypher query for inserting Findings into Attack Paths scan graphs [(#9874)](https://github.com/prowler-cloud/prowler/pull/9874)
1717
- Clear Neo4j database cache after Attack Paths scan and each API query [(#9877)](https://github.com/prowler-cloud/prowler/pull/9877)
18+
- Deduplicated scheduled scans for long-running providers [(#9829)](https://github.com/prowler-cloud/prowler/pull/9829)
1819

1920
## [1.18.0] (Prowler v5.17.0)
2021

api/src/backend/tasks/tasks.py

Lines changed: 50 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,13 @@
11
import os
2-
32
from datetime import datetime, timedelta, timezone
43
from pathlib import Path
54
from shutil import rmtree
65

76
from celery import chain, group, shared_task
87
from celery.utils.log import get_task_logger
9-
from django_celery_beat.models import PeriodicTask
10-
11-
from api.compliance import get_compliance_frameworks
12-
from api.db_router import READ_REPLICA_ALIAS
13-
from api.db_utils import rls_transaction
14-
from api.decorators import handle_provider_deletion, set_tenant
15-
from api.models import Finding, Integration, Provider, Scan, ScanSummary, StateChoices
16-
from api.utils import initialize_prowler_provider
17-
from api.v1.serializers import ScanTaskSerializer
188
from config.celery import RLSTask
199
from config.django.base import DJANGO_FINDINGS_BATCH_SIZE, DJANGO_TMP_OUTPUT_DIRECTORY
20-
from prowler.lib.check.compliance_models import Compliance
21-
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
22-
from prowler.lib.outputs.finding import Finding as FindingOutput
10+
from django_celery_beat.models import PeriodicTask
2311
from tasks.jobs.attack_paths import (
2412
attack_paths_scan,
2513
can_provider_run_attack_paths_scan,
@@ -64,7 +52,22 @@
6452
perform_prowler_scan,
6553
update_provider_compliance_scores,
6654
)
67-
from tasks.utils import batched, get_next_execution_datetime
55+
from tasks.utils import (
56+
_get_or_create_scheduled_scan,
57+
batched,
58+
get_next_execution_datetime,
59+
)
60+
61+
from api.compliance import get_compliance_frameworks
62+
from api.db_router import READ_REPLICA_ALIAS
63+
from api.db_utils import rls_transaction
64+
from api.decorators import handle_provider_deletion, set_tenant
65+
from api.models import Finding, Integration, Provider, Scan, ScanSummary, StateChoices
66+
from api.utils import initialize_prowler_provider
67+
from api.v1.serializers import ScanTaskSerializer
68+
from prowler.lib.check.compliance_models import Compliance
69+
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
70+
from prowler.lib.outputs.finding import Finding as FindingOutput
6871

6972
logger = get_task_logger(__name__)
7073

@@ -275,44 +278,38 @@ def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
275278
periodic_task_instance = PeriodicTask.objects.get(
276279
name=f"scan-perform-scheduled-{provider_id}"
277280
)
281+
executing_scan = (
282+
Scan.objects.filter(
283+
tenant_id=tenant_id,
284+
provider_id=provider_id,
285+
trigger=Scan.TriggerChoices.SCHEDULED,
286+
state=StateChoices.EXECUTING,
287+
)
288+
.order_by("-started_at")
289+
.first()
290+
)
291+
if executing_scan:
292+
logger.warning(
293+
f"Scheduled scan already executing for provider {provider_id}. Skipping."
294+
)
295+
return ScanTaskSerializer(instance=executing_scan).data
278296

279297
executed_scan = Scan.objects.filter(
280298
tenant_id=tenant_id,
281299
provider_id=provider_id,
282300
task__task_runner_task__task_id=task_id,
283-
).order_by("completed_at")
301+
).first()
284302

285-
if (
286-
Scan.objects.filter(
287-
tenant_id=tenant_id,
288-
provider_id=provider_id,
289-
trigger=Scan.TriggerChoices.SCHEDULED,
290-
state=StateChoices.EXECUTING,
291-
scheduler_task_id=periodic_task_instance.id,
292-
scheduled_at__date=datetime.now(timezone.utc).date(),
293-
).exists()
294-
or executed_scan.exists()
295-
):
296-
# Duplicated task execution due to visibility timeout or scan is already running
303+
if executed_scan:
304+
# Duplicated task execution due to visibility timeout
297305
logger.warning(f"Duplicated scheduled scan for provider {provider_id}.")
298-
try:
299-
affected_scan = executed_scan.first()
300-
if not affected_scan:
301-
raise ValueError(
302-
"Error retrieving affected scan details after detecting duplicated scheduled "
303-
"scan."
304-
)
305-
# Return the affected scan details to avoid losing data
306-
serializer = ScanTaskSerializer(instance=affected_scan)
307-
except Exception as duplicated_scan_exception:
308-
logger.error(
309-
f"Duplicated scheduled scan for provider {provider_id}. Error retrieving affected scan details: "
310-
f"{str(duplicated_scan_exception)}"
311-
)
312-
raise duplicated_scan_exception
313-
return serializer.data
306+
return ScanTaskSerializer(instance=executed_scan).data
314307

308+
interval = periodic_task_instance.interval
315309
next_scan_datetime = get_next_execution_datetime(task_id, provider_id)
310+
current_scan_datetime = next_scan_datetime - timedelta(
311+
**{interval.period: interval.every}
312+
)
316313

317314
# TEMPORARY WORKAROUND: Clean up orphan scans from transaction isolation issue
318315
_cleanup_orphan_scheduled_scans(
@@ -321,19 +318,12 @@ def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
321318
scheduler_task_id=periodic_task_instance.id,
322319
)
323320

324-
scan_instance, _ = Scan.objects.get_or_create(
321+
scan_instance = _get_or_create_scheduled_scan(
325322
tenant_id=tenant_id,
326323
provider_id=provider_id,
327-
trigger=Scan.TriggerChoices.SCHEDULED,
328-
state__in=(StateChoices.SCHEDULED, StateChoices.AVAILABLE),
329324
scheduler_task_id=periodic_task_instance.id,
330-
defaults={
331-
"state": StateChoices.SCHEDULED,
332-
"name": "Daily scheduled scan",
333-
"scheduled_at": next_scan_datetime - timedelta(days=1),
334-
},
325+
scheduled_at=current_scan_datetime,
335326
)
336-
337327
scan_instance.task_id = task_id
338328
scan_instance.save()
339329

@@ -343,18 +333,19 @@ def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
343333
scan_id=str(scan_instance.id),
344334
provider_id=provider_id,
345335
)
346-
except Exception as e:
347-
raise e
348336
finally:
349337
with rls_transaction(tenant_id):
350-
Scan.objects.get_or_create(
338+
now = datetime.now(timezone.utc)
339+
if next_scan_datetime <= now:
340+
interval_delta = timedelta(**{interval.period: interval.every})
341+
while next_scan_datetime <= now:
342+
next_scan_datetime += interval_delta
343+
_get_or_create_scheduled_scan(
351344
tenant_id=tenant_id,
352-
name="Daily scheduled scan",
353345
provider_id=provider_id,
354-
trigger=Scan.TriggerChoices.SCHEDULED,
355-
state=StateChoices.SCHEDULED,
356-
scheduled_at=next_scan_datetime,
357346
scheduler_task_id=periodic_task_instance.id,
347+
scheduled_at=next_scan_datetime,
348+
update_state=True,
358349
)
359350

360351
_perform_scan_complete_tasks(tenant_id, str(scan_instance.id), provider_id)

0 commit comments

Comments
 (0)