From f6c434e466ffdb292bbfbffd79ea62c2a7a773df Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Wed, 19 Jun 2024 18:58:07 +0200 Subject: [PATCH 01/12] feat: add kafka support --- caso/messenger/kafka.py | 115 ++++++++++++++++++++++++++++++++++++++++ caso/opts.py | 2 + requirements.txt | 23 -------- 3 files changed, 117 insertions(+), 23 deletions(-) create mode 100644 caso/messenger/kafka.py delete mode 100644 requirements.txt diff --git a/caso/messenger/kafka.py b/caso/messenger/kafka.py new file mode 100644 index 0000000..7813e10 --- /dev/null +++ b/caso/messenger/kafka.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- + +# Copyright 2014 Spanish National Research Council (CSIC) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Module containing a Logstask cASO messenger.""" + +import socket + +from oslo_config import cfg +from oslo_log import log +import six + +from caso import exception +import caso.messenger +#add json lib +import json +#add datetime lib +import datetime +#add confluent lib +from confluent_kafka import Producer, Consumer + +opts = [ + cfg.StrOpt("brokers", default="localhost:9092", help="Kafka host to send records to."), + cfg.StrOpt("topic", default="caso", help="Kafka server topic."), + cfg.StrOpt("serviceName", default="caso", help="Kafka server service name."), + cfg.StrOpt("username", default="username", help="Kafka server username."), + cfg.StrOpt("password", default="password", help="Kafka server password."), +] + +CONF = cfg.CONF +CONF.register_opts(opts, group="kafka") + +LOG = log.getLogger(__name__) + + +class KafkaMessenger(caso.messenger.BaseMessenger): + """Format and send records to a kafka host.""" + + def __init__(self, brokers=CONF.kafka.brokers, topic=CONF.kafka.topic, serviceName=CONF.kafka.serviceName, username=CONF.kafka.username, password=CONF.kafka.password): + """Get a logstash messenger for a given host and port.""" + super(KafkaMessenger, self).__init__() + self.brokers = CONF.kafka.brokers + self.topic = CONF.kafka.topic + self.serviceName = CONF.kafka.serviceName + self.username = CONF.kafka.username + self.password = CONF.kafka.password + + + def delivery_report(self, err, msg): + """ Called once for each message produced to indicate delivery result. + Triggered by poll() or flush(). """ + if err is not None: + print('Message delivery failed: {}'.format(err)) + else: + print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) + + + + def push(self, records): + + # NOTE(acostantini): code for the serialization and push of the + # records in logstash. JSON format to be used and encoding UTF-8 + """Serialization of records to be sent to logstash via kafka""" + if not records: + return + + #Actual timestamp to be added on each record + cdt = datetime.datetime.now() + ct = int(datetime.datetime.now().timestamp()) + + # Producer with SSL support + conf = { + 'bootstrap.servers': self.brokers, + 'ssl.ca.location': "/var/private/ssl/accounting/ca.crt", + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'PLAIN', + 'sasl.kerberos.service.name': self.serviceName, + 'sasl.username': self.username, + 'sasl.password': self.password + + # We can tune as args batch_size and linger_ms + producer = Producer(**conf) + + + """Push records to logstash using tcp.""" + for record in records: + #serialization of record + rec=record.logstash_message() + #cASO timestamp added to each record + rec['caso-timestamp']=ct + + #Send the record to Kafka + + try: + producer.poll(0) + producer.produce(self.topic, value=json.dumps(rec).encode('utf-8'), + callback=self.delivery_report) + + except ValueError as err: + print("This alert can't be read" % (err)) + + producer.flush() + diff --git a/caso/opts.py b/caso/opts.py index c7955e1..e6908ad 100644 --- a/caso/opts.py +++ b/caso/opts.py @@ -25,6 +25,7 @@ import caso.manager import caso.messenger.logstash import caso.messenger.ssm +import caso.messenger.kafka def list_opts(): @@ -43,5 +44,6 @@ def list_opts(): ("benchmark", caso.extract.openstack.nova.benchmark_opts), ("keystone_auth", caso.keystone_client.opts), ("logstash", caso.messenger.logstash.opts), + ("kafka", caso.messenger.kafka.opts), ("ssm", caso.messenger.ssm.opts), ] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 2e02f3c..0000000 --- a/requirements.txt +++ /dev/null @@ -1,23 +0,0 @@ -# The order of packages is significant, because pip processes them in the order -# of appearance. Changing the order has an impact on the overall integration -# process, which may cause wedges in the gate later. -pbr>=4.1.0 -six>=1.9.0 - -dirq -python-dateutil>=2.4.2 - -oslo.config>=2.3.0 # Apache-2.0 -oslo.concurrency>=3.20.0 # Apache-2.0 -oslo.log>=1.8.0 # Apache-2.0 -oslo-utils>=4.10.1 - -python-cinderclient>=5.0.0 # Apache-2.0 -python-novaclient>=2.28.1 # Apache-2.0 -python-keystoneclient>=3.0.0 # Apache-2.0 -python-glanceclient>=0.18.0 # Apache-2.0 -python-neutronclient>=6.7.0 # Apache-2.0 -keystoneauth1>=3.4.0 # Apache-2.0 - -stevedore -pydantic>=2.0.0 From 88d807ff79918a065d6168fee2138f81079d0d6a Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 14:36:02 +0200 Subject: [PATCH 02/12] doc: Update configuration.rst with kafka options --- doc/source/configuration.rst | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index d48be1c..fcb4ce0 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -201,6 +201,20 @@ messenger. Available options: * ``host`` (default: ``localhost``), host of Logstash server. * ``port`` (default: ``5000``), Logstash server port. +``[kafka]`` section +---------------------- + +Options defined here configure the `kafka `_ +messenger. Mandatory options: + +* ``brokers`` (default: ``localhost:9092``), host of Kafka server. Port must be provided. +* ``topic`` (default: ``caso``), Kafka topic. +* ``serviceName`` (default: ``caso``), Kafka service name. +* ``username`` (default: ``username``), Kafka username. +* ``password`` (default: ``password``), Kafka password. + +Note: the connection to Kafka is SSL enabled. The CA certificate must be provided under `/var/private/ssl/accounting/ca.crt` + Other cASO configuration options -------------------------------- From 6b9ec6cfef9738a5a49fc83bd2b95d228901f37a Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 14:47:26 +0200 Subject: [PATCH 03/12] doc: Update configuration.rst typo documentation --- doc/source/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index fcb4ce0..db81158 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -207,7 +207,7 @@ messenger. Available options: Options defined here configure the `kafka `_ messenger. Mandatory options: -* ``brokers`` (default: ``localhost:9092``), host of Kafka server. Port must be provided. +* ``brokers`` (default: ``localhost:9092``), endpoint of Kafka server. Port must be provided for each endpoint. * ``topic`` (default: ``caso``), Kafka topic. * ``serviceName`` (default: ``caso``), Kafka service name. * ``username`` (default: ``username``), Kafka username. From 0c1f694ff22aff46bd679a8e692332900eee8623 Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 14:59:35 +0200 Subject: [PATCH 04/12] doc: added kafka among messangers --- doc/source/configuration.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index db81158..8cca2af 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -144,6 +144,7 @@ of every option. You should check at least the following options: * ``ssm`` for publishing APEL records. * ``logstash`` for publishing to Logstash. + * ``kafka`` for publishing to Kafka. * ``noop`` do nothing at all. Note that there might be other messengers available in the system if they are From 5cfb720f351ef5686fc70d116901269a078e394d Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 15:17:40 +0200 Subject: [PATCH 05/12] doc: update doc with certificate info settings --- doc/source/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 8cca2af..6fbd76a 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -214,7 +214,7 @@ messenger. Mandatory options: * ``username`` (default: ``username``), Kafka username. * ``password`` (default: ``password``), Kafka password. -Note: the connection to Kafka is SSL enabled. The CA certificate must be provided under `/var/private/ssl/accounting/ca.crt` +Note: the connection to Kafka is SSL enabled. The CA certificate (base64) must be provided under `/var/private/ssl/accounting/ca.crt` of the host running cASO. Other cASO configuration options -------------------------------- From 46bea94e26f0fc57b9b058b2e2464c529c39dd31 Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 15:44:25 +0200 Subject: [PATCH 06/12] style: kafka messanger, comment on the code updated --- caso/messenger/kafka.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/caso/messenger/kafka.py b/caso/messenger/kafka.py index 7813e10..0d756a5 100644 --- a/caso/messenger/kafka.py +++ b/caso/messenger/kafka.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -"""Module containing a Logstask cASO messenger.""" +"""Module containing a Kafka cASO messenger.""" import socket @@ -49,7 +49,7 @@ class KafkaMessenger(caso.messenger.BaseMessenger): """Format and send records to a kafka host.""" def __init__(self, brokers=CONF.kafka.brokers, topic=CONF.kafka.topic, serviceName=CONF.kafka.serviceName, username=CONF.kafka.username, password=CONF.kafka.password): - """Get a logstash messenger for a given host and port.""" + """Get a kafka messenger for a given host and port.""" super(KafkaMessenger, self).__init__() self.brokers = CONF.kafka.brokers self.topic = CONF.kafka.topic @@ -71,7 +71,7 @@ def delivery_report(self, err, msg): def push(self, records): # NOTE(acostantini): code for the serialization and push of the - # records in logstash. JSON format to be used and encoding UTF-8 + # records in logstash via kafka. JSON format to be used and encoding UTF-8 """Serialization of records to be sent to logstash via kafka""" if not records: return @@ -90,11 +90,11 @@ def push(self, records): 'sasl.username': self.username, 'sasl.password': self.password - # We can tune as args batch_size and linger_ms + # Producer producer = Producer(**conf) - """Push records to logstash using tcp.""" + """Push records to be serialized using logstash_message definition.""" for record in records: #serialization of record rec=record.logstash_message() From 2ac0774b4bced5a37e7fb528eea4b0c85e4bdb15 Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 15:47:58 +0200 Subject: [PATCH 07/12] doc: update kafka documentation --- doc/source/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 6fbd76a..a4895f1 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -214,7 +214,7 @@ messenger. Mandatory options: * ``username`` (default: ``username``), Kafka username. * ``password`` (default: ``password``), Kafka password. -Note: the connection to Kafka is SSL enabled. The CA certificate (base64) must be provided under `/var/private/ssl/accounting/ca.crt` of the host running cASO. +Note: the connection to Kafka is Simple Authentication and Security Layer (SASL) authentication configuration enabled. SSL is also enabled. The CA certificate (base64) must be provided under `/var/private/ssl/accounting/ca.crt` of the host running cASO. Other cASO configuration options -------------------------------- From 8fdb0f100d050eb32550080b9da73390255cc66f Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 16:02:46 +0200 Subject: [PATCH 08/12] doc: update kafka documentation --- doc/source/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index a4895f1..f022ff6 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -206,7 +206,7 @@ messenger. Available options: ---------------------- Options defined here configure the `kafka `_ -messenger. Mandatory options: +messenger: * ``brokers`` (default: ``localhost:9092``), endpoint of Kafka server. Port must be provided for each endpoint. * ``topic`` (default: ``caso``), Kafka topic. From 155659193fd0e34916e7e532211f1afceebfe9ba Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 18:29:10 +0200 Subject: [PATCH 09/12] build: added confluent-kafka requirement --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index dfaef2f..677baab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,6 +126,10 @@ types-six = "^1.16.21.20240513" types-python-dateutil = "^2.9.0.20240906" + +[tool.poetry.group.kafka.dependencies] +confluent-kafka = "^2.5.3" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" From a421baa48a39e115a98a92b781e990d51b73bff5 Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 18:30:38 +0200 Subject: [PATCH 10/12] doc: update for kafka --- doc/source/configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index f022ff6..e81387a 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -206,7 +206,7 @@ messenger. Available options: ---------------------- Options defined here configure the `kafka `_ -messenger: +messenger. based on `confluent-kafka `_: * ``brokers`` (default: ``localhost:9092``), endpoint of Kafka server. Port must be provided for each endpoint. * ``topic`` (default: ``caso``), Kafka topic. From 7d2bbd3beae2b4fe7735101f319afc6c9868053b Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 19:02:21 +0200 Subject: [PATCH 11/12] fix: added serialization of data --- caso/record.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/caso/record.py b/caso/record.py index 9939f01..b985f40 100644 --- a/caso/record.py +++ b/caso/record.py @@ -48,7 +48,21 @@ def ssm_message(self): """Render record as the expected SSM message.""" raise NotImplementedError("Method not implemented") - + def serialization_message(self): + """Render record as the expected logstash message.""" + opts = { + "by_alias": True, + "exclude_none": True, + } + # NOTE(acostatnini): part related to the definition of the logstash message to be + # serialized before to send data + # NOTE(aloga): do not iter over the dictionary returned by record.dict() as this + # is just a dictionary representation of the object, where no serialization is + # done. In order to get objects correctly serialized we need to convert to JSON, + # then reload the model + serialized_record = json.loads(self.json(**opts)) + return serialized_record + class _ValidCloudStatus(str, enum.Enum): """This is a private class to enum valid cloud statuses.""" From ea937808cee253ee663c27a5db73e1cb0b726caf Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 19:03:34 +0200 Subject: [PATCH 12/12] fix: added serialization of message --- caso/messenger/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caso/messenger/kafka.py b/caso/messenger/kafka.py index 0d756a5..552fd01 100644 --- a/caso/messenger/kafka.py +++ b/caso/messenger/kafka.py @@ -97,7 +97,7 @@ def push(self, records): """Push records to be serialized using logstash_message definition.""" for record in records: #serialization of record - rec=record.logstash_message() + rec=record.serialization_message() #cASO timestamp added to each record rec['caso-timestamp']=ct