Skip to content

Commit 7c1baed

Browse files
bug: bot library mode: clear internal messages before run and add test
before a message is processed in library mode, clear the internal queue of the pipeline and the cached message of the bot instance. in case of leftovers, which can happen when the process method raised exception, the leftover message would take precendence over any new messages and thus keep the bot in a endless loop of wrong errors this commit also adds a unittest to cover this behaviour
1 parent 360d058 commit 7c1baed

File tree

3 files changed

+39
-3
lines changed

3 files changed

+39
-3
lines changed

intelmq/lib/bot.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,13 +1023,17 @@ def process_message(self, *messages: Union[libmessage.Message, dict]):
10231023
10241024
Access the output queue e.g. with return_value['output']
10251025
"""
1026-
print('process_message, destination state:', self.__destination_pipeline.state, 'self.destination_queues', self.destination_queues)
10271026
if self.bottype == BotType.COLLECTOR:
10281027
if messages:
10291028
raise exceptions.InvalidArgument('Collector Bots take no messages as processing input')
10301029
else:
10311030
# reset source queue
10321031
self.__source_pipeline.state[self.source_queue] = []
1032+
# reset internal queue
1033+
if self.__source_pipeline._has_message:
1034+
self.__source_pipeline.acknowledge()
1035+
self.__current_message = None
1036+
10331037
for message in messages:
10341038
# convert to Message objects, it the message is a dict
10351039
# use an appropriate default message type, not requiring __type keys in the message

intelmq/lib/pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import time
1717
from itertools import chain
18-
from typing import Dict, Optional
18+
from typing import Optional
1919
import ssl
2020

2121
import redis

intelmq/tests/lib/test_bot_library_mode.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
88
Software engineering by BSI & Intevation GmbH
99
10-
This tests IntelMQ bots in library mode (IEP007)
10+
This file tests IntelMQ bots in library mode (IEP007)
1111
"""
1212
import json
1313
import unittest
@@ -28,13 +28,32 @@
2828
'source.urlpath': '/',
2929
'protocol.application': 'http',
3030
'protocol.transport': 'tcp'}
31+
EXAMPLE_IP_INPUT = {"source.ip": "192.0.43.7", # icann.org.
32+
"destination.ip": "192.0.43.8", # iana.org.
33+
"time.observation": "2015-01-01T00:00:00+00:00",
34+
}
3135

3236

3337
class BrokenInitExpertBot(ExpertBot):
3438
def init(self):
3539
raise ValueError('This initialization intionally raises an error!')
3640

3741

42+
class RaisesOnFirstRunExpertBot(ExpertBot):
43+
counter = 0
44+
45+
def init(self):
46+
self.counter = 0
47+
48+
def process(self):
49+
event = self.receive_message()
50+
self.counter += 1
51+
if self.counter == 1:
52+
raise ValueError('This initialization intionally raises an error!')
53+
self.send_message(event)
54+
self.acknowledge_message()
55+
56+
3857
def assertMessageEqual(actual, expected):
3958
"""
4059
Compare two messages as dicts.
@@ -109,5 +128,18 @@ def test_bot_multi_message():
109128
assert queues['output'] == [EXAMPLE_DATA_URL_OUT] * 2
110129

111130

131+
def test_bot_raises_and_second_message():
132+
"""
133+
The first message raises an error and the second message
134+
This test is based on an issue where the exception-raising message was not cleared from the internal message store of the Bot/Pipeline instance and thus re-used on the second run
135+
"""
136+
raises_on_first_run = RaisesOnFirstRunExpertBot('raises', settings=BotLibSettings)
137+
with raises(ValueError):
138+
raises_on_first_run.process_message(EXAMPLE_DATA_URL)
139+
queues = raises_on_first_run.process_message(EXAMPLE_IP_INPUT)
140+
assert len(queues['output']) == 1
141+
assertMessageEqual(queues['output'][0], EXAMPLE_IP_INPUT)
142+
143+
112144
if __name__ == '__main__': # pragma: no cover
113145
unittest.main()

0 commit comments

Comments
 (0)