Skip to content

Commit e97db41

Browse files
[ENH] Using msgpack instead of json
Using msgpack instead of json results in faster (de)serialization and lower memory usage. Redis is also capable of handling msgpack within its Lua API, see https://github.com/kengonakajima/lua-msgpack-native. ====== Benchmark ======= JSON median size: 387 MSGPACK median size: 329 ------------------------ Diff: 16.20% JSON * Serialize: 39286 * Deserialize: 30713 MSGPACK * Serialize: 23483 * Deserialize: 12602 --------------------- DIFF * Serialize: 50.35% * Deserialize: 83.62% Data extracted from spamhaus-collector. Measurements based on deduplicator-expert; 460 events in total were processed by deduplicator-expert. Signed-off-by: Sebastian Waldbauer <[email protected]>
1 parent 94fa7f8 commit e97db41

File tree

15 files changed

+106
-57
lines changed

15 files changed

+106
-57
lines changed

debian/control

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Build-Depends: debhelper (>= 4.1.16),
2020
python3-sphinx-rtd-theme,
2121
python3-termstyle,
2222
python3-tz,
23+
python3-msgpack,
2324
quilt,
2425
rsync,
2526
safe-rm
@@ -41,6 +42,7 @@ Depends: bash-completion,
4142
python3-ruamel.yaml,
4243
python3-termstyle (>= 0.1.10),
4344
python3-tz,
45+
python3-msgpack,
4446
redis-server,
4547
systemd,
4648
${misc:Depends},

intelmq/bots/parsers/json/parser.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ def process(self):
2828
for line in lines:
2929
new_event = MessageFactory.unserialize(line,
3030
harmonization=self.harmonization,
31-
default_type='Event')
31+
default_type='Event',
32+
use_packer="json")
33+
3234
event = self.new_event(report)
3335
event.update(new_event)
3436
if 'raw' not in event:

intelmq/lib/bot.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import inspect
1717
import io
1818
import json
19+
import msgpack
1920
import logging
2021
import os
2122
import re
@@ -329,8 +330,8 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True,
329330
self.logger.error('Pipeline failed.')
330331
self.__disconnect_pipelines()
331332

332-
except exceptions.DecodingError as exc:
333-
self.logger.exception('Could not decode message from pipeline. No retries useful.')
333+
except exceptions.UnserializationError as exc:
334+
self.logger.exception('Could not unserialize message from pipeline. No retries useful.')
334335

335336
# ensure that we do not re-process the faulty message
336337
self.__error_retries_counter = self.error_max_retries + 1

intelmq/lib/exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,12 @@ def __init__(self, encodings=None, exception: UnicodeDecodeError = None,
167167
suffixes.append('with reason %r' % exception.reason)
168168
suffix = (' ' + ' '.join(suffixes)) if suffixes else ''
169169
super().__init__("Could not decode string%s." % suffix)
170+
171+
172+
class UnserializationError(IntelMQException, ValueError):
173+
"""
174+
Unrecoverable error during message unserialization
175+
"""
176+
def __init__(self, exception: Exception = None, object: bytes = None):
177+
self.object = object
178+
super().__init__("Could not unserialize message%s." % exception)

intelmq/lib/message.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import warnings
1515
from collections import defaultdict
1616
from typing import Any, Dict, Iterable, Optional, Sequence, Union
17+
import msgpack
1718

1819
import intelmq.lib.exceptions as exceptions
1920
import intelmq.lib.harmonization
@@ -60,8 +61,8 @@ def from_dict(message: dict, harmonization=None,
6061
return class_reference(message, auto=True, harmonization=harmonization)
6162

6263
@staticmethod
63-
def unserialize(raw_message: str, harmonization: dict = None,
64-
default_type: Optional[str] = None) -> dict:
64+
def unserialize(raw_message: bytes, harmonization: dict = None,
65+
default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict:
6566
"""
6667
Takes JSON-encoded Message object, returns instance of correct class.
6768
@@ -74,12 +75,12 @@ def unserialize(raw_message: str, harmonization: dict = None,
7475
MessageFactory.from_dict
7576
MessageFactory.serialize
7677
"""
77-
message = Message.unserialize(raw_message)
78+
message = Message.unserialize(raw_message, use_packer=use_packer)
7879
return MessageFactory.from_dict(message, harmonization=harmonization,
7980
default_type=default_type)
8081

8182
@staticmethod
82-
def serialize(message):
83+
def serialize(message) -> bytes:
8384
"""
8485
Takes instance of message-derived class and makes JSON-encoded Message.
8586
@@ -127,7 +128,7 @@ def __init__(self, message: Union[dict, tuple] = (), auto: bool = False,
127128
elif isinstance(message, tuple):
128129
self.iterable = dict(message)
129130
else:
130-
raise ValueError("Type %r of message can't be handled, must be dict or tuple.", type(message))
131+
raise ValueError("Type %r of message can't be handled, must be dict or tuple." % type(message))
131132
for key, value in self.iterable.items():
132133
if not self.add(key, value, sanitize=False, raise_failure=False):
133134
self.add(key, value, sanitize=True)
@@ -310,18 +311,32 @@ def deep_copy(self):
310311
harmonization={self.__class__.__name__.lower(): self.harmonization_config})
311312

312313
def __str__(self):
313-
return self.serialize()
314+
return self.serialize(use_packer="json")
314315

315-
def serialize(self):
316-
self['__type'] = self.__class__.__name__
317-
json_dump = utils.decode(json.dumps(self))
318-
del self['__type']
319-
return json_dump
316+
def serialize(self, use_packer: str = "msgpack"):
317+
delete_type = False
318+
if '__type' not in self:
319+
delete_type = True
320+
self['__type'] = self.__class__.__name__
321+
322+
if use_packer == "json":
323+
packed = json.dumps(self)
324+
else:
325+
packed = msgpack.packb(self)
326+
327+
if delete_type:
328+
del self['__type']
329+
return packed
320330

321331
@staticmethod
322-
def unserialize(message_string: str):
323-
message = json.loads(message_string)
324-
return message
332+
def unserialize(message: bytes, use_packer: str = "msgpack"):
333+
try:
334+
if use_packer == "json":
335+
return json.loads(message)
336+
else:
337+
return msgpack.unpackb(message, raw=False)
338+
except Exception as exc:
339+
raise exceptions.UnserializationError(exception=exc, object=message)
325340

326341
def __is_valid_key(self, key: str):
327342
try:
@@ -470,14 +485,18 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False,
470485
json_dict_fp = json_dict_fp[subkey]
471486

472487
for key, value in jsondicts.items():
473-
new_dict[key] = json.dumps(value, ensure_ascii=False)
488+
new_dict[key] = json.dumps(value)
474489

475490
return new_dict
476491

477492
def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False):
478493
json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
479494
return json.dumps(json_dict, ensure_ascii=False, sort_keys=True)
480495

496+
def to_msgpack(self, hierarchical=False, with_type=False):
497+
msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
498+
return msgpack.packb(msgpack_dict)
499+
481500
def __eq__(self, other: dict) -> bool:
482501
"""
483502
Wrapper is necessary as we have additional members

intelmq/lib/pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,14 @@ def send(self, message: str, path: str = "_default",
125125
path_permissive: bool = False):
126126
raise NotImplementedError
127127

128-
def receive(self) -> str:
128+
def receive(self) -> bytes:
129129
if self._has_message:
130130
raise exceptions.PipelineError("There's already a message, first "
131131
"acknowledge the existing one.")
132132

133133
retval = self._receive()
134134
self._has_message = True
135-
return utils.decode(retval)
135+
return retval
136136

137137
def _receive(self) -> bytes:
138138
raise NotImplementedError

intelmq/lib/test.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io
1313
import inspect
1414
import json
15+
import msgpack
1516
import os
1617
import re
1718
import unittest
@@ -158,8 +159,7 @@ def setUpClass(cls):
158159
elif cls.bot_type != 'collector' and cls.default_input_message == '':
159160
cls.default_input_message = {'__type': 'Event'}
160161
if type(cls.default_input_message) is dict:
161-
cls.default_input_message = \
162-
utils.decode(json.dumps(cls.default_input_message))
162+
cls.default_input_message = msgpack.packb(cls.default_input_message)
163163

164164
if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'):
165165
password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \
@@ -176,10 +176,10 @@ def setUpClass(cls):
176176
harmonization = utils.load_configuration(pkg_resources.resource_filename('intelmq',
177177
'etc/harmonization.conf'))
178178

179-
def new_report(self, auto=False, examples=False):
179+
def new_report(self, auto=False, examples=False) -> message.Report:
180180
return message.Report(harmonization=self.harmonization, auto=auto)
181181

182-
def new_event(self):
182+
def new_event(self) -> message.Event:
183183
return message.Event(harmonization=self.harmonization)
184184

185185
def get_mocked_logger(self, logger):
@@ -247,7 +247,7 @@ def prepare_source_queue(self):
247247
self.input_queue = []
248248
for msg in self.input_message:
249249
if type(msg) is dict:
250-
self.input_queue.append(json.dumps(msg))
250+
self.input_queue.append(message.MessageFactory.serialize(msg))
251251
elif issubclass(type(msg), message.Message):
252252
self.input_queue.append(msg.serialize())
253253
else:
@@ -331,17 +331,17 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,
331331

332332
""" Test if report has required fields. """
333333
if self.bot_type == 'collector':
334-
for report_json in self.get_output_queue():
335-
report = message.MessageFactory.unserialize(report_json,
334+
for report_data in self.get_output_queue():
335+
report = message.MessageFactory.unserialize(report_data,
336336
harmonization=self.harmonization)
337337
self.assertIsInstance(report, message.Report)
338338
self.assertIn('raw', report)
339339
self.assertIn('time.observation', report)
340340

341341
""" Test if event has required fields. """
342342
if self.bot_type == 'parser':
343-
for event_json in self.get_output_queue():
344-
event = message.MessageFactory.unserialize(event_json,
343+
for event_data in self.get_output_queue():
344+
event = message.MessageFactory.unserialize(event_data,
345345
harmonization=self.harmonization)
346346
self.assertIsInstance(event, message.Event)
347347
self.assertIn('classification.type', event)
@@ -408,7 +408,7 @@ def get_output_queue(self, path="_default"):
408408
"""Getter for items in the output queues of this bot. Use in TestCase scenarios
409409
If there is multiple queues in named queue group, we return all the items chained.
410410
"""
411-
return [utils.decode(text) for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])]
411+
return [text for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])]
412412
# return [utils.decode(text) for text in self.pipe.state["%s-output" % self.bot_id]]
413413

414414
def test_bot_name(self, *args, **kwargs):
@@ -539,9 +539,9 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d
539539
given queue position.
540540
"""
541541
event = self.get_output_queue(path=path)[queue_pos]
542-
self.assertIsInstance(event, str)
542+
self.assertIsInstance(event, bytes)
543543

544-
event_dict = json.loads(event)
544+
event_dict = msgpack.unpackb(event, raw=False)
545545
if isinstance(expected_msg, (message.Event, message.Report)):
546546
expected = expected_msg.to_dict(with_type=True)
547547
else:

intelmq/tests/bots/collectors/tcp/test_collector.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,10 @@ def test_intelmq_exchange(self):
126126
for i, msg in enumerate(self.get_output_queue()):
127127
report = MessageFactory.unserialize(msg, harmonization=self.harmonization, default_type='Event')
128128

129-
output = MessageFactory.unserialize(utils.base64_decode(report["raw"]), harmonization=self.harmonization, default_type='Event')
129+
output = MessageFactory.unserialize(utils.base64_decode(report["raw"]),
130+
harmonization=self.harmonization,
131+
default_type='Event',
132+
use_packer="json")
130133
self.assertDictEqual(output, INPUT1)
131134

132135
del report['time.observation']

intelmq/tests/bots/experts/cymru_whois/test_expert.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# SPDX-License-Identifier: AGPL-3.0-or-later
44

55
# -*- coding: utf-8 -*-
6-
import json
6+
import msgpack
77
import unittest
88

99
import intelmq.lib.test as test
@@ -93,7 +93,7 @@ def test_6to4_result(self):
9393
"""
9494
self.input_message = EXAMPLE_6TO4_INPUT
9595
self.run_bot()
96-
actual = json.loads(self.get_output_queue()[0])
96+
actual = msgpack.loads(self.get_output_queue()[0])
9797
self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual)
9898
self.assertIn("source.asn", actual)
9999
self.assertIn("source.as_name", actual)

intelmq/tests/bots/experts/idea/test_expert.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
# -*- coding: utf-8 -*-
66
import unittest
77
import json
8+
import msgpack
89

910
import intelmq.lib.test as test
11+
from intelmq.lib.message import MessageFactory
1012
from intelmq.bots.experts.idea.expert import IdeaExpertBot
1113
from intelmq.lib.harmonization import ClassificationType
1214

@@ -86,10 +88,10 @@ def test_conversion(self):
8688
# The ID in the generated Idea event is random, so we have to extract
8789
# the data from the "output" field and compare after removing ID's
8890
event = self.get_output_queue()[0]
89-
self.assertIsInstance(event, str)
90-
event_dict = json.loads(event)
91+
self.assertIsInstance(event, bytes)
92+
event_dict = MessageFactory.unserialize(event)
9193
self.assertIsInstance(event_dict, dict)
92-
self.assertTrue("output" in event_dict)
94+
self.assertTrue(b"output" in event_dict)
9395
idea_event = json.loads(event_dict["output"])
9496
self.assertIsInstance(idea_event, dict)
9597
del TEST_OUTPUT1["ID"]

0 commit comments

Comments
 (0)