Skip to content

Commit 1da1061

Browse files
authored
perf(attack-paths): reduce sync and findings memory usage with smaller batches and cursor iteration (#10359)
1 parent e8aaf52 commit 1da1061

File tree

5 files changed

+353
-191
lines changed

5 files changed

+353
-191
lines changed

api/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ All notable changes to the **Prowler API** are documented in this file.
1212

1313
- Attack Paths: Complete migration to private graph labels and properties, removing deprecated dual-write support [(#10268)](https://github.com/prowler-cloud/prowler/pull/10268)
1414
- Attack Paths: Added tenant and provider related labels to the nodes so they can be easily filtered on custom queries [(#10308)](https://github.com/prowler-cloud/prowler/pull/10308)
15+
- Attack Paths: Reduce sync and findings memory usage with smaller batches, cursor iteration, and sequential sessions [(#10359)](https://github.com/prowler-cloud/prowler/pull/10359)
1516

1617
### 🐞 Fixed
1718

api/src/backend/tasks/jobs/attack_paths/config.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
from uuid import UUID
44

55
from config.env import env
6-
76
from tasks.jobs.attack_paths import aws
87

9-
10-
# Batch size for Neo4j operations
8+
# Batch size for Neo4j write operations (resource labeling, cleanup)
119
BATCH_SIZE = env.int("ATTACK_PATHS_BATCH_SIZE", 1000)
10+
# Batch size for Postgres findings fetch (keyset pagination page size)
11+
FINDINGS_BATCH_SIZE = env.int("ATTACK_PATHS_FINDINGS_BATCH_SIZE", 500)
12+
# Batch size for temp-to-tenant graph sync (nodes and relationships per cursor page)
13+
SYNC_BATCH_SIZE = env.int("ATTACK_PATHS_SYNC_BATCH_SIZE", 250)
1214

1315
# Neo4j internal labels (Prowler-specific, not provider-specific)
1416
# - `Internet`: Singleton node representing external internet access for exposed-resource queries

api/src/backend/tasks/jobs/attack_paths/findings.py

Lines changed: 57 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,15 @@
99
"""
1010

1111
from collections import defaultdict
12-
from dataclasses import asdict, dataclass, fields
1312
from typing import Any, Generator
1413
from uuid import UUID
1514

1615
import neo4j
17-
1816
from cartography.config import Config as CartographyConfig
1917
from celery.utils.log import get_task_logger
20-
21-
from api.db_router import READ_REPLICA_ALIAS
22-
from api.db_utils import rls_transaction
23-
from api.models import Finding as FindingModel
24-
from api.models import Provider, ResourceFindingMapping
25-
from prowler.config import config as ProwlerConfig
2618
from tasks.jobs.attack_paths.config import (
2719
BATCH_SIZE,
20+
FINDINGS_BATCH_SIZE,
2821
get_node_uid_field,
2922
get_provider_resource_label,
3023
get_root_node_label,
@@ -37,75 +30,54 @@
3730
render_cypher_template,
3831
)
3932

40-
logger = get_task_logger(__name__)
41-
42-
43-
# Type Definitions
44-
# -----------------
45-
46-
# Maps dataclass field names to Django ORM query field names
47-
_DB_FIELD_MAP: dict[str, str] = {
48-
"check_title": "check_metadata__checktitle",
49-
}
50-
51-
52-
@dataclass(slots=True)
53-
class Finding:
54-
"""
55-
Finding data for Neo4j ingestion.
56-
57-
Can be created from a Django .values() query result using from_db_record().
58-
"""
33+
from api.db_router import READ_REPLICA_ALIAS
34+
from api.db_utils import rls_transaction
35+
from api.models import Finding as FindingModel
36+
from api.models import Provider, ResourceFindingMapping
37+
from prowler.config import config as ProwlerConfig
5938

60-
id: str
61-
uid: str
62-
inserted_at: str
63-
updated_at: str
64-
first_seen_at: str
65-
scan_id: str
66-
delta: str
67-
status: str
68-
status_extended: str
69-
severity: str
70-
check_id: str
71-
check_title: str
72-
muted: bool
73-
muted_reason: str | None
74-
resource_uid: str | None = None
75-
76-
@classmethod
77-
def get_db_query_fields(cls) -> tuple[str, ...]:
78-
"""Get field names for Django .values() query."""
79-
return tuple(
80-
_DB_FIELD_MAP.get(f.name, f.name)
81-
for f in fields(cls)
82-
if f.name != "resource_uid"
83-
)
39+
logger = get_task_logger(__name__)
8440

85-
@classmethod
86-
def from_db_record(cls, record: dict[str, Any], resource_uid: str) -> "Finding":
87-
"""Create a Finding from a Django .values() query result."""
88-
return cls(
89-
id=str(record["id"]),
90-
uid=record["uid"],
91-
inserted_at=record["inserted_at"],
92-
updated_at=record["updated_at"],
93-
first_seen_at=record["first_seen_at"],
94-
scan_id=str(record["scan_id"]),
95-
delta=record["delta"],
96-
status=record["status"],
97-
status_extended=record["status_extended"],
98-
severity=record["severity"],
99-
check_id=str(record["check_id"]),
100-
check_title=record["check_metadata__checktitle"],
101-
muted=record["muted"],
102-
muted_reason=record["muted_reason"],
103-
resource_uid=resource_uid,
104-
)
10541

106-
def to_dict(self) -> dict[str, Any]:
107-
"""Convert to dict for Neo4j ingestion."""
108-
return asdict(self)
42+
# Django ORM field names for `.values()` queries
43+
# Most map 1:1 to Neo4j property names, exceptions are remapped in `_to_neo4j_dict`
44+
_DB_QUERY_FIELDS = [
45+
"id",
46+
"uid",
47+
"inserted_at",
48+
"updated_at",
49+
"first_seen_at",
50+
"scan_id",
51+
"delta",
52+
"status",
53+
"status_extended",
54+
"severity",
55+
"check_id",
56+
"check_metadata__checktitle",
57+
"muted",
58+
"muted_reason",
59+
]
60+
61+
62+
def _to_neo4j_dict(record: dict[str, Any], resource_uid: str) -> dict[str, Any]:
63+
"""Transform a Django `.values()` record into a `dict` ready for Neo4j ingestion."""
64+
return {
65+
"id": str(record["id"]),
66+
"uid": record["uid"],
67+
"inserted_at": record["inserted_at"],
68+
"updated_at": record["updated_at"],
69+
"first_seen_at": record["first_seen_at"],
70+
"scan_id": str(record["scan_id"]),
71+
"delta": record["delta"],
72+
"status": record["status"],
73+
"status_extended": record["status_extended"],
74+
"severity": record["severity"],
75+
"check_id": str(record["check_id"]),
76+
"check_title": record["check_metadata__checktitle"],
77+
"muted": record["muted"],
78+
"muted_reason": record["muted_reason"],
79+
"resource_uid": resource_uid,
80+
}
10981

11082

11183
# Public API
@@ -180,7 +152,7 @@ def add_resource_label(
180152

181153
def load_findings(
182154
neo4j_session: neo4j.Session,
183-
findings_batches: Generator[list[Finding], None, None],
155+
findings_batches: Generator[list[dict[str, Any]], None, None],
184156
prowler_api_provider: Provider,
185157
config: CartographyConfig,
186158
) -> None:
@@ -209,7 +181,7 @@ def load_findings(
209181
batch_size = len(batch)
210182
total_records += batch_size
211183

212-
parameters["findings_data"] = [f.to_dict() for f in batch]
184+
parameters["findings_data"] = batch
213185

214186
logger.info(f"Loading findings batch {batch_num} ({batch_size} records)")
215187
neo4j_session.run(query, parameters)
@@ -247,16 +219,17 @@ def cleanup_findings(
247219
def stream_findings_with_resources(
248220
prowler_api_provider: Provider,
249221
scan_id: str,
250-
) -> Generator[list[Finding], None, None]:
222+
) -> Generator[list[dict[str, Any]], None, None]:
251223
"""
252224
Stream findings with their associated resources in batches.
253225
254226
Uses keyset pagination for efficient traversal of large datasets.
255-
Memory efficient: yields one batch at a time, never holds all findings in memory.
227+
Memory efficient: yields one batch at a time as dicts ready for Neo4j ingestion,
228+
never holds all findings in memory.
256229
"""
257230
logger.info(
258231
f"Starting findings stream for scan {scan_id} "
259-
f"(tenant {prowler_api_provider.tenant_id}) with batch size {BATCH_SIZE}"
232+
f"(tenant {prowler_api_provider.tenant_id}) with batch size {FINDINGS_BATCH_SIZE}"
260233
)
261234

262235
tenant_id = prowler_api_provider.tenant_id
@@ -305,15 +278,14 @@ def _fetch_findings_batch(
305278
Uses read replica and RLS-scoped transaction.
306279
"""
307280
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
308-
# Use all_objects to avoid the ActiveProviderManager's implicit JOIN
309-
# through Scan -> Provider (to check is_deleted=False).
310-
# The provider is already validated as active in this context.
281+
        # Use `all_objects` to fetch `Finding` rows even when the `Provider` is soft-deleted
282+
        # (though in this context the provider has already been validated as active)
311283
qs = FindingModel.all_objects.filter(scan_id=scan_id).order_by("id")
312284

313285
if after_id is not None:
314286
qs = qs.filter(id__gt=after_id)
315287

316-
return list(qs.values(*Finding.get_db_query_fields())[:BATCH_SIZE])
288+
return list(qs.values(*_DB_QUERY_FIELDS)[:FINDINGS_BATCH_SIZE])
317289

318290

319291
# Batch Enrichment
@@ -323,7 +295,7 @@ def _fetch_findings_batch(
323295
def _enrich_batch_with_resources(
324296
findings_batch: list[dict[str, Any]],
325297
tenant_id: str,
326-
) -> list[Finding]:
298+
) -> list[dict[str, Any]]:
327299
"""
328300
Enrich findings with their resource UIDs.
329301
@@ -334,7 +306,7 @@ def _enrich_batch_with_resources(
334306
resource_map = _build_finding_resource_map(finding_ids, tenant_id)
335307

336308
return [
337-
Finding.from_db_record(finding, resource_uid)
309+
_to_neo4j_dict(finding, resource_uid)
338310
for finding in findings_batch
339311
for resource_uid in resource_map.get(finding["id"], [])
340312
]

0 commit comments

Comments
 (0)