diff --git a/caso/messenger/logstash.py b/caso/messenger/logstash.py index 6c19632..fc6a217 100644 --- a/caso/messenger/logstash.py +++ b/caso/messenger/logstash.py @@ -16,11 +16,12 @@ """Module containing a Logstask cASO messenger.""" +import json +import datetime import socket from oslo_config import cfg from oslo_log import log -import six from caso import exception import caso.messenger @@ -49,11 +50,24 @@ def __init__(self, host=CONF.logstash.host, port=CONF.logstash.port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def push(self, records): - """Push records to logstash using tcp.""" + + # NOTE(acostantini): code for the serialization and push of the + # records in logstash. JSON format to be used and encoding UTF-8 + if not records: + return + + # Actual timestamp to be added on each record + ct = int(datetime.datetime.now().timestamp()) + + # Open the connection with LS + self.sock.connect((self.host, self.port)) + try: - self.sock.connect((self.host, self.port)) - for _, record in six.iteritems(records): - self.sock.sendall(record.as_json() + "\n") + for record in records: + # serialization of record + rec = record.serialization_message() + rec["caso-timestamp"] = ct + self.sock.send((json.dumps(rec) + "\n").encode("utf-8")) except socket.error as e: raise exception.LogstashConnectionError( host=self.host, port=self.port, exception=e diff --git a/caso/record.py b/caso/record.py index 9939f01..855e637 100644 --- a/caso/record.py +++ b/caso/record.py @@ -48,6 +48,21 @@ def ssm_message(self): """Render record as the expected SSM message.""" raise NotImplementedError("Method not implemented") + def serialization_message(self): + """Render record as the expected logstash message.""" + opts = { + "by_alias": True, + "exclude_none": True, + } + # NOTE(acostatnini): part related to the definition of the logstash message to + # be serialized before to send data + # NOTE(aloga): do not iter over the dictionary returned by record.dict() as this + # is just a dictionary representation of the object, where no serialization is + # done. In order to get objects correctly serialized we need to convert to JSON, + # then reload the model + serialized_record = json.loads(self.json(**opts)) + return serialized_record + class _ValidCloudStatus(str, enum.Enum): """This is a private class to enum valid cloud statuses."""