Skip to content

Commit f6c434e

Browse files
alexcos78alvarolopez
authored andcommitted
feat: add kafka support
1 parent 9be0fe3 commit f6c434e

File tree

3 files changed

+117
-23
lines changed

3 files changed

+117
-23
lines changed

caso/messenger/kafka.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# -*- coding: utf-8 -*-
2+
3+
# Copyright 2014 Spanish National Research Council (CSIC)
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License. You may obtain
7+
# a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
# License for the specific language governing permissions and limitations
15+
# under the License.
16+
17+
"""Module containing a Logstask cASO messenger."""
18+
19+
import socket
20+
21+
from oslo_config import cfg
22+
from oslo_log import log
23+
import six
24+
25+
from caso import exception
26+
import caso.messenger
27+
#add json lib
28+
import json
29+
#add datetime lib
30+
import datetime
31+
#add confluent lib
32+
from confluent_kafka import Producer, Consumer
33+
34+
opts = [
35+
cfg.StrOpt("brokers", default="localhost:9092", help="Kafka host to send records to."),
36+
cfg.StrOpt("topic", default="caso", help="Kafka server topic."),
37+
cfg.StrOpt("serviceName", default="caso", help="Kafka server service name."),
38+
cfg.StrOpt("username", default="username", help="Kafka server username."),
39+
cfg.StrOpt("password", default="password", help="Kafka server password."),
40+
]
41+
42+
CONF = cfg.CONF
43+
CONF.register_opts(opts, group="kafka")
44+
45+
LOG = log.getLogger(__name__)
46+
47+
48+
class KafkaMessenger(caso.messenger.BaseMessenger):
49+
"""Format and send records to a kafka host."""
50+
51+
def __init__(self, brokers=CONF.kafka.brokers, topic=CONF.kafka.topic, serviceName=CONF.kafka.serviceName, username=CONF.kafka.username, password=CONF.kafka.password):
52+
"""Get a logstash messenger for a given host and port."""
53+
super(KafkaMessenger, self).__init__()
54+
self.brokers = CONF.kafka.brokers
55+
self.topic = CONF.kafka.topic
56+
self.serviceName = CONF.kafka.serviceName
57+
self.username = CONF.kafka.username
58+
self.password = CONF.kafka.password
59+
60+
61+
def delivery_report(self, err, msg):
62+
""" Called once for each message produced to indicate delivery result.
63+
Triggered by poll() or flush(). """
64+
if err is not None:
65+
print('Message delivery failed: {}'.format(err))
66+
else:
67+
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
68+
69+
70+
71+
def push(self, records):
72+
73+
# NOTE(acostantini): code for the serialization and push of the
74+
# records in logstash. JSON format to be used and encoding UTF-8
75+
"""Serialization of records to be sent to logstash via kafka"""
76+
if not records:
77+
return
78+
79+
#Actual timestamp to be added on each record
80+
cdt = datetime.datetime.now()
81+
ct = int(datetime.datetime.now().timestamp())
82+
83+
# Producer with SSL support
84+
conf = {
85+
'bootstrap.servers': self.brokers,
86+
'ssl.ca.location': "/var/private/ssl/accounting/ca.crt",
87+
'security.protocol': 'SASL_SSL',
88+
'sasl.mechanisms': 'PLAIN',
89+
'sasl.kerberos.service.name': self.serviceName,
90+
'sasl.username': self.username,
91+
'sasl.password': self.password
92+
93+
# We can tune as args batch_size and linger_ms
94+
producer = Producer(**conf)
95+
96+
97+
"""Push records to logstash using tcp."""
98+
for record in records:
99+
#serialization of record
100+
rec=record.logstash_message()
101+
#cASO timestamp added to each record
102+
rec['caso-timestamp']=ct
103+
104+
#Send the record to Kafka
105+
106+
try:
107+
producer.poll(0)
108+
producer.produce(self.topic, value=json.dumps(rec).encode('utf-8'),
109+
callback=self.delivery_report)
110+
111+
except ValueError as err:
112+
print("This alert can't be read" % (err))
113+
114+
producer.flush()
115+

caso/opts.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import caso.manager
2626
import caso.messenger.logstash
2727
import caso.messenger.ssm
28+
import caso.messenger.kafka
2829

2930

3031
def list_opts():
@@ -43,5 +44,6 @@ def list_opts():
4344
("benchmark", caso.extract.openstack.nova.benchmark_opts),
4445
("keystone_auth", caso.keystone_client.opts),
4546
("logstash", caso.messenger.logstash.opts),
47+
("kafka", caso.messenger.kafka.opts),
4648
("ssm", caso.messenger.ssm.opts),
4749
]

requirements.txt

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)