Skip to content

Commit 5e47e4e

Browse files
Merge branch 'develop' into fix-2169
2 parents e7c5fc9 + 2d7b9ca commit 5e47e4e

File tree

10 files changed

+113
-52
lines changed

10 files changed

+113
-52
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ CHANGELOG
2929
The `LogLevel` and `ReturnType` Enums were added to `intelmq.lib.datatypes`.
3030
- `intelmq.lib.bot`:
3131
- Enhance behaviour if an unconfigured bot is started (PR#2054 by Sebastian Wagner).
32+
- Fix line recovery and message dumping of the `ParserBot` (PR#2192 by Sebastian Wagner).
33+
- Previously the dumped message was always the last message of a report if the report contained multiple lines, leading to data loss.
3234

3335
### Development
3436

@@ -73,6 +75,7 @@ CHANGELOG
7375
- Added support for `Accessible AMQP`, `Device Identification Report` (IPv4 and IPv6) (PR#2134 by Mateo Durante).
7476
- Added file name mapping for `SSL-POODLE-Vulnerable-Servers IPv6` (file name `scan6_ssl_poodle`) (PR#2134 by Mateo Durante).
7577
- `intelmq.bots.parsers.cymru.parser_cap_program`: The parser mapped the hostname into `source.fqdn` which is not allowed by the IntelMQ Data Format. Added a check (PR#2215 by Sebastian Waldbauer, fixes #2169).
78+
- `intelmq.bots.parsers.generic.parser_csv`: Use RewindableFileHandle to use the original current line for line recovery (PR#2192 by Sebastian Wagner).
7679

7780
#### Experts
7881
- `intelmq.bots.experts.domain_valid`: New bot for checking domain's validity (PR#1966 by Marius Karotkis).

intelmq/bots/parsers/abusech/parser_ip.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def parse(self, report: dict):
6969
for line in data_lines:
7070
yield line.strip()
7171

72-
def parse_line(self, line, report):
72+
def parse_line(self, line: str, report):
7373
event = self.new_event(report)
7474
self.__process_defaults(event, line, report['feed.url'])
7575
self.__process_fields(event, line, report['feed.url'])
@@ -114,7 +114,7 @@ def __process_additional(event, line, feed_url):
114114
def __sanitize_csv_lines(s: str):
115115
return s.replace('"', '')
116116

117-
def recover_line(self, line):
117+
def recover_line(self, line: str):
118118
return '\n'.join(self.comments + [self.header_line, line])
119119

120120

intelmq/bots/parsers/cymru/parser_full_bogons.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def parse(self, report):
2626
for row in raw_report.splitlines():
2727
yield row.strip()
2828

29-
def parse_line(self, val, report):
29+
def parse_line(self, val: str, report):
3030
if not len(val) or val.startswith('#') or val.startswith('//'):
3131
return
3232

intelmq/bots/parsers/generic/parser_csv.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from intelmq.lib.bot import ParserBot
2929
from intelmq.lib.exceptions import InvalidArgument, InvalidValue
3030
from intelmq.lib.harmonization import DateTime
31+
from intelmq.lib.utils import RewindableFileHandle
3132

3233
TIME_CONVERSIONS = {'timestamp': DateTime.from_timestamp,
3334
'windows_nt': DateTime.from_windows_nt,
@@ -101,8 +102,10 @@ def parse(self, report):
101102
if self.skip_header:
102103
self.tempdata.append(raw_report[:raw_report.find('\n')])
103104
raw_report = raw_report[raw_report.find('\n') + 1:]
104-
for row in csv.reader(io.StringIO(raw_report),
105+
self._handle = RewindableFileHandle(io.StringIO(raw_report))
106+
for row in csv.reader(self._handle,
105107
delimiter=str(self.delimiter)):
108+
self._current_line = self._handle.current_line
106109

107110
if self.filter_text and self.filter_type:
108111
text_in_row = self.filter_text in self.delimiter.join(row)
@@ -115,7 +118,7 @@ def parse(self, report):
115118
else:
116119
yield row
117120

118-
def parse_line(self, row, report):
121+
def parse_line(self, row: list, report):
119122
event = self.new_event(report)
120123

121124
for keygroup, value, required in zip(self.columns, row, self.columns_required):

intelmq/bots/parsers/microsoft/parser_ctip.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,19 +210,23 @@ class MicrosoftCTIPParserBot(ParserBot):
210210
def parse(self, report):
211211
raw_report = utils.base64_decode(report.get("raw"))
212212
if raw_report.startswith('['):
213+
# Interflow
213214
self.recover_line = self.recover_line_json
214215
yield from self.parse_json(report)
215216
elif raw_report.startswith('{'):
217+
# Azure
216218
self.recover_line = self.recover_line_json_stream
217219
yield from self.parse_json_stream(report)
220+
else:
221+
raise ValueError("Can't parse the received message. It is neither a JSON list nor a JSON dictionary. Please report this bug.")
218222

219223
def parse_line(self, line, report):
220224
if line.get('version', None) == 1.5:
221225
yield from self.parse_interflow(line, report)
222226
else:
223227
yield from self.parse_azure(line, report)
224228

225-
def parse_interflow(self, line, report):
229+
def parse_interflow(self, line: dict, report):
226230
raw = self.recover_line(line)
227231
if line['indicatorthreattype'] != 'Botnet':
228232
raise ValueError('Unknown indicatorthreattype %r, only Botnet is supported.' % line['indicatorthreattype'])
@@ -257,7 +261,7 @@ def parse_interflow(self, line, report):
257261
yield event
258262

259263
def parse_azure(self, line, report):
260-
raw = self.recover_line(line)
264+
raw = self.recover_line()
261265

262266
event = self.new_event(report)
263267

intelmq/bots/parsers/shadowserver/parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def parse_line(self, row, report):
197197
# Now add additional constant fields.
198198
event.update(conf.get('constant_fields', {}))
199199

200-
event.add('raw', self.recover_line(row))
200+
event.add('raw', self.recover_line())
201201

202202
# Add everything which could not be resolved to extra.
203203
for f in fields:

intelmq/lib/bot.py

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
from datetime import datetime, timedelta
3131
from typing import Any, List, Optional, Union
3232

33-
import psutil
34-
3533
import intelmq.lib.message as libmessage
3634
from intelmq import (DEFAULT_LOGGING_PATH,
3735
HARMONIZATION_CONF_FILE,
@@ -942,7 +940,7 @@ class ParserBot(Bot):
942940
_csv_params = {}
943941
_ignore_lines_starting = []
944942
_handle = None
945-
_current_line = None
943+
_current_line: Optional[str] = None
946944

947945
def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
948946
disable_multithreading: bool = None):
@@ -956,6 +954,7 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
956954
def parse_csv(self, report: libmessage.Report):
957955
"""
958956
A basic CSV parser.
957+
The resulting lines are lists.
959958
"""
960959
raw_report: str = utils.base64_decode(report.get("raw")).strip()
961960
raw_report = raw_report.translate({0: None})
@@ -971,6 +970,7 @@ def parse_csv(self, report: libmessage.Report):
971970
def parse_csv_dict(self, report: libmessage.Report):
972971
"""
973972
A basic CSV Dictionary parser.
973+
The resulting lines are dictionaries with the column names as keys.
974974
"""
975975
raw_report: str = utils.base64_decode(report.get("raw")).strip()
976976
raw_report: str = raw_report.translate({0: None})
@@ -1024,6 +1024,7 @@ def parse(self, report: libmessage.Report):
10241024
for line in utils.base64_decode(report.get("raw")).splitlines():
10251025
line = line.strip()
10261026
if not any([line.startswith(prefix) for prefix in self._ignore_lines_starting]):
1027+
self._current_line = line
10271028
yield line
10281029

10291030
def parse_line(self, line: Any, report: libmessage.Report):
@@ -1063,14 +1064,14 @@ def process(self):
10631064
events: list[libmessage.Event] = [value]
10641065
except Exception:
10651066
self.logger.exception('Failed to parse line.')
1066-
self.__failed.append((traceback.format_exc(), line))
1067+
self.__failed.append((traceback.format_exc(), self._current_line))
10671068
else:
10681069
events_count += len(events)
10691070
self.send_message(*events)
10701071

1071-
for exc, line in self.__failed:
1072+
for exc, original_line in self.__failed:
10721073
report_dump: libmessage.Message = report.copy()
1073-
report_dump.change('raw', self.recover_line(line))
1074+
report_dump.change('raw', self.recover_line(original_line))
10741075
if self.error_dump_message:
10751076
self._dump_message(exc, report_dump)
10761077
if self.destination_queues and '_on_error' in self.destination_queues:
@@ -1115,21 +1116,34 @@ def recover_line(self, line: Optional[str] = None) -> str:
11151116
line = line if line else self._current_line
11161117
return '\n'.join(tempdata + [line])
11171118

1118-
def recover_line_csv(self, line: str) -> str:
1119-
out = io.StringIO()
1120-
writer = csv.writer(out, **self._csv_params)
1121-
writer.writerow(line)
1119+
def recover_line_csv(self, line: Optional[list]) -> str:
1120+
"""
1121+
Parameter:
1122+
line: Optional line as list. If absent, the current line is used as string.
1123+
"""
1124+
if line:
1125+
out = io.StringIO()
1126+
writer = csv.writer(out, **self._csv_params)
1127+
writer.writerow(line)
1128+
result = out.getvalue()
1129+
else:
1130+
result = self._current_line
11221131
tempdata = '\r\n'.join(self.tempdata) + '\r\n' if self.tempdata else ''
1123-
return tempdata + out.getvalue()
1132+
return tempdata + result
11241133

1125-
def recover_line_csv_dict(self, line: str) -> str:
1134+
def recover_line_csv_dict(self, line: Union[dict, str, None] = None) -> str:
11261135
"""
11271136
Converts dictionaries to csv. self.csv_fieldnames must be list of fields.
11281137
"""
11291138
out = io.StringIO()
11301139
writer = csv.DictWriter(out, self.csv_fieldnames, **self._csv_params)
11311140
writer.writeheader()
1132-
out.write(self._current_line)
1141+
if isinstance(line, dict):
1142+
writer.writerow(line)
1143+
elif isinstance(line, str):
1144+
out.write(line)
1145+
else:
1146+
out.write(self._current_line)
11331147

11341148
return out.getvalue().strip()
11351149

@@ -1138,20 +1152,29 @@ def recover_line_json(self, line: dict) -> str:
11381152
Reverse of parse for JSON pulses.
11391153
11401154
Recovers a fully functional report with only the problematic pulse.
1155+
Using a string as input here is not possible, as the input may span over multiple lines.
1156+
Output is not identical to the input, but has the same content.
1157+
1158+
Parameters:
1159+
The line as dict.
1160+
1161+
Returns:
1162+
str: The JSON-encoded line as string.
11411163
"""
11421164
return json.dumps([line])
11431165

1144-
def recover_line_json_stream(self, line=None) -> str:
1166+
def recover_line_json_stream(self, line: Optional[str] = None) -> str:
11451167
"""
1146-
recover_line for json streams, just returns the current line, unparsed.
1168+
recover_line for JSON streams (one JSON element per line, no outer structure),
1169+
just returns the current line, unparsed.
11471170
11481171
Parameters:
1149-
line: None, not required, only for compatibility with other recover_line methods
1172+
line: The unparsed line itself as string, if available; otherwise falls back to the original current line
11501173
11511174
Returns:
11521175
str: unparsed JSON line.
11531176
"""
1154-
return self._current_line
1177+
return line if line else self._current_line
11551178

11561179

11571180
class CollectorBot(Bot):

intelmq/lib/mixins/sql.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
""" SQLMixin for IntelMQ
22
3-
SPDX-FileCopyrightText: 2021 Birger Schacht
3+
SPDX-FileCopyrightText: 2021 Birger Schacht, 2022 Intevation GmbH
44
SPDX-License-Identifier: AGPL-3.0-or-later
55
66
Based on the former SQLBot base class
77
"""
8+
from intelmq.lib import exceptions
89

910

1011
class SQLMixin:
@@ -18,36 +19,39 @@ class SQLMixin:
1819

1920
POSTGRESQL = "postgresql"
2021
SQLITE = "sqlite"
21-
default_engine = "postgresql"
22+
_default_engine = "postgresql"
2223
engine = None
2324
# overwrite the default value from the OutputBot
2425
message_jsondict_as_string = True
2526

2627
def __init__(self, *args, **kwargs):
28+
self._init_sql()
29+
30+
super().__init__(*args, **kwargs)
31+
32+
def _init_sql(self):
2733
self.logger.debug("Running SQL Mixin initialization.")
28-
self.engine_name = getattr(self, 'engine', self.default_engine).lower()
34+
self._engine_name = getattr(self, 'engine', self._default_engine).lower()
2935
engines = {SQLMixin.POSTGRESQL: (self._init_postgresql, "%s"),
3036
SQLMixin.SQLITE: (self._init_sqlite, "?")}
3137
for key, val in engines.items():
32-
if self.engine_name == key:
38+
if self._engine_name == key:
3339
val[0]()
3440
self.format_char = val[1]
3541
break
3642
else:
37-
raise ValueError(f"Wrong parameter 'engine' {self.engine_name!r}, possible values are {engines}")
38-
39-
super().__init__()
43+
raise ValueError(f"Wrong parameter 'engine' {self._engine_name!r}, possible values are {engines}")
4044

4145
def _connect(self, engine, connect_args: dict, autocommitable: bool = False):
42-
self.engine = engine # imported external library that connects to the DB
46+
self._engine = engine # imported external library that connects to the DB
4347
self.logger.debug(f"Connecting to database with connect_args: {connect_args}.")
4448

4549
try:
46-
self.con = self.engine.connect(**connect_args)
50+
self.con = self._engine.connect(**connect_args)
4751
if autocommitable: # psycopg2 has it, sqlite3 has not
4852
self.con.autocommit = getattr(self, 'autocommit', True) # True prevents deadlocks
4953
self.cur = self.con.cursor()
50-
except (self.engine.Error, Exception):
54+
except (self._engine.Error, Exception):
5155
self.logger.exception('Failed to connect to database.')
5256
self.stop()
5357
self.logger.info("Connected to database.")
@@ -88,24 +92,24 @@ def execute(self, query: str, values: tuple, rollback=False):
8892
# note: this assumes, the DB was created with UTF-8 support!
8993
self.cur.execute(query, values)
9094
self.logger.debug('Done.')
91-
except (self.engine.InterfaceError, self.engine.InternalError,
92-
self.engine.OperationalError, AttributeError):
95+
except (self._engine.InterfaceError, self._engine.InternalError,
96+
self._engine.OperationalError, AttributeError):
9397
if rollback:
9498
try:
9599
self.con.rollback()
96100
self.logger.exception('Executed rollback command '
97101
'after failed query execution.')
98-
except self.engine.OperationalError:
102+
except self._engine.OperationalError:
99103
self.logger.exception('Executed rollback command '
100104
'after failed query execution.')
101-
self.init()
105+
self._init_sql()
102106
except Exception:
103107
self.logger.exception('Cursor has been closed, connecting '
104108
'again.')
105-
self.init()
109+
self._init_sql()
106110
else:
107111
self.logger.exception('Database connection problem, connecting again.')
108-
self.init()
112+
self._init_sql()
109113
else:
110114
return True
111115
return False

intelmq/lib/pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
class PipelineFactory:
3030

3131
@staticmethod
32-
def create(logger, broker=None, direction=None, queues=None, pipeline_args=None, load_balance=False, is_multithreaded=False):
32+
def create(logger, broker=None, direction=None, queues=None, pipeline_args: Optional[dict] = None, load_balance=False, is_multithreaded=False):
3333
"""
3434
direction: "source" or "destination", optional, needed for queues
3535
queues: needs direction to be set, calls set_queues

0 commit comments

Comments
 (0)