Skip to content

New Kafka module #129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions caso/messenger/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-

# Copyright 2014 Spanish National Research Council (CSIC)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Module containing a Kafka cASO messenger."""

import socket

from oslo_config import cfg
from oslo_log import log
import six

from caso import exception
import caso.messenger
#add json lib
import json
#add datetime lib
import datetime
#add confluent lib
from confluent_kafka import Producer, Consumer

# Configuration options for the [kafka] section.
opts = [
    cfg.StrOpt(
        "brokers",
        default="localhost:9092",
        help="Kafka host to send records to.",
    ),
    cfg.StrOpt(
        "topic",
        default="caso",
        help="Kafka server topic.",
    ),
    cfg.StrOpt(
        "serviceName",
        default="caso",
        help="Kafka server service name.",
    ),
    cfg.StrOpt(
        "username",
        default="username",
        help="Kafka server username.",
    ),
    cfg.StrOpt(
        "password",
        default="password",
        help="Kafka server password.",
    ),
]

CONF = cfg.CONF
CONF.register_opts(opts, group="kafka")

LOG = log.getLogger(__name__)


class KafkaMessenger(caso.messenger.BaseMessenger):
    """Format and send records to a Kafka broker.

    Records are serialized to JSON (UTF-8 encoded) and produced to the
    configured topic using confluent-kafka over SASL_SSL.
    """

    def __init__(self, brokers=None, topic=None, serviceName=None,
                 username=None, password=None):
        """Get a Kafka messenger for the given connection parameters.

        Any parameter left as None falls back to the corresponding option
        in the ``[kafka]`` configuration section.

        :param brokers: Kafka bootstrap servers ("host:port[,host:port]").
        :param topic: topic to produce records to.
        :param serviceName: SASL Kerberos service name.
        :param username: SASL username.
        :param password: SASL password.
        """
        super(KafkaMessenger, self).__init__()
        # NOTE: the previous implementation silently ignored the constructor
        # arguments and always read CONF; honor explicit arguments first and
        # evaluate CONF at call time rather than import time.
        self.brokers = brokers if brokers is not None else CONF.kafka.brokers
        self.topic = topic if topic is not None else CONF.kafka.topic
        self.serviceName = (serviceName if serviceName is not None
                            else CONF.kafka.serviceName)
        self.username = (username if username is not None
                         else CONF.kafka.username)
        self.password = (password if password is not None
                         else CONF.kafka.password)

    def delivery_report(self, err, msg):
        """Log the delivery result of a single produced message.

        Called once for each message produced, triggered by poll() or
        flush().

        :param err: delivery error, or None on success.
        :param msg: the produced message (exposes topic() and partition()).
        """
        if err is not None:
            LOG.error("Message delivery failed: %s", err)
        else:
            LOG.info("Message delivered to %s [%s]",
                     msg.topic(), msg.partition())

    def push(self, records):
        """Serialize records and push them to Kafka.

        Each record is serialized via its serialization_message() method,
        stamped with the current UNIX timestamp under 'caso-timestamp',
        JSON-encoded (UTF-8) and produced to the configured topic.

        :param records: iterable of cASO records; no-op when empty.
        """
        if not records:
            return

        # Timestamp (seconds since epoch) added to every record.
        ct = int(datetime.datetime.now().timestamp())

        # Producer configuration with SASL_SSL support.
        # NOTE(review): the CA certificate path is hard coded; confirm it
        # exists on every host running cASO.
        conf = {
            'bootstrap.servers': self.brokers,
            'ssl.ca.location': "/var/private/ssl/accounting/ca.crt",
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'PLAIN',
            'sasl.kerberos.service.name': self.serviceName,
            'sasl.username': self.username,
            'sasl.password': self.password,
        }

        producer = Producer(**conf)

        for record in records:
            # Serialize the record and add the cASO timestamp.
            rec = record.serialization_message()
            rec['caso-timestamp'] = ct

            try:
                # Serve delivery callbacks from earlier produce() calls.
                producer.poll(0)
                producer.produce(self.topic,
                                 value=json.dumps(rec).encode('utf-8'),
                                 callback=self.delivery_report)
            except ValueError as err:
                # NOTE: the original used a broken %-format here, which
                # raised TypeError instead of reporting the failure. Log
                # and continue with the next record.
                LOG.error("This alert can't be read: %s", err)

        # Block until all queued messages are delivered (or have failed).
        producer.flush()

2 changes: 2 additions & 0 deletions caso/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import caso.manager
import caso.messenger.logstash
import caso.messenger.ssm
import caso.messenger.kafka

Check warning on line 28 in caso/opts.py

View check run for this annotation

Codecov / codecov/patch

caso/opts.py#L28

Added line #L28 was not covered by tests


def list_opts():
Expand All @@ -43,5 +44,6 @@
("benchmark", caso.extract.openstack.nova.benchmark_opts),
("keystone_auth", caso.keystone_client.opts),
("logstash", caso.messenger.logstash.opts),
("kafka", caso.messenger.kafka.opts),
("ssm", caso.messenger.ssm.opts),
]
16 changes: 15 additions & 1 deletion caso/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,21 @@
"""Render record as the expected SSM message."""
raise NotImplementedError("Method not implemented")


def serialization_message(self):
"""Render record as the expected logstash message."""
opts = {

Check warning on line 53 in caso/record.py

View check run for this annotation

Codecov / codecov/patch

caso/record.py#L53

Added line #L53 was not covered by tests
"by_alias": True,
"exclude_none": True,
}
# NOTE(acostatnini): part related to the definition of the logstash message to be
# serialized before to send data
# NOTE(aloga): do not iter over the dictionary returned by record.dict() as this
# is just a dictionary representation of the object, where no serialization is
# done. In order to get objects correctly serialized we need to convert to JSON,
# then reload the model
serialized_record = json.loads(self.json(**opts))
return serialized_record

Check warning on line 64 in caso/record.py

View check run for this annotation

Codecov / codecov/patch

caso/record.py#L63-L64

Added lines #L63 - L64 were not covered by tests

class _ValidCloudStatus(str, enum.Enum):
"""This is a private class to enum valid cloud statuses."""

Expand Down
15 changes: 15 additions & 0 deletions doc/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ of every option. You should check at least the following options:

* ``ssm`` for publishing APEL records.
* ``logstash`` for publishing to Logstash.
* ``kafka`` for publishing to Kafka.
* ``noop`` do nothing at all.

Note that there might be other messengers available in the system if they are
Expand Down Expand Up @@ -201,6 +202,20 @@ messenger. Available options:
* ``host`` (default: ``localhost``), host of Logstash server.
* ``port`` (default: ``5000``), Logstash server port.

``[kafka]`` section
----------------------

Options defined here configure the `Kafka <https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/>`_
messenger, which is based on `confluent-kafka <https://pypi.org/project/confluent-kafka/>`_:

* ``brokers`` (default: ``localhost:9092``), endpoint of Kafka server. Port must be provided for each endpoint.
* ``topic`` (default: ``caso``), Kafka topic.
* ``serviceName`` (default: ``caso``), Kafka service name.
* ``username`` (default: ``username``), Kafka username.
* ``password`` (default: ``password``), Kafka password.

Note: the connection to Kafka uses Simple Authentication and Security Layer (SASL) authentication, with SSL enabled. The CA certificate (base64) must be provided under `/var/private/ssl/accounting/ca.crt` on the host running cASO.

Other cASO configuration options
--------------------------------

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ types-six = "^1.16.21.20240513"
types-python-dateutil = "^2.9.0.20240906"



[tool.poetry.group.kafka.dependencies]
confluent-kafka = "^2.5.3"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
23 changes: 0 additions & 23 deletions requirements.txt

This file was deleted.

Loading