24
24
25
25
from caso import exception
26
26
import caso .messenger
27
-
27
+ #add json lib
28
+ import json
29
+ #add datetime lib
30
+ import datetime
28
31
29
32
opts = [
30
33
cfg .StrOpt ("host" , default = "localhost" , help = "Logstash host to send records to." ),
@@ -49,11 +52,29 @@ def __init__(self, host=CONF.logstash.host, port=CONF.logstash.port):
49
52
self .sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
50
53
51
54
def push (self , records ):
55
+
56
+ # NOTE(acostantini): code for the serialization and push of the
57
+ # records in logstash. JSON format to be used and encoding UTF-8
58
+ """Serialization of records to be sent to logstash"""
59
+ if not records :
60
+ return
61
+
62
+ #Actual timestamp to be added on each record
63
+ cdt = datetime .datetime .now ()
64
+ ct = int (datetime .datetime .now ().timestamp ())
65
+
66
+ #Open the connection with LS
67
+ self .sock .connect ((self .host , self .port ))
68
+
52
69
"""Push records to logstash using tcp."""
53
70
try :
54
- self .sock .connect ((self .host , self .port ))
55
- for _ , record in six .iteritems (records ):
56
- self .sock .sendall (record .as_json () + "\n " )
71
+ for record in records :
72
+ #serialization of record
73
+ rec = record .logstash_message ()
74
+ #cASO timestamp added to each record
75
+ rec ['caso-timestamp' ]= ct
76
+ #Send the record to LS
77
+ self .sock .send ((json .dumps (rec )+ '\n ' ).encode ('utf-8' ))
57
78
except socket .error as e :
58
79
raise exception .LogstashConnectionError (
59
80
host = self .host , port = self .port , exception = e
0 commit comments