Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit 04dd64e

Browse files
committed
v1.4.1 release
2 parents 36a2815 + 65a41b4 commit 04dd64e

File tree

6 files changed

+321
-26
lines changed

6 files changed

+321
-26
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.DS_Store

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
# V1.4.1 - 18 May 2020
2+
## New Scripts
3+
- New script [technique_mappings_to_csv.py](technique_mappings_to_csv.py) added to support mapping Techniques with Mitigations, Groups or Software. The output is a CSV file. Added in PR [#23](https://github.com/mitre-attack/attack-scripts/pull/23)
4+
## Improvements
5+
- Updated [diff_stix.py](scripts/diff_stix.py) with sub-techniques support. See issue [#12](https://github.com/mitre-attack/attack-scripts/issues/12).
6+
## Fixes
7+
- Fixed bug in LayerOps causing issues with cross-tactic techniques, as well as a bug where a score lambda could affect the outcome of other lambdas.
8+
19
# V1.4 - 5 May 2020
210
## New Scripts
311
- Added Layers folder with utility scripts for working with [ATT&CK Navigator](https://github.com/mitre-attack/attack-navigator) Layers. See the Layers [README](layers/README.md) for more details. See issues [#2](https://github.com/mitre-attack/attack-scripts/issues/2) and [#3](https://github.com/mitre-attack/attack-scripts/issues/3).

layers/manipulators/layerops.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,13 @@ def _build_template(self, data):
211211
for key in temp:
212212
for elm in temp[key]:
213213
if not any(elm['techniqueID'] == x['techniqueID']
214+
and elm['tactic'] == x['tactic']
214215
for x in t2):
215216
t2.append(elm)
216217
else:
217218
[x.update(elm)
218219
if x['techniqueID'] == elm['techniqueID']
220+
and elm['tactic'] == x['tactic']
219221
else x for x in t2]
220222
return t2
221223

@@ -231,8 +233,13 @@ def _template(self, data):
231233
temp.append([{"techniqueID": x.techniqueID, "tactic": x.tactic}
232234
if x.tactic else {"techniqueID": x.techniqueID}
233235
for x in entry])
234-
return list({v['techniqueID']: v
235-
for v in [elm for list in temp for elm in list]}.values())
236+
complete = []
237+
for entry in temp:
238+
for val in entry:
239+
if val in complete:
240+
continue
241+
complete.append(val)
242+
return complete
236243

237244
def _grabList(self, search, collection):
238245
"""
@@ -303,7 +310,10 @@ def _applyOperation(self, corpus, element, name, lda, defaults, glob=None):
303310
listing = [getattr(x.layer, glob) for x in corpus]
304311
listing = [{name: x} for x in listing]
305312
else:
306-
listing = self._grabList(element, corpus)
313+
te = dict(techniqueID=element['techniqueID'])
314+
if 'tactic' in element:
315+
te['tactic'] = element['tactic']
316+
listing = self._grabList(te, corpus)
307317
listing = [x.get_dict() if not isinstance(x, dict)
308318
else dict() for x in listing]
309319
values = []
@@ -323,7 +333,10 @@ def _applyOperation(self, corpus, element, name, lda, defaults, glob=None):
323333
for k in corpus.keys():
324334
listing[k] = {glob: getattr(corpus[k].layer, glob)}
325335
else:
326-
temp = self._grabDict(element, corpus)
336+
te = dict(techniqueID=element['techniqueID'])
337+
if 'tactic' in element:
338+
te['tactic'] = element['tactic']
339+
temp = self._grabDict(te, corpus)
327340
listing = {}
328341
for k in temp.keys():
329342
if temp[k] != {}:

scripts/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ This folder contains one-off scripts for working with ATT&CK content. These scri
77
| [techniques_from_data_source.py](techniques_from_data_source.py) | Fetches the current ATT&CK STIX 2.0 objects from the ATT&CK TAXII server, prints all of the data sources listed in Enterprise ATT&CK, and then lists all the Enterprise techniques containing a given data source. Run `python3 techniques_from_data_source.py -h` for usage instructions. |
88
| [techniques_data_sources_vis.py](techniques_data_sources_vis.py) | Generate the csv data used to create the "Techniques Mapped to Data Sources" visualization in the ATT&CK roadmap. Run `python3 techniques_data_sources_vis.py -h` for usage instructions. |
99
| [diff_stix.py](diff_stix.py) | Create markdown and/or ATT&CK Navigator layers reporting on the changes between two versions of the STIX2 bundles representing the ATT&CK content. For default operation, put [enterprise-attack.json](https://github.com/mitre/cti/blob/master/enterprise-attack/enterprise-attack.json), [mobile-attack.json](https://github.com/mitre/cti/blob/master/mobile-attack/mobile-attack.json), and [pre-attack.json](https://github.com/mitre/cti/blob/master/pre-attack/pre-attack.json) bundles in 'old' and 'new' folders for the script to compare. Run `python3 diff_stix.py -h` for full usage instructions. |
10+
| [technique_mappings_to_csv.py](technique_mappings_to_csv.py) | Fetches the current ATT&CK content expressed as STIX2 and creates spreadsheet mapping Techniques with Mitigations, Groups or Software. Run `python3 technique_mappings_to_csv.py -h` for usage instructions. |

scripts/diff_stix.py

Lines changed: 125 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,16 @@ def __init__(
122122
# }
123123
# software...
124124
}
125+
self.stixIDToName = {} # stixID to object name
126+
self.new_subtechnique_of_rels = [] # all subtechnique-of relationships in the new data
127+
self.old_subtechnique_of_rels = [] # all subtechnique-of relationships in the old data
128+
self.new_id_to_technique = {} # stixID => technique for every technique in the new data
129+
self.old_id_to_technique = {} # stixID => technique for every technique in the old data
130+
# build the bove data structures
125131
self.load_data()
132+
# remove duplicate relationships
133+
self.new_subtechnique_of_rels = [i for n, i in enumerate(self.new_subtechnique_of_rels) if i not in self.new_subtechnique_of_rels[n+1:]]
134+
self.old_subtechnique_of_rels = [i for n, i in enumerate(self.old_subtechnique_of_rels) if i not in self.old_subtechnique_of_rels[n+1:]]
126135

127136

128137
def verboseprint(self, *args, **kwargs):
@@ -186,26 +195,43 @@ def load_datastore(data_store):
186195
"data_store": data_store
187196
}
188197

198+
def parse_subtechniques(data_store, new=False):
199+
# parse dataStore sub-technique-of relationships
200+
if new:
201+
for technique in list(data_store.query(attackTypeToStixFilter["technique"])):
202+
self.new_id_to_technique[technique["id"]] = technique
203+
self.new_subtechnique_of_rels += list(data_store.query([
204+
Filter("type", "=", "relationship"),
205+
Filter("relationship_type", "=", "subtechnique-of")
206+
]))
207+
else:
208+
for technique in list(data_store.query(attackTypeToStixFilter["technique"])):
209+
self.old_id_to_technique[technique["id"]] = technique
210+
self.old_subtechnique_of_rels += list(data_store.query([
211+
Filter("type", "=", "relationship"),
212+
Filter("relationship_type", "=", "subtechnique-of")
213+
]))
214+
189215
# load data from directory according to domain
190-
def load_dir(dir):
216+
def load_dir(dir, new=False):
191217
data_store = MemoryStore()
192218
datafile = os.path.join(dir, domain + ".json")
193219
data_store.load_from_file(datafile)
194-
220+
parse_subtechniques(data_store, new)
195221
return load_datastore(data_store)
196222

197223
# load data from TAXII server according to domain
198-
def load_taxii():
224+
def load_taxii(new=False):
199225
collection = Collection("https://cti-taxii.mitre.org/stix/collections/" + domainToTaxiiCollectionId[domain])
200226
data_store = TAXIICollectionSource(collection)
201-
227+
parse_subtechniques(data_store, new)
202228
return load_datastore(data_store)
203229

204230
if self.use_taxii:
205-
old = load_taxii()
231+
old = load_taxii(False)
206232
else:
207-
old = load_dir(self.old)
208-
new = load_dir(self.new)
233+
old = load_dir(self.old, False)
234+
new = load_dir(self.new, True)
209235

210236
intersection = old["keys"] & new["keys"]
211237
additions = new["keys"] - old["keys"]
@@ -226,7 +252,12 @@ def load_taxii():
226252
Filter('type', '=', 'relationship'),
227253
Filter('relationship_type', '=', 'revoked-by'),
228254
Filter('source_ref', '=', key)
229-
])[0]["target_ref"]
255+
])
256+
if (len(revoked_by_key) == 0):
257+
print("WARNING: revoked object", key, "has no revoked-by relationship")
258+
continue
259+
else: revoked_by_key = revoked_by_key[0]["target_ref"]
260+
230261
new["id_to_obj"][key]["revoked_by"] = new["id_to_obj"][revoked_by_key]
231262

232263
revocations.add(key)
@@ -240,11 +271,11 @@ def load_taxii():
240271
try:
241272
old_version = float(old["id_to_obj"][key]["x_mitre_version"])
242273
except:
243-
print("old\n\t" +key)
274+
print("ERROR: cannot get old version for object: " + key)
244275
try:
245276
new_version = float(new["id_to_obj"][key]["x_mitre_version"])
246277
except:
247-
print("new\n\t" + key)
278+
print("ERROR: cannot get new version for object: " + key)
248279

249280
# check for changes
250281
if new_version > old_version:
@@ -307,12 +338,92 @@ def get_md_key(self):
307338
key += "\n" + "* Object deletions: " + statusDescriptions['deletions']
308339
return f"{key}"
309340

341+
def has_subtechniques(self, sdo, new=False):
342+
"""return true or false depending on whether the SDO has sub-techniques. new determines whether to parse from the new or old data"""
343+
if new: return len(list(filter(lambda rel: rel["target_ref"] == sdo["id"], self.new_subtechnique_of_rels))) > 0
344+
else: return len(list(filter(lambda rel: rel["target_ref"] == sdo["id"], self.old_subtechnique_of_rels))) > 0
310345

311346
def get_markdown_string(self):
312347
"""
313348
Return a markdown string summarizing detected differences.
314349
"""
315350

351+
def getSectionList(items, obj_type, section):
352+
"""
353+
parse a list of items in a section and return a string for the items
354+
"""
355+
356+
# get parents which have children
357+
childless = list(filter(lambda item: not self.has_subtechniques(item, True) and not ("x_mitre_is_subtechnique" in item and item["x_mitre_is_subtechnique"]), items))
358+
parents = list(filter(lambda item: self.has_subtechniques(item, True) and not ("x_mitre_is_subtechnique" in item and item["x_mitre_is_subtechnique"]), items))
359+
children = { item["id"]: item for item in filter(lambda item: "x_mitre_is_subtechnique" in item and item["x_mitre_is_subtechnique"], items) }
360+
361+
subtechnique_of_rels = self.new_subtechnique_of_rels if section != "deletions" else self.old_subtechnique_of_rels
362+
id_to_technique = self.new_id_to_technique if section != "deletions" else self.old_id_to_technique
363+
364+
parentToChildren = {} # stixID => [ children ]
365+
for relationship in subtechnique_of_rels:
366+
if relationship["target_ref"] in parentToChildren:
367+
if relationship["source_ref"] in children:
368+
parentToChildren[relationship["target_ref"]].append(children[relationship["source_ref"]])
369+
else:
370+
if relationship["source_ref"] in children:
371+
parentToChildren[relationship["target_ref"]] = children[relationship["source_ref"]]
372+
parentToChildren[relationship["target_ref"]] = [ children[relationship["source_ref"]] ]
373+
374+
375+
# now group parents and children
376+
groupings = []
377+
378+
for parent in childless + parents:
379+
parent_children = parentToChildren.pop(parent["id"]) if parent["id"] in parentToChildren else []
380+
groupings.append({
381+
"parent": parent,
382+
"parentInSection": True,
383+
"children": parent_children
384+
})
385+
386+
for parentID in parentToChildren:
387+
groupings.append({
388+
"parent": id_to_technique[parentID],
389+
"parentInSection": False,
390+
"children": parentToChildren[parentID]
391+
})
392+
393+
groupings = sorted(groupings, key=lambda grouping: grouping["parent"]["name"])
394+
395+
def placard(item):
396+
"""get a section list item for the given SDO according to section type"""
397+
if section == "revocations":
398+
revoker = item['revoked_by']
399+
if "x_mitre_is_subtechnique" in revoker and revoker["x_mitre_is_subtechnique"]:
400+
# get revoking technique's parent for display
401+
parentID = list(filter(lambda rel: rel["source_ref"] == revoker["id"], subtechnique_of_rels))[0]["target_ref"]
402+
parentName = id_to_technique[parentID]["name"] if parentID in id_to_technique else "ERROR NO PARENT"
403+
return f"{item['name']} (revoked by { parentName}: [{revoker['name']}]({self.site_prefix}/{self.getUrlFromStix(revoker)}))"
404+
else:
405+
return f"{item['name']} (revoked by [{revoker['name']}]({self.site_prefix}/{self.getUrlFromStix(revoker)}))"
406+
if section == "deletions":
407+
return f"{item['name']}"
408+
else:
409+
return f"[{item['name']}]({self.site_prefix}/{self.getUrlFromStix(item)})"
410+
411+
412+
# build sectionList string
413+
sectionString = ""
414+
for grouping in groupings:
415+
if grouping["parentInSection"]:
416+
sectionString += f"* { placard(grouping['parent']) }\n"
417+
# else:
418+
# sectionString += f"* _{grouping['parent']['name']}_\n"
419+
for child in sorted(grouping["children"], key=lambda child: child["name"]):
420+
if grouping["parentInSection"]:
421+
sectionString += f"\t* {placard(child) }\n"
422+
else:
423+
sectionString += f"* { grouping['parent']['name'] }: { placard(child) }\n"
424+
425+
return sectionString
426+
316427
self.verboseprint("generating markdown string... ", end="", flush="true")
317428

318429
content = ""
@@ -321,17 +432,9 @@ def get_markdown_string(self):
321432
for domain in self.data[obj_type]:
322433
domain_sections = ""
323434
for section in self.data[obj_type][domain]:
324-
if section == "revocations":
325-
# handle revoked by
326-
section_items = list(map(lambda d: f"* {d['name']} (revoked by [{d['revoked_by']['name']}]({self.site_prefix}/{self.getUrlFromStix(d['revoked_by'])}))", self.data[obj_type][domain][section]))
327-
elif section == "deletions":
328-
section_items = list(map(lambda d: f"* {d['name']}", self.data[obj_type][domain][section]))
329-
else:
330-
section_items = list(map(lambda d: f"* [{d['name']}]({self.site_prefix}/{self.getUrlFromStix(d)})", self.data[obj_type][domain][section]))
331-
332-
if len(section_items) > 0:
333-
section_items = "\n".join(sorted(section_items))
334-
else:
435+
if len(self.data[obj_type][domain][section]) > 0: # if there are items in the section
436+
section_items = getSectionList(self.data[obj_type][domain][section], obj_type, section)
437+
else: # no items in section
335438
section_items = "No changes"
336439
header = sectionNameToSectionHeaders[section] + ":"
337440
if "{obj_type}" in header:
@@ -385,7 +488,7 @@ def get_layers_dict(self):
385488

386489
# build layer structure
387490
layer_json = {
388-
"version": "2.2",
491+
"version": "3.0",
389492
"name": f"{thedate} {domainToDomainLabel[domain]} Updates",
390493
"description": f"{domainToDomainLabel[domain]} updates for the {thedate} release of ATT&CK",
391494
"domain": domainToLayerFileDomain[domain],

0 commit comments

Comments
 (0)