Skip to content

Commit e7c1235

Browse files
committed
STOMP bots: several updates, fixes and improvements + some refactoring
The updates, fixes and improvements regard the *STOMP collector* and *STOMP output* bots. Important changes are described below... From now on, newer versions of the `stomp.py` package are supported -- including the latest (8.1.0). Now both STOMP bots coerce the `port` configuration parameter to int -- so that a string representing an integer number is also acceptable (even if not recommended) as a value of that parameter. In the *STOMP output* bot, a bug has been fixed: `AttributeError` caused by attempts to get unset attributes (`ssl_ca_cert` and companions...). The *STOMP collector*'s reconnection mechanism has been fixed: from now on, no reconnection attempts are made after `shutdown()`. Apart from that, reconnection is not attempted at all for versions of `stomp.py` older than 4.1.21 (as it did not work properly anyway). Also regarding the *STOMP collector* bot, the following (undocumented and unused) attributes of `StompCollectorBot` instances are no longer set in `init()`: `ssl_ca_cert`, `ssl_cl_cert`, `ssl_cl_cert_key`. Various checks have been improved/enhanced. Now, for example, both STOMP bot classes implement the `check()` static/class method -- whose role is to check ("statically", without the need to run the bot) configuration parameters; in particular, it checks whether necessary certificate files are accessible. When it comes to runtime (on-initialization) checks, one notable improvement is that now also the *STOMP output* bot will raise a `MissingDependencyError` if the `stomp.py` version is older than 4.1.8 (an analogous check has already been implemented by *STOMP collector*). The code of those bot classes have also been significantly refactored -- in particular, several common operations have been factored out and placed in a new mix-in class: `intelmq.lib.mixins.StompMixin`; its definition resides in a new module: `intelmq.lib.mixins.stomp`.
1 parent 8c7bef1 commit e7c1235

File tree

4 files changed

+202
-62
lines changed

4 files changed

+202
-62
lines changed

intelmq/bots/collectors/stomp/collector.py

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
# SPDX-License-Identifier: AGPL-3.0-or-later
44

55
# -*- coding: utf-8 -*-
6-
import os.path
76

87
from intelmq.lib.bot import CollectorBot
9-
from intelmq.lib.exceptions import MissingDependencyError
8+
from intelmq.lib.mixins import StompMixin
109

1110
try:
1211
import stomp
12+
import stomp.exception
1313
except ImportError:
1414
stomp = None
1515
else:
@@ -18,9 +18,10 @@ class StompListener(stomp.PrintingListener):
1818
the stomp listener gets called asynchronously for
1919
every STOMP message
2020
"""
21-
def __init__(self, n6stompcollector, conn, destination):
21+
def __init__(self, n6stompcollector, conn, destination, connect_kwargs=None):
2222
self.stompbot = n6stompcollector
2323
self.conn = conn
24+
self.connect_kwargs = connect_kwargs
2425
self.destination = destination
2526
super().__init__()
2627
if stomp.__version__ >= (5, 0, 0):
@@ -29,15 +30,23 @@ def __init__(self, n6stompcollector, conn, destination):
2930

3031
def on_heartbeat_timeout(self):
3132
self.stompbot.logger.info("Heartbeat timeout. Attempting to re-connect.")
32-
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination)
33-
34-
def on_error(self, headers, message):
35-
self.stompbot.logger.error('Received an error: %r.', message)
36-
37-
def on_message(self, headers, message):
38-
self.stompbot.logger.debug('Receive message %r...', message[:500])
33+
if self.stompbot._auto_reconnect:
34+
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination,
35+
connect_kwargs=self.connect_kwargs)
36+
37+
def on_error(self, frame, body=None):
38+
if body is None:
39+
# `stomp.py >= 6.1.0`
40+
body = frame.body
41+
self.stompbot.logger.error('Received an error: %r.', body)
42+
43+
def on_message(self, frame, body=None):
44+
if body is None:
45+
# `stomp.py >= 6.1.0`
46+
body = frame.body
47+
self.stompbot.logger.debug('Receive message %r...', body[:500])
3948
report = self.stompbot.new_report()
40-
report.add("raw", message.rstrip())
49+
report.add("raw", body.rstrip())
4150
report.add("feed.url", "stomp://" +
4251
self.stompbot.server +
4352
":" + str(self.stompbot.port) +
@@ -46,19 +55,23 @@ def on_message(self, headers, message):
4655

4756
def on_disconnected(self):
4857
self.stompbot.logger.debug('Detected disconnect')
49-
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination)
58+
if self.stompbot._auto_reconnect:
59+
connect_and_subscribe(self.conn, self.stompbot.logger, self.destination,
60+
connect_kwargs=self.connect_kwargs)
5061

5162

52-
def connect_and_subscribe(conn, logger, destination, start=False):
63+
def connect_and_subscribe(conn, logger, destination, start=False, connect_kwargs=None):
5364
if start:
5465
conn.start()
55-
conn.connect(wait=True)
66+
if connect_kwargs is None:
67+
connect_kwargs = dict(wait=True)
68+
conn.connect(**connect_kwargs)
5669
conn.subscribe(destination=destination,
5770
id=1, ack='auto')
5871
logger.info('Successfully connected and subscribed.')
5972

6073

61-
class StompCollectorBot(CollectorBot):
74+
class StompCollectorBot(CollectorBot, StompMixin):
6275
"""Collect data from a STOMP Interface"""
6376
""" main class for the STOMP protocol collector """
6477
exchange: str = ''
@@ -73,36 +86,22 @@ class StompCollectorBot(CollectorBot):
7386
__conn = False # define here so shutdown method can check for it
7487

7588
def init(self):
76-
if stomp is None:
77-
raise MissingDependencyError("stomp")
78-
elif stomp.__version__ < (4, 1, 8):
79-
raise MissingDependencyError("stomp", version="4.1.8",
80-
installed=stomp.__version__)
81-
82-
self.ssl_ca_cert = self.ssl_ca_certificate
83-
self.ssl_cl_cert = self.ssl_client_certificate
84-
self.ssl_cl_cert_key = self.ssl_client_certificate_key
85-
86-
# check if certificates exist
87-
for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]:
88-
if not os.path.isfile(f):
89-
raise ValueError("Could not open file %r." % f)
90-
91-
_host = [(self.server, self.port)]
92-
self.__conn = stomp.Connection(host_and_ports=_host, use_ssl=True,
93-
ssl_key_file=self.ssl_cl_cert_key,
94-
ssl_cert_file=self.ssl_cl_cert,
95-
ssl_ca_certs=self.ssl_ca_cert,
96-
heartbeats=(self.heartbeat,
97-
self.heartbeat))
98-
99-
self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange))
89+
self.stomp_bot_runtime_initial_check()
90+
91+
# (note: older versions of `stomp.py` do not play well with reconnects)
92+
self._auto_reconnect = (stomp.__version__ >= (4, 1, 21))
93+
94+
self.__conn, connect_kwargs = self.prepare_stomp_connection()
95+
self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange,
96+
connect_kwargs=connect_kwargs))
10097
connect_and_subscribe(self.__conn, self.logger, self.exchange,
101-
start=stomp.__version__ < (4, 1, 20))
98+
start=stomp.__version__ < (4, 1, 20),
99+
connect_kwargs=connect_kwargs)
102100

103101
def shutdown(self):
104102
if not stomp or not self.__conn:
105103
return
104+
self._auto_reconnect = False
106105
try:
107106
self.__conn.disconnect()
108107
except stomp.exception.NotConnectedException:
@@ -111,5 +110,9 @@ def shutdown(self):
111110
def process(self):
112111
pass
113112

113+
@classmethod
114+
def check(cls, parameters):
115+
return cls.stomp_bot_parameters_check(parameters) or None
116+
114117

115118
BOT = StompCollectorBot

intelmq/bots/outputs/stomp/output.py

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,17 @@
33
# SPDX-License-Identifier: AGPL-3.0-or-later
44

55
# -*- coding: utf-8 -*-
6-
import os.path
76

87
from intelmq.lib.bot import OutputBot
9-
from intelmq.lib.exceptions import MissingDependencyError
10-
8+
from intelmq.lib.mixins import StompMixin
119

1210
try:
1311
import stomp
1412
except ImportError:
1513
stomp = None
1614

1715

18-
class StompOutputBot(OutputBot):
16+
class StompOutputBot(OutputBot, StompMixin):
1917
"""Send events to a STMOP server"""
2018
""" main class for the STOMP protocol output bot """
2119
exchange: str = "/exchange/_push"
@@ -35,29 +33,18 @@ class StompOutputBot(OutputBot):
3533
_conn = None
3634

3735
def init(self):
38-
if stomp is None:
39-
raise MissingDependencyError("stomp")
40-
41-
# check if certificates exist
42-
for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]:
43-
if not os.path.isfile(f):
44-
raise ValueError("Could not open SSL (certificate) file '%s'." % f)
45-
46-
_host = [(self.server, self.port)]
47-
self._conn = stomp.Connection(host_and_ports=_host, use_ssl=True,
48-
ssl_key_file=self.ssl_cl_cert_key,
49-
ssl_cert_file=self.ssl_cl_cert,
50-
ssl_ca_certs=self.ssl_ca_cert,
51-
heartbeats=(self.heartbeat,
52-
self.heartbeat))
36+
self.stomp_bot_runtime_initial_check()
37+
(self._conn,
38+
self._connect_kwargs) = self.prepare_stomp_connection()
5339
self.connect()
5440

5541
def connect(self):
5642
self.logger.debug('Connecting.')
5743
# based on the documentation at:
5844
# https://github.com/jasonrbriggs/stomp.py/wiki/Simple-Example
59-
self._conn.start()
60-
self._conn.connect(wait=True)
45+
if stomp.__version__ < (4, 1, 20):
46+
self._conn.start()
47+
self._conn.connect(**self._connect_kwargs)
6148
self.logger.debug('Connected.')
6249

6350
def shutdown(self):
@@ -73,5 +60,9 @@ def process(self):
7360
destination=self.exchange)
7461
self.acknowledge_message()
7562

63+
@classmethod
64+
def check(cls, parameters):
65+
return cls.stomp_bot_parameters_check(parameters) or None
66+
7667

7768
BOT = StompOutputBot

intelmq/lib/mixins/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@
55
from intelmq.lib.mixins.http import HttpMixin
66
from intelmq.lib.mixins.cache import CacheMixin
77
from intelmq.lib.mixins.sql import SQLMixin
8+
from intelmq.lib.mixins.stomp import StompMixin
89

9-
__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin']
10+
__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin', 'StompMixin']

intelmq/lib/mixins/stomp.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
""" StompMixin for IntelMQ
2+
3+
SPDX-FileCopyrightText: 2017 Sebastian Wagner, 2023 NASK
4+
SPDX-License-Identifier: AGPL-3.0-or-later
5+
"""
6+
7+
from typing import (
8+
Any,
9+
Callable,
10+
List,
11+
NoReturn,
12+
Tuple,
13+
)
14+
15+
try:
16+
import stomp
17+
except ImportError:
18+
stomp = None
19+
20+
from intelmq.lib.exceptions import MissingDependencyError
21+
22+
23+
class StompMixin:
24+
25+
"""A mixin that provides certain common methods for STOMP bots."""
26+
27+
#
28+
# STOMP bot attributes relevant to this mixin
29+
30+
server: str
31+
port: int
32+
heartbeat: int
33+
34+
ssl_ca_certificate: str # TODO: could be pathlib.Path
35+
ssl_client_certificate: str # TODO: could be pathlib.Path
36+
ssl_client_certificate_key: str # TODO: could be patlib.Path
37+
38+
#
39+
# Helper methods intended to be used in subclasses
40+
41+
@classmethod
42+
def stomp_bot_parameters_check(cls, parameters: dict) -> List[List[str]]:
43+
"""Intended to be used in bots' `check()` static/class method."""
44+
logs = []
45+
cls.__verify_parameters(
46+
get_param=parameters.get,
47+
on_error=lambda msg: logs.append(['error', msg]),
48+
)
49+
return logs
50+
51+
def stomp_bot_runtime_initial_check(self) -> None:
52+
"""Intended to be used in bots' `init()` instance method."""
53+
self.__verify_dependency()
54+
self.__verify_parameters(
55+
get_param=self.__get_own_attribute,
56+
on_error=self.__raise_value_error,
57+
)
58+
59+
def prepare_stomp_connection(self) -> Tuple['stomp.Connection', dict]:
60+
"""
61+
Get a `(<STOMP connection>, <STOMP connect arguments>)` pair.
62+
63+
* `<STOMP connection>` is a new instance of `stomp.Connection`,
64+
with the SSL stuff *already configured*, but *without* any
65+
invocations of `connect()` made yet;
66+
67+
* `<STOMP connect arguments>` is a dict of arguments -- ready
68+
to be passed to the `connect()` method of the aforementioned
69+
`<STOMP connection>` object.
70+
"""
71+
ssl_kwargs, connect_kwargs = self.__get_ssl_and_connect_kwargs()
72+
host_and_ports = [(self.server, int(self.port))]
73+
stomp_connection = stomp.Connection(host_and_ports=host_and_ports,
74+
heartbeats=(self.heartbeat,
75+
self.heartbeat))
76+
stomp_connection.set_ssl(host_and_ports, **ssl_kwargs)
77+
return stomp_connection, connect_kwargs
78+
79+
#
80+
# Implementation details
81+
82+
_DEPENDENCY_NAME_REMARK = (
83+
"Note that the actual name of the pip-installable "
84+
"distribution package is 'stomp.py', not 'stomp'.")
85+
86+
@classmethod
87+
def __verify_dependency(cls) -> None:
88+
# Note: the pip-installable package's name is 'stomp.py', but
89+
# e.g. the apt-installable package's name is 'python3-stomp' (or
90+
# similar) -- that's why we pass to the `MissingDependencyError`
91+
# constructor the name 'stomp', but also pass the value of the
92+
# `_DEPENDENCY_NAME_REMARK` constant as `additional_text`...
93+
if stomp is None:
94+
raise MissingDependencyError('stomp',
95+
additional_text=cls._DEPENDENCY_NAME_REMARK)
96+
if stomp.__version__ < (4, 1, 8):
97+
raise MissingDependencyError('stomp', version="4.1.8",
98+
installed=stomp.__version__,
99+
additional_text=cls._DEPENDENCY_NAME_REMARK)
100+
101+
@classmethod
102+
def __verify_parameters(cls,
103+
get_param: Callable[[str], Any],
104+
on_error: Callable[[str], None]) -> None:
105+
for param_name in [
106+
'ssl_ca_certificate',
107+
'ssl_client_certificate',
108+
'ssl_client_certificate_key',
109+
]:
110+
cls.__verify_file_param(param_name, get_param, on_error)
111+
112+
@classmethod
113+
def __verify_file_param(cls,
114+
param_name: str,
115+
get_param: Callable[[str], Any],
116+
on_error: Callable[[str], None]) -> None:
117+
path = get_param(param_name)
118+
if path is None:
119+
on_error(f"Parameter {param_name!r} is not given "
120+
f"(or is set to None).")
121+
return
122+
try:
123+
open(path, 'rb').close()
124+
except OSError as exc:
125+
# (note: the filename is mentioned in the included exc message)
126+
on_error(f"Cannot open file specified as parameter "
127+
f"{param_name!r} ({str(exc)!r}).")
128+
129+
def __get_own_attribute(self, param_name: str) -> Any:
130+
return getattr(self, param_name, None)
131+
132+
def __raise_value_error(self, msg: str) -> NoReturn:
133+
raise ValueError(msg)
134+
135+
def __get_ssl_and_connect_kwargs(self) -> Tuple[dict, dict]:
136+
# Note: the `ca_certs` argument to `set_ssl()` must always be
137+
# provided, otherwise the `stomp.py`'s machinery would *not*
138+
# perform any certificate verification!
139+
ssl_kwargs = dict(
140+
ca_certs=self.ssl_ca_certificate,
141+
cert_file=self.ssl_client_certificate,
142+
key_file=self.ssl_client_certificate_key,
143+
)
144+
connect_kwargs = dict(wait=True)
145+
return ssl_kwargs, connect_kwargs

0 commit comments

Comments
 (0)