Skip to content

Commit 38d323c

Browse files
laciKEsebix
authored and committed
Improve STIX patterns parsing
Use the official STIX2 Pattern Validator to get the comparison expressions and extract simple IoCs from them. Support for URLs, Domains, IPv4, IPv6 and also for MD5, SHA-1 and SHA-256 hashes. Small fixes and workarounds implemented to address certain anomalies in STIX data provided by some vendors (e.g. ETI): the SHA1 and SHA256 keywords are accepted, and invalid objects reported as Domains or URLs are dropped without throwing exceptions.
1 parent fd8ed84 commit 38d323c

File tree

3 files changed

+154
-47
lines changed

3 files changed

+154
-47
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# SPDX-FileCopyrightText: 2025 Ladislav Baco
2+
# SPDX-License-Identifier: AGPL-3.0-or-later
3+
4+
stix2-patterns>=2.0.0

intelmq/bots/parsers/stix/parser.py

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,48 @@
77

88
import json
99

10+
1011
from intelmq.lib.bot import ParserBot
1112

13+
try:
14+
import stix2patterns.v21.pattern as stix2_pattern
15+
except ImportError:
16+
stix2_pattern = None
17+
1218

1319
class StixParserBot(ParserBot):
1420
"""Parse STIX indicators"""
1521
parse = ParserBot.parse_json_stream
1622
recover_line = ParserBot.recover_line_json_stream
1723

24+
def init(self):
    """
    Fail with a clear error when the optional stix2-patterns package is missing.

    The module-level import of stix2patterns is wrapped in try/except and
    leaves ``stix2_pattern`` set to None when the package is unavailable.

    :raises MissingDependencyError: if stix2patterns could not be imported
    """
    if stix2_pattern is None:
        # Imported locally so the exception class is guaranteed to be in
        # scope; without it this branch would fail with a NameError instead
        # of reporting the missing dependency.
        from intelmq.lib.exceptions import MissingDependencyError
        raise MissingDependencyError('stix2-patterns')
27+
1828
def parse_line(self, line, report):
1929
""" Parse one STIX object of indicator type """
2030
object_type = line.get('type', '')
2131
if object_type == 'indicator':
22-
event = self.new_event(report)
23-
event.add('raw', json.dumps(line))
24-
event.add('comment', line.get('description', ''))
25-
event.add('extra.labels', line.get('labels', None))
26-
event.add('time.source', line.get('valid_from', '1970-01-01T00:00:00Z'))
27-
# classification will be determined by expert bot specific for given TAXII collection
28-
event.add('classification.type', 'undetermined')
29-
3032
pattern = line.get('pattern', '')
3133
# stix, pcre, sigma, snort, suricata, yara
3234
pattern_type = line.get('pattern_type', '')
3335

3436
if pattern_type == 'stix':
35-
indicator = self.parse_stix_pattern(pattern)
36-
if indicator:
37-
event.add(indicator[0], indicator[1])
37+
indicators = StixParserBot.parse_stix_pattern(pattern, self.logger)
38+
for indicator_type, indicator_value in indicators:
39+
event = self.new_event(report)
40+
event.add('raw', json.dumps(line))
41+
event.add('comment', line.get('description', ''))
42+
event.add('extra.labels', line.get('labels', None))
43+
event.add('time.source', line.get('valid_from', '1970-01-01T00:00:00Z'))
44+
45+
# IP address may be passed in Domain feeds or Domain may be passed in URL feeds
46+
# It violates the STIX format, however, in some sources it happens (e.g. in ETI)
47+
# Drop such events without failures and exceptions, which would slow down the processing
48+
event.add(indicator_type, indicator_value, raise_failure=False)
49+
50+
# classification can be overridden by vendor-specific parser below
51+
event.add('classification.type', 'undetermined')
3852
self.parse_vendor_specific(event, line, report)
3953
yield event
4054
else:
@@ -51,38 +65,84 @@ def parse_vendor_specific(self, event, line, report):
5165
return
5266

5367
@staticmethod
54-
def parse_stix_pattern(pattern):
68+
def _get_value_from_comparison_expression(comparison, logger=None):
69+
"""
70+
STIX Comparison Expressions:
71+
https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_boiciucr9smf
72+
73+
comparison is a tuple obtained from stix2patterns.v21.pattern.Pattern(pattern).inspect().comparisons,
74+
e.g. (['value'], '=', "'http://example.org'"), (['value'], '=', "'127.0.0.1/32'")
75+
"""
76+
if len(comparison) != 3:
77+
if logger:
78+
logger.warning('Unexpected Comparison Expressions. Expression: {}'.format(comparison))
79+
return
80+
81+
property_name, operator, value = comparison
82+
supported_property_names = [['value'],
83+
['hashes', 'MD5'],
84+
['hashes', 'SHA-1'],
85+
['hashes', 'SHA-256'],
86+
# Based on 10.7 Hashing Algorithm Vocabulary, these keys are not valid, but they are used in some feeds (e.g. ETI)
87+
# https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_ths0b11wzxv3
88+
['hashes', 'SHA1'],
89+
['hashes', 'SHA256']
90+
]
91+
if not (property_name in supported_property_names) or (operator != '=') or not value.startswith("'") or not value.endswith("'"):
92+
if logger:
93+
logger.info('Unsupported Comparison Expression. Only Comparison Expressions with "equal" comparison operator and "value" or "hashes" property are supported. Expression: {}'.format(comparison))
94+
return
95+
96+
# remove single quotes from returned value
97+
return value[1:-1]
98+
99+
@staticmethod
100+
def parse_stix_pattern(pattern, logger=None):
    """
    Extract simple indicators from a STIX pattern.

    STIX Patterning:
    https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_e8slinrhxcc9

    The pattern is parsed with stix2patterns.v21.pattern.Pattern and every
    comparison expression listed by inspect().comparisons is examined, so
    patterns with several comparisons or several observation expressions
    may yield more than one indicator.

    :param pattern: STIX pattern string, e.g. "[url:value = 'http://example.org']"
    :param logger: optional logger used to report unsupported expressions
                   and unsupported object types
    :return: list of (field name, value) tuples; the field name is one of
             source.url, source.fqdn, source.ip, source.network or
             malware.hash.<algorithm>. The list is empty when nothing
             usable was found.
    """
    indicators = []
    comparisons = stix2_pattern.Pattern(pattern).inspect().comparisons
    for key, comparison_expressions in comparisons.items():
        for comparison in comparison_expressions:
            value = StixParserBot._get_value_from_comparison_expression(comparison, logger)

            if key not in ('url', 'domain-name', 'ipv4-addr', 'ipv6-addr', 'file'):
                if logger:
                    logger.warning('Unsupported Object Type "{}" in Pattern Expression. Pattern: {}'.format(key, pattern))
                continue

            if not value:
                # The comparison was rejected (already reported by the helper)
                # or carries an empty literal. Skip it instead of calling
                # string methods on None or emitting an indicator without a value.
                continue

            if key == 'url':
                indicators.append(('source.url', value))
            elif key == 'domain-name':
                indicators.append(('source.fqdn', value))
            elif key == 'ipv4-addr':
                # remove port, sometimes the port is present in ETI
                value = value.split(':')[0]
                # strip CIDR if IPv4 network contains single host only
                value = value[:-3] if value.endswith('/32') else value
                # a remaining slash means the value is in CIDR notation
                if '/' in value:
                    indicators.append(('source.network', value))
                else:
                    indicators.append(('source.ip', value))
            elif key == 'ipv6-addr':
                # strip CIDR if IPv6 network contains single host only
                value = value[:-4] if value.endswith('/128') else value
                # a remaining slash means the value is in CIDR notation
                if '/' in value:
                    indicators.append(('source.network', value))
                else:
                    indicators.append(('source.ip', value))
            elif key == 'file':
                if len(comparison) == 3 and len(comparison[0]) == 2 and comparison[0][0] == 'hashes':
                    # converts MD5, SHA-1, SHA1, SHA-256, SHA256 to md5, sha1, sha256 used in IntelMQ
                    hash_algo = comparison[0][1].lower().replace('-', '')
                    indicators.append(('malware.hash.' + hash_algo, value))

    return indicators
86146

87147

88148
BOT = StixParserBot

intelmq/tests/bots/parsers/stix/test_parser_bot.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
}
4141

4242

43+
@test.skip_exotic()
4344
class TestStixParserBot(test.BotTestCase, unittest.TestCase):
4445
"""
4546
A TestCase for a StixParserBot.
@@ -58,52 +59,94 @@ def test_event(self):
5859

5960
def test_pattern_url(self):
6061
""" Test if url pattern is parsed. """
61-
indicator = self.bot_reference.parse_stix_pattern("[url:value = 'http://example.org']")
62+
indicator = self.bot_reference.parse_stix_pattern("[url:value = 'http://example.org']")[0]
6263
self.assertEqual(str(indicator[0]), 'source.url')
6364
self.assertEqual(str(indicator[1]), 'http://example.org')
6465

65-
def test_pattern_url(self):
66+
def test_pattern_domain(self):
6667
""" Test if domain pattern is parsed. """
67-
indicator = self.bot_reference.parse_stix_pattern("[domain-name:value = 'example.org']")
68+
indicator = self.bot_reference.parse_stix_pattern("[domain-name:value = 'example.org']")[0]
6869
self.assertEqual(str(indicator[0]), 'source.fqdn')
6970
self.assertEqual(str(indicator[1]), 'example.org')
7071

7172
def test_pattern_ipv4(self):
7273
""" Test if ipv4 pattern is parsed. """
73-
indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.1']")
74+
indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.1']")[0]
7475
self.assertEqual(str(indicator[0]), 'source.ip')
7576
self.assertEqual(str(indicator[1]), '127.0.0.1')
7677

7778
def test_pattern_ipv4_cidr(self):
7879
""" Test if ipv4 cidr pattern is parsed. """
79-
indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.0/8']")
80+
indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.0/8']")[0]
8081
self.assertEqual(str(indicator[0]), 'source.network')
8182
self.assertEqual(str(indicator[1]), '127.0.0.0/8')
8283

8384
def test_pattern_ipv4_cidr_single_host(self):
8485
""" Test if ipv4 cidr with single host pattern is parsed. """
85-
indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.1/32']")
86+
indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.1/32']")[0]
8687
self.assertEqual(str(indicator[0]), 'source.ip')
8788
self.assertEqual(str(indicator[1]), '127.0.0.1')
8889

8990
def test_pattern_ipv6(self):
9091
""" Test if ipv6 pattern is parsed. """
91-
indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = '::1']")
92+
indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = '::1']")[0]
9293
self.assertEqual(str(indicator[0]), 'source.ip')
9394
self.assertEqual(str(indicator[1]), '::1')
9495

9596
def test_pattern_ipv6_cidr(self):
9697
""" Test if ipv6 cidr pattern is parsed. """
97-
indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = 'fe:80::/10']")
98+
indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = 'fe:80::/10']")[0]
9899
self.assertEqual(str(indicator[0]), 'source.network')
99100
self.assertEqual(str(indicator[1]), 'fe:80::/10')
100101

101102
def test_pattern_ipv6_cidr_single_host(self):
102103
""" Test if ipv6 cidr with single host pattern is parsed. """
103-
indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = 'fe:80::1/128']")
104+
indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = 'fe:80::1/128']")[0]
104105
self.assertEqual(str(indicator[0]), 'source.ip')
105106
self.assertEqual(str(indicator[1]), 'fe:80::1')
106107

108+
def test_pattern_hash_md5(self):
109+
""" Test if MD5 hash pattern is parsed. """
110+
indicator = self.bot_reference.parse_stix_pattern("[file:hashes.MD5 = '44d88612fea8a8f36de82e1278abb02f']")[0]
111+
self.assertEqual(str(indicator[0]), 'malware.hash.md5')
112+
self.assertEqual(str(indicator[1]), '44d88612fea8a8f36de82e1278abb02f')
113+
114+
def test_pattern_hash_sha1(self):
115+
""" Test if SHA-1 hash pattern is parsed. """
116+
indicator = self.bot_reference.parse_stix_pattern("[file:hashes.'SHA-1' = '3395856ce81f2b7382dee72602f798b642f14140']")[0]
117+
self.assertEqual(str(indicator[0]), 'malware.hash.sha1')
118+
self.assertEqual(str(indicator[1]), '3395856ce81f2b7382dee72602f798b642f14140')
119+
120+
# Based on 10.7 Hashing Algorithm Vocabulary, keys SHA1 and SHA256 are not valid, but they are used in some feeds (e.g. ETI)
121+
# https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_ths0b11wzxv3
122+
indicator = self.bot_reference.parse_stix_pattern("[file:hashes.SHA1 = '3395856ce81f2b7382dee72602f798b642f14140']")[0]
123+
self.assertEqual(str(indicator[0]), 'malware.hash.sha1')
124+
self.assertEqual(str(indicator[1]), '3395856ce81f2b7382dee72602f798b642f14140')
125+
126+
def test_pattern_hash_sha256(self):
127+
""" Test if SHA-256 hash pattern is parsed. """
128+
indicator = self.bot_reference.parse_stix_pattern("[file:hashes.'SHA-256' = '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f']")[0]
129+
self.assertEqual(str(indicator[0]), 'malware.hash.sha256')
130+
self.assertEqual(str(indicator[1]), '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f')
131+
132+
# Based on 10.7 Hashing Algorithm Vocabulary, keys SHA1 and SHA256 are not valid, but they are used in some feeds (e.g. ETI)
133+
# https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_ths0b11wzxv3
134+
indicator = self.bot_reference.parse_stix_pattern("[file:hashes.SHA256 = '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f']")[0]
135+
self.assertEqual(str(indicator[0]), 'malware.hash.sha256')
136+
self.assertEqual(str(indicator[1]), '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f')
137+
138+
def test_complex_pattern1(self):
139+
""" Test if complex pattern is parsed. """
140+
indicators = self.bot_reference.parse_stix_pattern("[url:value = 'http://example.org' AND ipv4-addr:value = '127.0.0.1/32']")
141+
self.assertEqual(('source.url', 'http://example.org') in indicators, True)
142+
self.assertEqual(('source.ip', '127.0.0.1') in indicators, True)
143+
144+
def test_complex_pattern2(self):
145+
""" Test if complex pattern is parsed. """
146+
indicators = self.bot_reference.parse_stix_pattern("[url:value = 'http://example.org'] AND [ipv4-addr:value = '127.0.0.1/32']")
147+
self.assertEqual(('source.url', 'http://example.org') in indicators, True)
148+
self.assertEqual(('source.ip', '127.0.0.1') in indicators, True)
149+
107150

108151
if __name__ == '__main__': # pragma: no cover
109152
unittest.main()

0 commit comments

Comments
 (0)