diff --git a/caso/messenger/kafka.py b/caso/messenger/kafka.py
new file mode 100644
index 0000000..552fd01
--- /dev/null
+++ b/caso/messenger/kafka.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2014 Spanish National Research Council (CSIC)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Module containing a Kafka cASO messenger."""
+
+import socket
+
+from oslo_config import cfg
+from oslo_log import log
+import six
+
+from caso import exception
+import caso.messenger
+#add json lib
+import json
+#add datetime lib
+import datetime
+#add confluent lib
+from confluent_kafka import Producer, Consumer
+
+opts = [
+ cfg.StrOpt("brokers", default="localhost:9092", help="Kafka host to send records to."),
+ cfg.StrOpt("topic", default="caso", help="Kafka server topic."),
+ cfg.StrOpt("serviceName", default="caso", help="Kafka server service name."),
+ cfg.StrOpt("username", default="username", help="Kafka server username."),
+ cfg.StrOpt("password", default="password", help="Kafka server password."),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(opts, group="kafka")
+
+LOG = log.getLogger(__name__)
+
+
+class KafkaMessenger(caso.messenger.BaseMessenger):
+ """Format and send records to a kafka host."""
+
+ def __init__(self, brokers=CONF.kafka.brokers, topic=CONF.kafka.topic, serviceName=CONF.kafka.serviceName, username=CONF.kafka.username, password=CONF.kafka.password):
+ """Get a kafka messenger for a given host and port."""
+ super(KafkaMessenger, self).__init__()
+ self.brokers = CONF.kafka.brokers
+ self.topic = CONF.kafka.topic
+ self.serviceName = CONF.kafka.serviceName
+ self.username = CONF.kafka.username
+ self.password = CONF.kafka.password
+
+
+ def delivery_report(self, err, msg):
+ """ Called once for each message produced to indicate delivery result.
+ Triggered by poll() or flush(). """
+ if err is not None:
+ print('Message delivery failed: {}'.format(err))
+ else:
+ print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
+
+
+
+ def push(self, records):
+
+ # NOTE(acostantini): code for the serialization and push of the
+ # records in logstash via kafka. JSON format to be used and encoding UTF-8
+ """Serialization of records to be sent to logstash via kafka"""
+ if not records:
+ return
+
+ #Actual timestamp to be added on each record
+ cdt = datetime.datetime.now()
+ ct = int(datetime.datetime.now().timestamp())
+
+ # Producer with SSL support
+ conf = {
+ 'bootstrap.servers': self.brokers,
+ 'ssl.ca.location': "/var/private/ssl/accounting/ca.crt",
+ 'security.protocol': 'SASL_SSL',
+ 'sasl.mechanisms': 'PLAIN',
+ 'sasl.kerberos.service.name': self.serviceName,
+ 'sasl.username': self.username,
+ 'sasl.password': self.password
+
+ # Producer
+ producer = Producer(**conf)
+
+
+ """Push records to be serialized using logstash_message definition."""
+ for record in records:
+ #serialization of record
+ rec=record.serialization_message()
+ #cASO timestamp added to each record
+ rec['caso-timestamp']=ct
+
+ #Send the record to Kafka
+
+ try:
+ producer.poll(0)
+ producer.produce(self.topic, value=json.dumps(rec).encode('utf-8'),
+ callback=self.delivery_report)
+
+ except ValueError as err:
+ print("This alert can't be read" % (err))
+
+ producer.flush()
+
diff --git a/caso/opts.py b/caso/opts.py
index c7955e1..e6908ad 100644
--- a/caso/opts.py
+++ b/caso/opts.py
@@ -25,6 +25,7 @@
import caso.manager
import caso.messenger.logstash
import caso.messenger.ssm
+import caso.messenger.kafka
def list_opts():
@@ -43,5 +44,6 @@ def list_opts():
("benchmark", caso.extract.openstack.nova.benchmark_opts),
("keystone_auth", caso.keystone_client.opts),
("logstash", caso.messenger.logstash.opts),
+ ("kafka", caso.messenger.kafka.opts),
("ssm", caso.messenger.ssm.opts),
]
diff --git a/caso/record.py b/caso/record.py
index 9939f01..b985f40 100644
--- a/caso/record.py
+++ b/caso/record.py
@@ -48,7 +48,21 @@ def ssm_message(self):
"""Render record as the expected SSM message."""
raise NotImplementedError("Method not implemented")
-
+ def serialization_message(self):
+ """Render record as the expected logstash message."""
+ opts = {
+ "by_alias": True,
+ "exclude_none": True,
+ }
+ # NOTE(acostatnini): part related to the definition of the logstash message to be
+ # serialized before to send data
+ # NOTE(aloga): do not iter over the dictionary returned by record.dict() as this
+ # is just a dictionary representation of the object, where no serialization is
+ # done. In order to get objects correctly serialized we need to convert to JSON,
+ # then reload the model
+ serialized_record = json.loads(self.json(**opts))
+ return serialized_record
+
class _ValidCloudStatus(str, enum.Enum):
"""This is a private class to enum valid cloud statuses."""
diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst
index d48be1c..e81387a 100644
--- a/doc/source/configuration.rst
+++ b/doc/source/configuration.rst
@@ -144,6 +144,7 @@ of every option. You should check at least the following options:
* ``ssm`` for publishing APEL records.
* ``logstash`` for publishing to Logstash.
+ * ``kafka`` for publishing to Kafka.
* ``noop`` do nothing at all.
Note that there might be other messengers available in the system if they are
@@ -201,6 +202,20 @@ messenger. Available options:
* ``host`` (default: ``localhost``), host of Logstash server.
* ``port`` (default: ``5000``), Logstash server port.
+``[kafka]`` section
+----------------------
+
+Options defined here configure the `kafka `_
+messenger. based on `confluent-kafka `_:
+
+* ``brokers`` (default: ``localhost:9092``), endpoint of Kafka server. Port must be provided for each endpoint.
+* ``topic`` (default: ``caso``), Kafka topic.
+* ``serviceName`` (default: ``caso``), Kafka service name.
+* ``username`` (default: ``username``), Kafka username.
+* ``password`` (default: ``password``), Kafka password.
+
+Note: the connection to Kafka is Simple Authentication and Security Layer (SASL) authentication configuration enabled. SSL is also enabled. The CA certificate (base64) must be provided under `/var/private/ssl/accounting/ca.crt` of the host running cASO.
+
Other cASO configuration options
--------------------------------
diff --git a/pyproject.toml b/pyproject.toml
index dfaef2f..677baab 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -126,6 +126,10 @@ types-six = "^1.16.21.20240513"
types-python-dateutil = "^2.9.0.20240906"
+
+[tool.poetry.group.kafka.dependencies]
+confluent-kafka = "^2.5.3"
+
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 2e02f3c..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# The order of packages is significant, because pip processes them in the order
-# of appearance. Changing the order has an impact on the overall integration
-# process, which may cause wedges in the gate later.
-pbr>=4.1.0
-six>=1.9.0
-
-dirq
-python-dateutil>=2.4.2
-
-oslo.config>=2.3.0 # Apache-2.0
-oslo.concurrency>=3.20.0 # Apache-2.0
-oslo.log>=1.8.0 # Apache-2.0
-oslo-utils>=4.10.1
-
-python-cinderclient>=5.0.0 # Apache-2.0
-python-novaclient>=2.28.1 # Apache-2.0
-python-keystoneclient>=3.0.0 # Apache-2.0
-python-glanceclient>=0.18.0 # Apache-2.0
-python-neutronclient>=6.7.0 # Apache-2.0
-keystoneauth1>=3.4.0 # Apache-2.0
-
-stevedore
-pydantic>=2.0.0