33import sys
44from kafka_lib import ProducerThread , ConsumerThread
55from log_analyser .log_analyser import LogAnalyser
6+ import configparser
67
78
89class DatastrikePythonProcessing :
910 def __init__ (self ):
1011
1112 self .running = True
1213
13- self .producer_thread = ProducerThread ("localhost:29092" )
14+ self .config = configparser .ConfigParser ()
15+ self .config .read ("datastrike_python_processing.cfg" )
16+
17+
18+ self .kafka_url = self .config ["kafka" ]["url" ]
19+
20+ print (self .kafka_url )
21+ self .producer_thread = ProducerThread (self .kafka_url )
1422
15- self .consumer_thread = ConsumerThread ("localhost:29092" )
23+ self .consumer_thread = ConsumerThread (self . kafka_url )
1624 self .consumer_thread .add_topics ("analyse" , self .on_callback_test )
1725
1826 self .consumer_thread .start ()
@@ -28,13 +36,15 @@ def on_callback_test(self, topic, data):
2836
2937 if self .check_txt_extension (fileName ):
3038
31- la = LogAnalyser (filePath , fileName , teamId )
32- la .run ()
33- if la .map != None :
34- self .producer_thread .send ("analyse.report" , la .map .export_json ())
35- else :
36- self .producer_thread .send ("analyse.report" , {"error" : "File txt not correct" })
37-
39+ try :
40+ la = LogAnalyser (filePath , fileName , teamId )
41+ la .run ()
42+ if la .map != None :
43+ self .producer_thread .send ("analyse.report" , la .map .export_json ())
44+ else :
45+ self .producer_thread .send ("analyse.report" , {"error" : "File txt not correct" })
46+ except Exception as e :
47+ self .producer_thread .send ("analyse.report" , {"error" : "{}" .format (e )})
3848 else :
3949 self .producer_thread .send ("analyse.report" , {"error" : "File extension not correct" })
4050
0 commit comments