Skip to content

Commit 316af6b

Browse files
laciKEsebix
authored and committed
Add TAXII Collector bot and STIX Parser bot
As a bare minimum, TAXII Collector currently collects only the objects of type indicator. These objects contain information about indicators and the detection patterns, e.g. in stix, pcre, sigma, snort, suricata, yara format. The pattern, pattern_type and valid_from properties are required, while confidence, description and labels are only optional properties. However, they are present in several TAXII feeds and could be used to determine classification.taxonomy and classification.type even without processing the relationships of the indicators (e.g. indicator indicates malware). STIX Parser is currently capable of parsing objects of type indicator (usually retrieved from the TAXII Collector). From the indicator objects, it extracts the detection pattern (currently only single Observation Expressions in STIX format are supported). It supports IP address, domain and URL indicator values. Moreover, this parser also attempts to extract some optional properties of STIX objects such as description and labels, which can be useful for further classification of the event with the Expert Bots. TAXII Collector tests cover missing parameters and mock a simple TAXII server providing a minimal collection with a simple indicator object. STIX Parser tests cover indicator pattern parsing. Improvements based on @sebix comments; collection title used as feed.code. Fix codestyle in TAXII and STIX bots. Fix Python 3.8 support in STIX Parser bot.
1 parent c2cc657 commit 316af6b

File tree

7 files changed

+365
-0
lines changed

7 files changed

+365
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# SPDX-FileCopyrightText: 2025 Ladislav Baco
2+
# SPDX-License-Identifier: AGPL-3.0-or-later
3+
4+
taxii2-client>=2.3.0

intelmq/bots/collectors/taxii/__init__.py

Whitespace-only changes.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""
2+
SPDX-FileCopyrightText: 2025 Ladislav Baco
3+
SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
Get indicator objects from TAXII server
6+
7+
Configuration parameters: taxii collection (feed) url, username and password.
8+
"""
9+
10+
import datetime
11+
import json
12+
from requests.exceptions import HTTPError
13+
14+
from intelmq.lib.bot import CollectorBot
15+
from intelmq.lib.exceptions import MissingDependencyError
16+
17+
try:
18+
import taxii2client.v21 as taxii2
19+
except ImportError:
20+
taxii2 = None
21+
22+
23+
class TaxiiCollectorBot(CollectorBot):
    """Collect STIX indicator objects from a TAXII collection.

    Configuration parameters (class attributes):
        collection: URL of the TAXII collection; also reported as ``feed.url``.
        username: user name handed to the TAXII client.
        password: password handed to the TAXII client.
        rate_limit: default of 3600; not read in this class (presumably
            consumed by the CollectorBot base class -- confirm there).
        time_delta: look-back window in seconds; only objects added to the
            collection after ``now - time_delta`` are requested.
    """
    collection: str = None
    username: str = None
    password: str = None
    rate_limit: int = 3600
    time_delta: int = 3600

    def init(self):
        """Validate the configuration and create the TAXII collection client.

        Raises:
            MissingDependencyError: if ``taxii2-client`` could not be imported.
            ValueError: if collection, username or password is not configured.
        """
        if taxii2 is None:
            raise MissingDependencyError('taxii2-client')

        if self.collection is None:
            raise ValueError('No TAXII collection URL provided.')
        if self.username is None:
            raise ValueError('No TAXII username provided.')
        if self.password is None:
            raise ValueError('No TAXII password provided.')

        self._date_after = self._look_back_start()

        self._taxii_collection = taxii2.Collection(self.collection, user=self.username, password=self.password)

    def _look_back_start(self):
        """Return the timezone-aware UTC instant ``time_delta`` seconds before now."""
        # An aware UTC datetime is used on purpose: a naive local time would be
        # interpreted as UTC by the TAXII client and shift the window by the
        # local UTC offset.
        now = datetime.datetime.now(datetime.timezone.utc)
        return now - datetime.timedelta(seconds=int(self.time_delta))

    def process(self):
        """Fetch recently added indicator objects and send one report per object.

        Each report carries the JSON-serialised object as ``raw``, the
        collection URL as ``feed.url`` and the collection title as
        ``feed.code``. HTTP errors are logged and not re-raised.
        """
        # Recompute the cut-off on every run; a value frozen at start-up would
        # make a long-running bot request an ever-growing window.
        self._date_after = self._look_back_start()

        try:
            title = self._taxii_collection.title
            self.logger.info('Collection title: %r.', title)

            # get the indicator objects
            envelope = self._taxii_collection.get_objects(added_after=self._date_after, type='indicator')
            for obj in envelope.get('objects', []):
                report = self.new_report()
                report.add('raw', json.dumps(obj))
                report.add('feed.url', self.collection)
                report.add('feed.code', title)
                self.send_message(report)

        except HTTPError as e:
            self.logger.error('Connection error: %r!', e)
62+
63+
64+
BOT = TaxiiCollectorBot

intelmq/bots/parsers/stix/__init__.py

Whitespace-only changes.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""
2+
SPDX-FileCopyrightText: 2025 Ladislav Baco
3+
SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
Parse indicators objects in STIX format received from TAXII collector
6+
"""
7+
8+
import json
9+
10+
from intelmq.lib.bot import ParserBot
11+
12+
13+
class StixParserBot(ParserBot):
    """Parse STIX indicator objects into events.

    Only indicators whose ``pattern_type`` is ``stix`` and whose pattern is a
    single Observation Expression on a URL, domain name, IPv4 or IPv6 address
    produce an event; everything else is logged and skipped.
    """
    # Reuse the JSON-stream splitting and recovery provided by ParserBot.
    parse = ParserBot.parse_json_stream
    recover_line = ParserBot.recover_line_json_stream

    def parse_line(self, line, report):
        """Parse one STIX object of indicator type.

        Args:
            line: one decoded STIX object (accessed with ``.get``).
            report: the report the object came from; passed to ``new_event``.

        Yields:
            At most one event, carrying the indicator value extracted from the
            pattern. Nothing is yielded for non-indicator objects, non-STIX
            pattern types or unsupported STIX patterns; each case is logged.
        """
        object_type = line.get('type', '')
        if object_type != 'indicator':
            self.logger.warning('Unexpected type of STIX object: %r', object_type)
            return

        event = self.new_event(report)
        event.add('raw', json.dumps(line))
        event.add('comment', line.get('description', ''))
        event.add('extra.labels', line.get('labels', None))
        event.add('time.source', line.get('valid_from', '1970-01-01T00:00:00Z'))
        # classification will be determined by expert bot specific for given TAXII collection
        event.add('classification.type', 'undetermined')

        pattern = line.get('pattern', '')
        # stix, pcre, sigma, snort, suricata, yara
        pattern_type = line.get('pattern_type', '')

        if pattern_type != 'stix':
            self.logger.warning('Unexpected type of pattern expression: %r, pattern: %r', pattern_type, pattern)
            return

        indicator = self.parse_stix_pattern(pattern)
        if not indicator:
            # Report through the bot logger instead of silently dropping the object.
            self.logger.warning('Unsupported STIX pattern, indicator skipped. Only single Observation '
                                'Expressions on url, domain-name, ipv4-addr and ipv6-addr values '
                                'are supported. Pattern: %r', pattern)
            return

        event.add(indicator[0], indicator[1])
        yield event

    @staticmethod
    def parse_stix_pattern(pattern):
        """Extract the indicator value from a single STIX Observation Expression.

        STIX Patterning:
        https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_e8slinrhxcc9

        Args:
            pattern: the STIX pattern string, e.g. ``[url:value = 'http://example.org']``.

        Returns:
            A ``(field, value)`` tuple where field is ``source.url``,
            ``source.fqdn``, ``source.ip`` or ``source.network``, or ``None``
            when the pattern is not supported (not a string, not exactly one
            ``[``, no quoted literal, or an unhandled object type).
        """
        if not isinstance(pattern, str) or pattern.count('[') != 1:
            return None

        # The value is the text between the first pair of single quotes.
        pieces = pattern.split("'")
        if len(pieces) < 2:
            # No quoted literal at all (e.g. a numeric comparison).
            return None
        value = pieces[1]

        if pattern.startswith('[url:value = '):
            return ('source.url', value)
        if pattern.startswith('[domain-name:value = '):
            return ('source.fqdn', value)

        if pattern.startswith('[ipv4-addr:value = '):
            # remove port, sometimes the port is present in ETI
            value = value.split(':')[0]
            single_host_suffix = '/32'
        elif pattern.startswith('[ipv6-addr:value = '):
            single_host_suffix = '/128'
        else:
            return None

        # strip CIDR if the network contains a single host only
        if value.endswith(single_host_suffix):
            value = value[:-len(single_host_suffix)]
        # anything still in CIDR notation is a network, otherwise a host address
        if '/' in value:
            return ('source.network', value)
        return ('source.ip', value)
77+
78+
79+
BOT = StixParserBot
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# SPDX-FileCopyrightText: 2016 Sebastian Wagner, 2025 Ladislav Baco
2+
#
3+
# SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
# -*- coding: utf-8 -*-
6+
"""
7+
Test with reports, based on intelmq/tests/lib/test_collector_bot.py
8+
"""
9+
import unittest
10+
11+
import re
12+
import requests_mock
13+
14+
import intelmq.lib.bot as bot
15+
import intelmq.lib.test as test
16+
from intelmq.bots.collectors.taxii.collector import TaxiiCollectorBot
17+
18+
19+
# Base64 text of the JSON indicator object with pattern
# [url:value = 'http://example.org'].
_RAW_INDICATOR = 'eyJpZCI6ICJpbmRpY2F0b3ItLTAiLCAidHlwZSI6ICJpbmRpY2F0b3IiLCAic3BlY192ZXJzaW9uIjogIjIuMSIsICJjcmVhdGVkIjogIjE5NzAtMDEtMDFUMDA6MDA6MDAuMDAwWiIsICJtb2RpZmllZCI6ICIxOTcwLTAxLTAxVDAwOjAwOjAwLjAwMFoiLCAicGF0dGVybiI6ICJbdXJsOnZhbHVlID0gJ2h0dHA6Ly9leGFtcGxlLm9yZyddIiwgInBhdHRlcm5fdHlwZSI6ICJzdGl4IiwgInZhbGlkX2Zyb20iOiAiMTk3MC0wMS0wMVQwMDowMDowMFoifQ=='

# The report the collector is expected to emit for the mocked collection.
EXAMPLE_REPORT = {
    '__type': 'Report',
    'feed.name': 'Taxii Feed',
    'feed.code': 'feed stix2.1',
    'feed.provider': 'Taxii Provider',
    'feed.documentation': 'Taxii Documentation',
    'feed.accuracy': 100.0,
    'feed.url': 'http://localhost/feed',
    'raw': _RAW_INDICATOR,
}
28+
29+
def prepare_mocker(mocker):
    """Register the two GET endpoints of a minimal fake TAXII collection.

    The first endpoint answers the collection URL with the collection
    description; the second answers any URL under ``objects/`` with the same
    description plus a single indicator object.
    """
    taxii_content_type = 'application/taxii+json;version=2.1'

    collection_info = {
        'id': 'feed',
        'title': 'feed stix2.1',
        'can_read': True,
        'can_write': False,
    }
    indicator = {
        'id': 'indicator--0',
        'type': 'indicator',
        'spec_version': '2.1',
        'created': '1970-01-01T00:00:00.000Z',
        'modified': '1970-01-01T00:00:00.000Z',
        'pattern': "[url:value = 'http://example.org']",
        'pattern_type': 'stix',
        'valid_from': '1970-01-01T00:00:00Z',
    }
    objects_envelope = dict(collection_info)
    objects_envelope['more'] = False
    objects_envelope['objects'] = [indicator]

    mocker.get(
        'http://localhost/feed/',
        json=collection_info,
        headers={'Content-Type': taxii_content_type},
    )
    mocker.get(
        re.compile('http://localhost/feed/objects/.*'),
        json=objects_envelope,
        headers={'Content-Type': taxii_content_type},
    )
60+
61+
@test.skip_exotic()
@requests_mock.Mocker()
class TestTaxiiCollectorBot(test.BotTestCase, unittest.TestCase):
    """
    Test case for TaxiiCollectorBot: one collection run against the mocked
    TAXII endpoints and the three mandatory-parameter checks.
    """

    @classmethod
    def set_bot(cls):
        """Select the bot under test and its default configuration."""
        cls.bot_reference = TaxiiCollectorBot
        cls.sysconfig = dict(
            name='Taxii Feed',
            provider='Taxii Provider',
            documentation='Taxii Documentation',
            collection='http://localhost/feed',
            username='user',
            password='pass',
        )

    def _assert_missing_parameter(self, parameter, expected_message):
        """Run the bot with *parameter* set to None and check the ValueError text."""
        with self.assertRaises(ValueError) as caught:
            self.run_bot(parameters={parameter: None})
        self.assertEqual(str(caught.exception), expected_message)

    def test_event(self, mocker):
        """ Test if correct Event has been produced. """
        prepare_mocker(mocker)
        self.run_bot()
        self.assertMessageEqual(0, EXAMPLE_REPORT)

    def test_missing_collection(self, mocker):
        """ Test if missing collection is detected. """
        self._assert_missing_parameter('collection', 'No TAXII collection URL provided.')

    def test_missing_username(self, mocker):
        """ Test if missing username is detected. """
        self._assert_missing_parameter('username', 'No TAXII username provided.')

    def test_missing_password(self, mocker):
        """ Test if missing password is detected. """
        self._assert_missing_parameter('password', 'No TAXII password provided.')
105+
106+
107+
if __name__ == '__main__': # pragma: no cover
108+
unittest.main()
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# SPDX-FileCopyrightText: 2025 Ladislav Baco
2+
#
3+
# SPDX-License-Identifier: AGPL-3.0-or-later
4+
5+
# -*- coding: utf-8 -*-
6+
"""
7+
Test with example reports (STIX objects usually collected from TAXII server)
8+
"""
9+
import unittest
10+
11+
import re
12+
import requests_mock
13+
14+
import intelmq.lib.bot as bot
15+
import intelmq.lib.test as test
16+
from intelmq.bots.parsers.stix.parser import StixParserBot
17+
18+
19+
# Base64 text of the JSON indicator object with pattern
# [url:value = 'http://example.org']; shared by the report and the event.
_RAW_INDICATOR = 'eyJpZCI6ICJpbmRpY2F0b3ItLTAiLCAidHlwZSI6ICJpbmRpY2F0b3IiLCAic3BlY192ZXJzaW9uIjogIjIuMSIsICJjcmVhdGVkIjogIjE5NzAtMDEtMDFUMDA6MDA6MDAuMDAwWiIsICJtb2RpZmllZCI6ICIxOTcwLTAxLTAxVDAwOjAwOjAwLjAwMFoiLCAicGF0dGVybiI6ICJbdXJsOnZhbHVlID0gJ2h0dHA6Ly9leGFtcGxlLm9yZyddIiwgInBhdHRlcm5fdHlwZSI6ICJzdGl4IiwgInZhbGlkX2Zyb20iOiAiMTk3MC0wMS0wMVQwMDowMDowMFoifQ=='

# Feed metadata that the report and the resulting event have in common.
_FEED_FIELDS = {
    'feed.name': 'Taxii Feed',
    'feed.code': 'feed stix2.1',
    'feed.provider': 'Taxii Provider',
    'feed.documentation': 'Taxii Documentation',
    'feed.accuracy': 100.0,
    'feed.url': 'http://localhost/feed',
}

# Input report fed to the parser.
EXAMPLE_REPORT = {
    '__type': 'Report',
    **_FEED_FIELDS,
    'raw': _RAW_INDICATOR,
}

# Event the parser is expected to produce from EXAMPLE_REPORT.
EXAMPLE_EVENT = {
    '__type': 'Event',
    **_FEED_FIELDS,
    'source.url': 'http://example.org',
    'time.source': '1970-01-01T00:00:00+00:00',
    'classification.type': 'undetermined',
    'raw': _RAW_INDICATOR,
}
41+
42+
43+
class TestStixParserBot(test.BotTestCase, unittest.TestCase):
    """
    A TestCase for a StixParserBot.

    Covers one full report-to-event run and the static pattern parser for
    URL, domain, IPv4 and IPv6 values (plain, CIDR and single-host CIDR).
    """

    @classmethod
    def set_bot(cls):
        """Select the bot under test; no extra configuration is needed."""
        cls.bot_reference = StixParserBot
        cls.sysconfig = {}

    def test_event(self):
        """ Test if correct Event has been produced. """
        self.input_message = EXAMPLE_REPORT
        self.run_bot()
        self.assertMessageEqual(0, EXAMPLE_EVENT)

    def test_pattern_url(self):
        """ Test if url pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[url:value = 'http://example.org']")
        self.assertEqual(str(indicator[0]), 'source.url')
        self.assertEqual(str(indicator[1]), 'http://example.org')

    # Previously also named test_pattern_url, which replaced the URL test
    # above in the class namespace so that it never ran.
    def test_pattern_domain(self):
        """ Test if domain pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[domain-name:value = 'example.org']")
        self.assertEqual(str(indicator[0]), 'source.fqdn')
        self.assertEqual(str(indicator[1]), 'example.org')

    def test_pattern_ipv4(self):
        """ Test if ipv4 pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.1']")
        self.assertEqual(str(indicator[0]), 'source.ip')
        self.assertEqual(str(indicator[1]), '127.0.0.1')

    def test_pattern_ipv4_cidr(self):
        """ Test if ipv4 cidr pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.0/8']")
        self.assertEqual(str(indicator[0]), 'source.network')
        self.assertEqual(str(indicator[1]), '127.0.0.0/8')

    def test_pattern_ipv4_cidr_single_host(self):
        """ Test if ipv4 cidr with single host pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[ipv4-addr:value = '127.0.0.1/32']")
        self.assertEqual(str(indicator[0]), 'source.ip')
        self.assertEqual(str(indicator[1]), '127.0.0.1')

    def test_pattern_ipv6(self):
        """ Test if ipv6 pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = '::1']")
        self.assertEqual(str(indicator[0]), 'source.ip')
        self.assertEqual(str(indicator[1]), '::1')

    def test_pattern_ipv6_cidr(self):
        """ Test if ipv6 cidr pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = 'fe:80::/10']")
        self.assertEqual(str(indicator[0]), 'source.network')
        self.assertEqual(str(indicator[1]), 'fe:80::/10')

    def test_pattern_ipv6_cidr_single_host(self):
        """ Test if ipv6 cidr with single host pattern is parsed. """
        indicator = self.bot_reference.parse_stix_pattern("[ipv6-addr:value = 'fe:80::1/128']")
        self.assertEqual(str(indicator[0]), 'source.ip')
        self.assertEqual(str(indicator[1]), 'fe:80::1')
107+
108+
109+
if __name__ == '__main__': # pragma: no cover
110+
unittest.main()

0 commit comments

Comments
 (0)