14
14
# License for the specific language governing permissions and limitations
15
15
# under the License.
16
16
17
- """Module containing a Logstask cASO messenger."""
17
+ """Module containing a Kafka cASO messenger."""
18
18
19
19
import socket
20
20
@@ -49,7 +49,7 @@ class KafkaMessenger(caso.messenger.BaseMessenger):
49
49
"""Format and send records to a kafka host."""
50
50
51
51
def __init__ (self , brokers = CONF .kafka .brokers , topic = CONF .kafka .topic , serviceName = CONF .kafka .serviceName , username = CONF .kafka .username , password = CONF .kafka .password ):
52
- """Get a logstash messenger for a given host and port."""
52
+ """Get a kafka messenger for a given host and port."""
53
53
super (KafkaMessenger , self ).__init__ ()
54
54
self .brokers = CONF .kafka .brokers
55
55
self .topic = CONF .kafka .topic
@@ -71,7 +71,7 @@ def delivery_report(self, err, msg):
71
71
def push (self , records ):
72
72
73
73
# NOTE(acostantini): code for the serialization and push of the
74
- # records in logstash. JSON format to be used and encoding UTF-8
74
+ # records in logstash via kafka . JSON format to be used and encoding UTF-8
75
75
"""Serialization of records to be sent to logstash via kafka"""
76
76
if not records :
77
77
return
@@ -90,11 +90,11 @@ def push(self, records):
90
90
'sasl.username' : self .username ,
91
91
'sasl.password' : self .password
92
92
93
- # We can tune as args batch_size and linger_ms
93
+ # Producer
94
94
producer = Producer (** conf )
95
95
96
96
97
- """Push records to logstash using tcp ."""
97
+ """Push records to be serialized using logstash_message definition ."""
98
98
for record in records :
99
99
#serialization of record
100
100
rec = record .logstash_message ()
0 commit comments