55from elasticsearch import Elasticsearch , helpers
66import datetime
77
8- # Configure logging
98logging .basicConfig (
109 format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ,
1110 level = logging .INFO
@@ -62,7 +61,7 @@ def _create_index_if_not_exists(self):
6261 """Create the Elasticsearch index if it doesn't exist."""
6362 try :
6463 if not self .es_client .indices .exists (index = self .index_name ):
65- # Define index mapping
64+ # index mapping
6665 mappings = {
6766 "mappings" : {
6867 "properties" : {
@@ -75,7 +74,6 @@ def _create_index_if_not_exists(self):
7574 }
7675 }
7776
78- # Create the index
7977 self .es_client .indices .create (index = self .index_name , body = mappings )
8078 logger .info (f"Created Elasticsearch index: { self .index_name } " )
8179 else :
@@ -87,19 +85,15 @@ def _create_index_if_not_exists(self):
8785 def _prepare_document (self , result ):
8886 """Prepare the document for indexing in Elasticsearch."""
8987 try :
90- # Extract the original log information
9188 original_log = result .get ('original_log' , {})
9289 anomaly_detection = result .get ('anomaly_detection' , {})
9390
94- # Parse timestamps
9591 timestamp = datetime .datetime .now ().isoformat ()
9692
9793 log_timestamp = None
9894 if 'timestamp' in original_log and original_log ['timestamp' ]:
9995 try :
100- # Try to parse the timestamp into a proper date format for Elasticsearch
101- # This depends on the format of your timestamps
102- # Attempt multiple common formats
96+ # parse the timestamp into a proper date format for Elasticsearch
10397 timestamp_formats = [
10498 "%Y-%m-%d %H:%M:%S" ,
10599 "%d-%m %H:%M:%S.%f" ,
@@ -119,7 +113,6 @@ def _prepare_document(self, result):
119113 except Exception as e :
120114 logger .warning (f"Could not parse timestamp: { original_log .get ('timestamp' )} , error: { e } " )
121115
122- # Create the document
123116 document = {
124117 "timestamp" : timestamp ,
125118 "log_timestamp" : log_timestamp ,
@@ -133,7 +126,7 @@ def _prepare_document(self, result):
133126 return document
134127 except Exception as e :
135128 logger .error (f"Error preparing document: { e } " )
136- # Return a basic document to prevent pipeline failure
129+ # a basic document to prevent pipeline failure
137130 return {
138131 "timestamp" : datetime .datetime .now ().isoformat (),
139132 "message" : "Error preparing document" ,
@@ -144,19 +137,14 @@ def _prepare_document(self, result):
144137 def process_message (self , message ):
145138 """Process a message from Kafka and store it in Elasticsearch."""
146139 try :
147- # Extract data from message
148140 value = message .value
149141
150- # Prepare the document
151142 document = self ._prepare_document (value )
152-
153- # Index the document
143+
154144 response = self .es_client .index (index = self .index_name , document = document )
155145
156- # Log the response
157146 logger .info (f"Document indexed in Elasticsearch: id={ response ['_id' ]} " )
158147
159- # Log more details if it's an anomaly
160148 if document .get ('is_anomaly' , False ):
161149 logger .warning (
162150 f"message={ document .get ('message' )[:100 ]} ..."
@@ -172,7 +160,6 @@ def run(self):
172160 try :
173161 logger .info (f"Elasticsearch connector started, listening to topic: { self .topic } " )
174162
175- # Process messages continuously
176163 for message in self .consumer :
177164 self .process_message (message )
178165
@@ -194,4 +181,4 @@ def close(self):
194181
195182if __name__ == "__main__" :
196183 connector = ElasticsearchConnector ()
197- connector .run ()
184+ connector .run ()
0 commit comments