Skip to content

Commit 71ca4d6

Browse files
authored
Merge pull request #384 from teresalves/teresalves/fix-python3-inconsistencies
feat: increase configuration options and fix bugs after the python3 upgrade
2 parents cc19a10 + b939ba8 commit 71ca4d6

File tree

13 files changed

+89
-14
lines changed

13 files changed

+89
-14
lines changed

.github/workflows/code.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
strategy:
1717
max-parallel: 4
1818
matrix:
19-
python-version: [3.7, 3.8, 3.9, 3.10.8]
19+
python-version: [3.7, 3.8, 3.9, 3.10.13]
2020

2121
steps:
2222
- uses: actions/checkout@v2

docs/index.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,26 @@ e.g.: the CLI option ``auth-address`` becomes ``auth_address`` in the
220220
- 10.10.10.1
221221
- 10.10.10.2
222222
- 10.10.10.3
223+
opts:
224+
batch_size: 16384
225+
max_request_size: 1048576
226+
buffer_memory: 33554432
227+
send_buffer_bytes: 131072
228+
max_in_flight_requests_per_connection: 5
229+
retries: 0
230+
max_block_ms: 60000
231+
linger_ms: 1000
223232
224233
The configuration above listens to the syslog messages from the Kafka bootstrap
225234
servers ``10.10.10.1``, ``10.10.10.2`` and ``10.10.10.3`` then publishes the
226235
structured objects encrypted and serialized via ZeroMQ, serving them at the
227236
address ``172.17.17.2``, port ``49017``.
228237

238+
The opts listed there are the kafka producer options that the napalm-logs exposes
239+
They are directly named in the same way as the kafka python3 package:
240+
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
241+
These opts are optional - you can chose to add them completely, only partially or not at all.
242+
229243
Check the complete list of configuration options under
230244
:ref:`configuration-options`.
231245

docs/options/index.rst

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,34 @@ Configuration file example:
344344
345345
hwm: 0
346346
347+
.. _configuration-backlog:
348+
349+
``backlog``: 100
350+
-------------
351+
352+
.. versionadded:: 0.11.0
353+
354+
The zmq backlog option shall set the maximum length of the queue of outstanding peer
355+
connections for the specified socket; this only applies to connection-oriented transports.
356+
This is used for both external zmq publishers but also but the internally defined zmq that
357+
saves the messages before sending them to the configured publishers.
358+
359+
This option can be used to tune the performances of the napalm-logs.
360+
While the default limit should be generally enough, in environments with extremely high
361+
density of syslog messages to be processed, it is recommended to increase this value.
362+
363+
CLI usage example:
364+
365+
.. code-block:: bash
366+
367+
$ napalm-logs --backlog 0
368+
369+
Configuration file example:
370+
371+
.. code-block:: yaml
372+
373+
backlog: 0
374+
347375
.. _configuration-options-keyfile:
348376

349377
``keyfile``

napalm_logs/base.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def __init__(
7575
device_blacklist=[],
7676
device_whitelist=[],
7777
hwm=None,
78+
backlog=None,
7879
device_worker_processes=1,
7980
serializer="msgpack",
8081
buffer=None,
@@ -128,6 +129,7 @@ def __init__(
128129
self.serializer = serializer
129130
self.device_worker_processes = device_worker_processes
130131
self.hwm = hwm
132+
self.backlog = backlog
131133
self._buffer_cfg = buffer
132134
self._buffer = None
133135
# Setup the environment
@@ -209,6 +211,7 @@ def _post_preparation(self):
209211
already setup).
210212
"""
211213
self.opts["hwm"] = CONFIG.ZMQ_INTERNAL_HWM if self.hwm is None else self.hwm
214+
self.opts["backlog"] = CONFIG.ZMQ_INTERNAL_BACKLOG if self.backlog is None else self.backlog
212215
self.opts["_server_send_unknown"] = False
213216
for pub in self.publisher:
214217
pub_name = list(pub.keys())[0]
@@ -631,7 +634,7 @@ def _start_srv_proc(self, started_os_proc):
631634

632635
def _start_pub_px_proc(self):
633636
""" """
634-
px = NapalmLogsPublisherProxy(self.opts["hwm"])
637+
px = NapalmLogsPublisherProxy(self.opts["hwm"], self.opts["backlog"])
635638
proc = Process(target=px.start)
636639
proc.start()
637640
proc.description = "Publisher proxy process"

napalm_logs/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
LOG_FILE = os.path.join(ROOT_DIR, "var", "log", "napalm", "logs")
3030
LOG_FILE_CLI_OPTIONS = ("cli", "screen")
3131
ZMQ_INTERNAL_HWM = 1000
32+
ZMQ_INTERNAL_BACKLOG = 100
3233
METRICS_ADDRESS = "0.0.0.0"
3334
METRICS_PORT = 9443
3435
METRICS_DIR = "/tmp/napalm_logs_metrics"

napalm_logs/device.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,13 @@ def _parse(self, msg_dict):
177177
ret[key] = result
178178
return ret
179179
if error_present is True:
180-
log.info(
180+
log.debug(
181181
"Configured regex did not match for os: %s tag %s",
182182
self._name,
183183
msg_dict.get("tag", ""),
184184
)
185185
else:
186-
log.info(
186+
log.debug(
187187
"Syslog message not configured for os: %s tag %s",
188188
self._name,
189189
msg_dict.get("tag", ""),
@@ -248,7 +248,7 @@ def start(self):
248248
"Counter of failed OpenConfig object generations",
249249
["device_os"],
250250
)
251-
if self.opts.get("metrics_include_attributes", True):
251+
if self.opts.get("metrics_include_attributes", False):
252252
napalm_logs_device_published_messages_attrs = Counter(
253253
"napalm_logs_device_published_messages_attrs",
254254
"Counter of published messages, with more granular selection",
@@ -355,7 +355,7 @@ def start(self):
355355
self.pub.send(umsgpack.packb(to_publish))
356356
# self._publish(to_publish)
357357
napalm_logs_device_published_messages.labels(device_os=self._name).inc()
358-
if self.opts.get("metrics_include_attributes", True):
358+
if self.opts.get("metrics_include_attributes", False):
359359
napalm_logs_device_published_messages_attrs.labels(
360360
device_os=self._name,
361361
error=to_publish["error"],

napalm_logs/listener/kafka.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,15 @@ def receive(self):
7070
log.error("Received kafka error: %s", error, exc_info=True)
7171
raise ListenerException(error)
7272
log_source = msg.key
73+
if isinstance(log_source, bytes):
74+
log_source = log_source.decode()
7375
try:
7476
decoded = json.loads(msg.value.decode("utf-8"))
7577
except ValueError:
7678
log.error("Not in json format: %s", msg.value.decode("utf-8"))
7779
return "", ""
7880
log_message = decoded.get("message")
79-
log.debug("[%s] Received %s from %s", log_message, log_source, time.time())
81+
log.debug("[%s] Received from kafka %s from %s", log_message, log_source, time.time())
8082
return log_message, log_source
8183

8284
def stop(self):

napalm_logs/listener/tcp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def _client_connection(self, conn, addr):
7272
# log.debug('Received empty message from %s', addr)
7373
# disabled ^ as it was too noisy
7474
continue
75-
log.debug("[%s] Received %s from %s", time.time(), msg, addr)
75+
log.debug("[%s] Received from tcp %s from %s", time.time(), msg, addr)
7676
messages = []
7777
if isinstance(msg, bytes):
7878
msg = msg.decode("utf-8")

napalm_logs/listener/udp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def receive(self):
7474
except socket.error as error:
7575
log.error("Received listener socket error: %s", error, exc_info=True)
7676
raise ListenerException(error)
77-
log.debug("[%s] Received %s from %s", msg, addr, time.time())
77+
log.debug("[%s] Received from udp %s from %s", msg, addr, time.time())
7878
return msg, addr[0]
7979

8080
def stop(self):

napalm_logs/listener_proc.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ def start(self):
100100
while self.__up:
101101
try:
102102
log_message, log_source = self.listener.receive()
103+
if isinstance(log_source, bytes):
104+
log_source = log_source.decode()
103105
except ListenerException as lerr:
104106
if self.__up is False:
105107
log.info("Exiting on process shutdown")

0 commit comments

Comments
 (0)