Skip to content

Commit c25904c

Browse files
committed
[client] Implement logger
1 parent 95b5613 commit c25904c

File tree

2 files changed

+94
-40
lines changed

2 files changed

+94
-40
lines changed

pyobas/helpers.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import pika
1313
import yaml
1414

15-
from pyobas import OpenBAS
15+
from pyobas import OpenBAS, utils
1616

1717
TRUTHY: List[str] = ["yes", "true", "True"]
1818
FALSY: List[str] = ["no", "false", "False"]
@@ -152,6 +152,7 @@ def __init__(
152152
self,
153153
config: Dict,
154154
injector_config: Dict,
155+
logger,
155156
callback,
156157
) -> None:
157158
threading.Thread.__init__(self)
@@ -162,6 +163,7 @@ def __init__(
162163

163164
self.callback = callback
164165
self.config = config
166+
self.logger = logger
165167
self.host = injector_config.connection["host"]
166168
self.vhost = injector_config.connection["vhost"]
167169
self.use_ssl = injector_config.connection["use_ssl"]
@@ -197,10 +199,10 @@ def _data_handler(self, json_data) -> None:
197199
self.callback(json_data)
198200

199201
def run(self) -> None:
200-
print("Starting ListenQueue thread")
202+
self.logger.info("Starting ListenQueue thread")
201203
while not self.exit_event.is_set():
202204
try:
203-
print("ListenQueue connecting to rabbitMq.")
205+
self.logger.info("ListenQueue connecting to RabbitMQ.")
204206
# Connect the broker
205207
self.pika_credentials = pika.PlainCredentials(self.user, self.password)
206208
self.pika_parameters = pika.ConnectionParameters(
@@ -221,7 +223,7 @@ def run(self) -> None:
221223
# when not in cluster mode this line raise an exception
222224
self.channel.confirm_delivery()
223225
except Exception as err: # pylint: disable=broad-except
224-
print(str(err))
226+
self.logger.error(str(err))
225227
self.channel.basic_qos(prefetch_count=1)
226228
assert self.channel is not None
227229
self.channel.basic_consume(
@@ -232,25 +234,26 @@ def run(self) -> None:
232234
try:
233235
self.pika_connection.close()
234236
except Exception as errInException:
235-
print(str(errInException))
237+
self.logger.error(str(errInException))
236238
traceback.print_exc()
237239
# Wait some time and then retry ListenQueue again.
238240
time.sleep(10)
239241

240242
def stop(self):
241-
print("Preparing ListenQueue for clean shutdown")
243+
self.logger.info("Preparing ListenQueue for clean shutdown")
242244
self.exit_event.set()
243245
self.pika_connection.close()
244246
if self.thread:
245247
self.thread.join()
246248

247249

248250
class PingAlive(threading.Thread):
249-
def __init__(self, api, config, ping_type) -> None:
251+
def __init__(self, api, config, logger, ping_type) -> None:
250252
threading.Thread.__init__(self)
251253
self.ping_type = ping_type
252254
self.api = api
253255
self.config = config
256+
self.logger = logger
254257
self.in_error = False
255258
self.exit_event = threading.Event()
256259

@@ -262,15 +265,15 @@ def ping(self) -> None:
262265
else:
263266
self.api.collector.create(self.config, False)
264267
except Exception as e: # pylint: disable=broad-except
265-
print(str(e))
268+
self.logger.error(str(e))
266269
self.exit_event.wait(40)
267270

268271
def run(self) -> None:
269-
print("Starting PingAlive thread")
272+
self.logger.info("Starting PingAlive thread")
270273
self.ping()
271274

272275
def stop(self) -> None:
273-
print("Preparing PingAlive for clean shutdown")
276+
self.logger.info("Preparing PingAlive for clean shutdown")
274277
self.exit_event.set()
275278

276279

@@ -309,12 +312,20 @@ def __init__(self, config: OpenBASConfigHelper, icon) -> None:
309312
url=config.get_conf("openbas_url"),
310313
token=config.get_conf("openbas_token"),
311314
)
315+
312316
self.config = {
313317
"collector_id": config.get_conf("collector_id"),
314318
"collector_name": config.get_conf("collector_name"),
315319
"collector_type": config.get_conf("collector_type"),
316320
"collector_period": config.get_conf("collector_period"),
317321
}
322+
323+
self.logger_class = utils.logger(
324+
config.get_conf("collector_log_level", default="info").upper(),
325+
config.get_conf("collector_json_logging", default=True),
326+
)
327+
self.collector_logger = self.logger_class(config.get_conf("collector_name"))
328+
318329
icon_name = config.get_conf("collector_id") + ".png"
319330
collector_icon = (icon_name, icon, "image/png")
320331
self.api.collector.create(self.config, collector_icon)
@@ -323,7 +334,9 @@ def __init__(self, config: OpenBASConfigHelper, icon) -> None:
323334
self.scheduler = sched.scheduler(time.time, time.sleep)
324335
# Start ping thread
325336
if not self.connect_run_and_terminate:
326-
self.ping = PingAlive(self.api, self.config, "collector")
337+
self.ping = PingAlive(
338+
self.api, self.config, self.collector_logger, "collector"
339+
)
327340
self.ping.start()
328341
self.listen_queue = None
329342

@@ -371,21 +384,28 @@ def __init__(self, config: OpenBASConfigHelper, icon) -> None:
371384
"injector_executor_clear_commands", default=None
372385
),
373386
}
387+
388+
self.logger_class = utils.logger(
389+
config.get_conf("injector_log_level", default="info").upper(),
390+
config.get_conf("injector_json_logging", default=True),
391+
)
392+
self.injector_logger = self.logger_class(config.get_conf("injector_name"))
393+
374394
icon_name = config.get_conf("injector_type") + ".png"
375395
injector_icon = (icon_name, icon, "image/png")
376396
self.injector_config = self.api.injector.create(self.config, injector_icon)
377397
self.connect_run_and_terminate = False
378398
self.scheduler = sched.scheduler(time.time, time.sleep)
379399
# Start ping thread
380400
if not self.connect_run_and_terminate:
381-
self.ping = PingAlive(self.api, self.config, "injector")
401+
self.ping = PingAlive(
402+
self.api, self.config, self.injector_logger, "injector"
403+
)
382404
self.ping.start()
383405
self.listen_queue = None
384406

385407
def listen(self, message_callback: Callable[[Dict], None]) -> None:
386408
self.listen_queue = ListenQueue(
387-
self.config,
388-
self.injector_config,
389-
message_callback,
409+
self.config, self.injector_config, self.injector_logger, message_callback
390410
)
391411
self.listen_queue.start()

pyobas/utils.py

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import dataclasses
2+
import datetime
23
import email.message
34
import json
45
import logging
56
import urllib.parse
67
from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Tuple, Union
78

89
import requests
10+
from pythonjsonlogger import jsonlogger
911

1012

1113
class _StdoutStream:
@@ -20,31 +22,6 @@ def get_content_type(content_type: Optional[str]) -> str:
2022
return message.get_content_type()
2123

2224

23-
class MaskingFormatter(logging.Formatter):
24-
"""A logging formatter that can mask credentials"""
25-
26-
def __init__(
27-
self,
28-
fmt: Optional[str] = logging.BASIC_FORMAT,
29-
datefmt: Optional[str] = None,
30-
style: Literal["%", "{", "$"] = "%",
31-
validate: bool = True,
32-
masked: Optional[str] = None,
33-
) -> None:
34-
super().__init__(fmt, datefmt, style, validate)
35-
self.masked = masked
36-
37-
def _filter(self, entry: str) -> str:
38-
if not self.masked:
39-
return entry
40-
41-
return entry.replace(self.masked, "[MASKED]")
42-
43-
def format(self, record: logging.LogRecord) -> str:
44-
original = logging.Formatter.format(self, record)
45-
return self._filter(original)
46-
47-
4825
def response_content(
4926
response: requests.Response,
5027
streamed: bool,
@@ -136,3 +113,60 @@ def validate_attrs(
136113
f"Must provide one of these attributes: "
137114
f"{', '.join(self.exclusive)}"
138115
)
116+
117+
118+
class CustomJsonFormatter(jsonlogger.JsonFormatter):
119+
def add_fields(self, log_record, record, message_dict):
120+
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
121+
if not log_record.get("timestamp"):
122+
# This doesn't use record.created, so it is slightly off
123+
now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
124+
log_record["timestamp"] = now
125+
if log_record.get("level"):
126+
log_record["level"] = log_record["level"].upper()
127+
else:
128+
log_record["level"] = record.levelname
129+
130+
131+
def logger(level, json_logging=True):
132+
# Exceptions
133+
logging.getLogger("urllib3").setLevel(logging.WARNING)
134+
logging.getLogger("pika").setLevel(logging.ERROR)
135+
# Exceptions
136+
if json_logging:
137+
log_handler = logging.StreamHandler()
138+
log_handler.setLevel(level)
139+
formatter = CustomJsonFormatter("%(timestamp)s %(level)s %(name)s %(message)s")
140+
log_handler.setFormatter(formatter)
141+
logging.basicConfig(handlers=[log_handler], level=level, force=True)
142+
else:
143+
logging.basicConfig(level=level)
144+
145+
class AppLogger:
146+
def __init__(self, name):
147+
self.local_logger = logging.getLogger(name)
148+
149+
@staticmethod
150+
def prepare_meta(meta=None):
151+
return None if meta is None else {"attributes": meta}
152+
153+
@staticmethod
154+
def setup_logger_level(lib, log_level):
155+
logging.getLogger(lib).setLevel(log_level)
156+
157+
def debug(self, message, meta=None):
158+
self.local_logger.debug(message, extra=AppLogger.prepare_meta(meta))
159+
160+
def info(self, message, meta=None):
161+
self.local_logger.info(message, extra=AppLogger.prepare_meta(meta))
162+
163+
def warning(self, message, meta=None):
164+
self.local_logger.warning(message, extra=AppLogger.prepare_meta(meta))
165+
166+
def error(self, message, meta=None):
167+
# noinspection PyTypeChecker
168+
self.local_logger.error(
169+
message, exc_info=1, extra=AppLogger.prepare_meta(meta)
170+
)
171+
172+
return AppLogger

0 commit comments

Comments
 (0)