@@ -61,11 +61,8 @@ def _create_kafka_producer(cfg):
6161 return None
6262
6363 brokers = None
64- try :
65- if cfg .has_option ('kafka' , 'brokers' ):
66- brokers = cfg .get ('kafka' , 'brokers' )
67- except Exception :
68- brokers = None
64+ if cfg .has_option ('kafka' , 'brokers' ):
65+ brokers = cfg .get ('kafka' , 'brokers' )
6966
7067 if not brokers :
7168 LOG .error ("No 'kafka.brokers' configured, kafka producer disabled" )
@@ -213,21 +210,8 @@ def run(config_path=None):
213210
214211 LOG .info ("Starting kafka producer (config: %s)" , config_path )
215212
216- # poll interval seconds
217- poll_interval = 2
218- try :
219- if cfg .has_option ('kafka' , 'poll_interval' ):
220- poll_interval = int (cfg .get ('kafka' , 'poll_interval' ))
221- except Exception :
222- poll_interval = 2
223-
224- # topic prefix default
225- topic_prefix = "nipap."
226- try :
227- if cfg .has_option ('kafka' , 'topic_prefix' ):
228- topic_prefix = cfg .get ('kafka' , 'topic_prefix' )
229- except Exception :
230- topic_prefix = "nipap."
213+ poll_interval = int (cfg .get ('kafka' , 'poll_interval' ))
214+ topic_prefix = cfg .get ('kafka' , 'topic_prefix' )
231215
232216 conn = _connect_db (cfg )
233217 cur = conn .cursor ()
@@ -331,6 +315,17 @@ def run(config_path=None):
331315 else :
332316 conn .rollback ()
333317
318+ except psycopg2 .InterfaceError as e :
319+ LOG .error ("Database interface error in kafka_producer loop: %s. Reconnecting to database." , e )
320+ try :
321+ conn .close ()
322+ except Exception :
323+ pass
324+ # reconnect to database and update cursor
325+ conn = _connect_db (cfg )
326+ cur = conn .cursor ()
327+ # backoff before next attempt
328+ time .sleep (max (1 , poll_interval ))
334329 except Exception as e :
335330 LOG .exception ("Unexpected error in kafka_producer loop: %s" , e )
336331 try :
0 commit comments