Skip to content

Commit 7732eab

Browse files
committed
chore: run ruff
1 parent c9a89d6 commit 7732eab

File tree

2 files changed

+41
-37
lines changed

2 files changed

+41
-37
lines changed

mitreattack/attackToExcel/attackToExcel.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,9 @@ def build_dataframes(src: MemoryStore, domain: str) -> Dict:
149149

150150
def build_ds_an_lg_relationships(dataframes: Dict) -> Dict[str, pd.DataFrame]:
151151
"""Build detection-mappings.xlsx with a single DS → Analytic → LogSource sheet."""
152+
ds_an = dataframes["detectionstrategies"].get("detectionstrategies-analytic", pd.DataFrame())
152153

153-
ds_an = dataframes["detectionstrategies"].get(
154-
"detectionstrategies-analytic", pd.DataFrame()
155-
)
156-
157-
an_ls = dataframes["analytics"].get(
158-
"analytic-logsource", pd.DataFrame()
159-
)
154+
an_ls = dataframes["analytics"].get("analytic-logsource", pd.DataFrame())
160155

161156
if ds_an.empty or an_ls.empty:
162157
combined = pd.DataFrame()
@@ -167,13 +162,12 @@ def build_ds_an_lg_relationships(dataframes: Dict) -> Dict[str, pd.DataFrame]:
167162
how="left",
168163
)
169164

170-
return {
171-
"ds_an_ls": combined
172-
}
173-
165+
return {"ds_an_ls": combined}
174166

175167

176-
def write_excel(dataframes: Dict, domain: str, src: MemoryStore, version: Optional[str] = None, output_dir: str = ".") -> List:
168+
def write_excel(
169+
dataframes: Dict, domain: str, src: MemoryStore, version: Optional[str] = None, output_dir: str = "."
170+
) -> List:
177171
"""Given a set of dataframes from build_dataframes, write the ATT&CK dataset to output directory.
178172
179173
Parameters
@@ -232,10 +226,14 @@ def write_excel(dataframes: Dict, domain: str, src: MemoryStore, version: Option
232226
for sheet_name in object_data:
233227
logger.debug(f"Writing sheet to {fp}: {sheet_name}")
234228
object_data[sheet_name].to_excel(object_writer, sheet_name=sheet_name, index=False)
235-
229+
236230
# Write Detection strategy - Analytics - Log sources file
237-
if object_type in add_ds_an_ls_to and isinstance(ds_an_ls_df, pd.DataFrame) and not ds_an_ls_df.empty:
238-
ds_an_ls_df.to_excel(object_writer, sheet_name="detection mappings", index=False)
231+
if (
232+
object_type in add_ds_an_ls_to
233+
and isinstance(ds_an_ls_df, pd.DataFrame)
234+
and not ds_an_ls_df.empty
235+
):
236+
ds_an_ls_df.to_excel(object_writer, sheet_name="defensive mappings", index=False)
239237
written_files.append(fp)
240238

241239
# add citations to master citations list
@@ -323,7 +321,7 @@ def write_excel(dataframes: Dict, domain: str, src: MemoryStore, version: Option
323321
written_files.append(fp)
324322

325323
if isinstance(ds_an_ls_df, pd.DataFrame) and not ds_an_ls_df.empty:
326-
ds_an_ls_df.to_excel(master_writer, sheet_name="detection mappings", index=False)
324+
ds_an_ls_df.to_excel(master_writer, sheet_name="defensive mappings", index=False)
327325
# remove duplicate citations and add sheet to master file
328326
logger.debug(f"Writing sheet to {master_fp}: citations")
329327
citations.drop_duplicates(subset="reference", ignore_index=True).sort_values("reference").to_excel(
@@ -404,7 +402,7 @@ def export(
404402
major_version = int(match.group(1))
405403
if major_version < 18:
406404
dataframes = build_dataframes_pre_v18(src=mem_store, domain=domain)
407-
write_excel(dataframes=dataframes, domain=domain, version=version, output_dir=output_dir)
405+
write_excel(dataframes=dataframes, domain=domain, src=mem_store, version=version, output_dir=output_dir)
408406
return
409407

410408
dataframes = build_dataframes(src=mem_store, domain=domain)

mitreattack/attackToExcel/stixToDf.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,11 @@ def analyticsToDf(src):
382382
for ds in detection_strategies:
383383
for analytic_id in ds.get("x_mitre_analytic_refs", []):
384384
analytic_to_ds_map.setdefault(analytic_id, []).append(
385-
{"detection_strategy_attack_id": ds["external_references"][0]["external_id"], "detection_strategy_id": ds["id"], "detection_strategy_name": ds.get("name", "")}
385+
{
386+
"detection_strategy_attack_id": ds["external_references"][0]["external_id"],
387+
"detection_strategy_id": ds["id"],
388+
"detection_strategy_name": ds.get("name", ""),
389+
}
386390
)
387391

388392
for analytic in tqdm(analytics, desc="parsing analytics"):
@@ -404,7 +408,7 @@ def analyticsToDf(src):
404408
"data_component_attack_id": data_comp_attack_id,
405409
"log_source_name": logsrc.get("name", ""),
406410
"channel": logsrc.get("channel", ""),
407-
"platforms": ", ".join(sorted(analytic.get("x_mitre_platforms", [])))
411+
"platforms": ", ".join(sorted(analytic.get("x_mitre_platforms", []))),
408412
}
409413
)
410414

@@ -417,8 +421,7 @@ def analyticsToDf(src):
417421
"detection_strategy_id": ds_info["detection_strategy_id"],
418422
"detection_strategy_attack_id": ds_info["detection_strategy_attack_id"],
419423
"detection_strategy_name": ds_info["detection_strategy_name"],
420-
"platforms": ", ".join(sorted(analytic.get("x_mitre_platforms", [])))
421-
424+
"platforms": ", ".join(sorted(analytic.get("x_mitre_platforms", []))),
422425
}
423426
)
424427

@@ -463,8 +466,7 @@ def detectionstrategiesToDf(src):
463466
"detection_strategy_name": detection_strategy.get("name", ""),
464467
"analytic_id": analytic_id,
465468
"analytic_name": analytic_obj["external_references"][0]["external_id"],
466-
"platforms": ", ".join(sorted(analytic_obj.get("x_mitre_platforms", [])))
467-
469+
"platforms": ", ".join(sorted(analytic_obj.get("x_mitre_platforms", []))),
468470
}
469471
)
470472

@@ -525,6 +527,7 @@ def softwareToDf(src):
525527

526528
return dataframes
527529

530+
528531
def detectionStrategiesAnalyticsLogSourcesDf(src):
529532
"""Build a single DS -> LogSource -> Analytic dataframe directly from STIX."""
530533
detection_strategies = src.query([Filter("type", "=", "x-mitre-detection-strategy")])
@@ -550,22 +553,25 @@ def detectionStrategiesAnalyticsLogSourcesDf(src):
550553
data_comp_id = logsrc.get("x_mitre_data_component_ref", "")
551554
data_comp = src.get(data_comp_id)
552555

553-
rows.append({
554-
"detection_strategy_attack_id": ds_attack_id,
555-
"detection_strategy_id": ds_id,
556-
"detection_strategy_name": ds_name,
557-
"analytic_id": analytic_id,
558-
"analytic_name": analytic_attack_id,
559-
"platforms": platforms,
560-
"log_source_name": logsrc.get("name", ""),
561-
"channel": logsrc.get("channel", ""),
562-
"data_component_id": data_comp_id,
563-
"data_component_name": (data_comp.get("name", "") if data_comp else ""),
564-
"data_component_attack_id": data_comp["external_references"][0]["external_id"]
565-
})
556+
rows.append(
557+
{
558+
"detection_strategy_attack_id": ds_attack_id,
559+
"detection_strategy_id": ds_id,
560+
"detection_strategy_name": ds_name,
561+
"analytic_id": analytic_id,
562+
"analytic_name": analytic_attack_id,
563+
"platforms": platforms,
564+
"log_source_name": logsrc.get("name", ""),
565+
"channel": logsrc.get("channel", ""),
566+
"data_component_id": data_comp_id,
567+
"data_component_name": (data_comp.get("name", "") if data_comp else ""),
568+
"data_component_attack_id": data_comp["external_references"][0]["external_id"],
569+
}
570+
)
566571

567572
return pd.DataFrame(rows)
568573

574+
569575
def groupsToDf(src):
570576
"""Parse STIX groups from the given data and return corresponding pandas dataframes.
571577
@@ -1309,4 +1315,4 @@ def _get_relationship_citations(object_dataframe, relationship_df):
13091315
else:
13101316
for i in range(0, len(new_citations)):
13111317
new_citations[i] = ",".join([new_citations[i], subset[i]])
1312-
return new_citations
1318+
return new_citations

0 commit comments

Comments
 (0)