@@ -51,51 +51,87 @@ def create_plugins(self, config: dict[str, Any]) -> None:
5151 """Create source and output plugins from configuration."""
5252 # Create sources
5353 sources_config = config .get ("sources" , [])
54- for source_config in sources_config :
54+ for i , source_config in enumerate ( sources_config ) :
5555 source_type = source_config .get ("type" )
56+ source_config_data = source_config .get ("config" , {})
5657 if not source_type :
57- logger .error ("Source configuration missing 'type' field" )
58+ logger .error (
59+ f"Source configuration { i } missing 'type' field: { source_config } "
60+ )
5861 continue
5962
6063 try :
61- source = create_source (source_type , source_config . get ( "config" , {}) )
64+ source = create_source (source_type , source_config_data )
6265 self .sources .append (source )
6366 logger .info (f"Created source: { source_type } " )
6467 except Exception as e :
6568 logger .error (f"Failed to create source { source_type } : { e } " )
69+ logger .debug (
70+ f"Source creation error details - type: { source_type } , "
71+ f"config: { source_config_data } " ,
72+ exc_info = True ,
73+ )
6674
6775 # Create outputs
6876 outputs_config = config .get ("outputs" , [])
69- for output_config in outputs_config :
77+ for i , output_config in enumerate ( outputs_config ) :
7078 output_type = output_config .get ("type" )
79+ output_config_data = output_config .get ("config" , {})
7180 if not output_type :
72- logger .error ("Output configuration missing 'type' field" )
81+ logger .error (
82+ f"Output configuration { i } missing 'type' field: { output_config } "
83+ )
7384 continue
7485
7586 try :
76- output = create_output (output_type , output_config . get ( "config" , {}) )
87+ output = create_output (output_type , output_config_data )
7788 self .outputs .append (output )
7889 logger .info (f"Created output: { output_type } " )
7990 except Exception as e :
8091 logger .error (f"Failed to create output { output_type } : { e } " )
92+ logger .debug (
93+ f"Output creation error details - type: { output_type } , "
94+ f"config: { output_config_data } " ,
95+ exc_info = True ,
96+ )
8197
8298 async def start_plugins (self ) -> None :
8399 """Start all plugins."""
100+ logger .info (
101+ f"Starting { len (self .sources )} sources and { len (self .outputs )} outputs"
102+ )
103+
84104 # Start sources
85- for source in self .sources :
105+ for i , source in enumerate (self .sources ):
106+ source_name = source .__class__ .__name__
86107 try :
108+ logger .debug (
109+ f"Starting source { i + 1 } /{ len (self .sources )} : { source_name } "
110+ )
87111 await source .start ()
88- logger .info (f"Started source: { source . __class__ . __name__ } " )
112+ logger .debug (f"✓ Successfully started source: { source_name } " )
89113 except Exception as e :
90- logger .error (f"Failed to start source { source .__class__ .__name__ } : { e } " )
114+ logger .error (
115+ f"✗ Failed to start source { source_name } : { e } " , exc_info = True
116+ )
117+ logger .info (f"Source { source_name } will retry connection in background" )
91118
92119 # Start outputs
93- for output in self .outputs :
120+ for i , output in enumerate (self .outputs ):
121+ output_name = output .__class__ .__name__
94122 try :
123+ logger .debug (
124+ f"Starting output { i + 1 } /{ len (self .outputs )} : { output_name } "
125+ )
95126 await output .start ()
96- logger .info (f"Started output: { output . __class__ . __name__ } " )
127+ logger .debug (f"✓ Successfully started output: { output_name } " )
97128 except Exception as e :
98- logger .error (f"Failed to start output { output .__class__ .__name__ } : { e } " )
129+ logger .error (
130+ f"✗ Failed to start output { output_name } : { e } " , exc_info = True
131+ )
132+ logger .info (f"Output { output_name } will retry connection in background" )
133+
134+ logger .info ("All plugins started successfully" )
99135
100136 async def stop_plugins (self ) -> None :
101137 """Stop all plugins."""
@@ -125,59 +161,108 @@ async def process_events(self) -> None:
125161 logger .warning ("No outputs configured" )
126162 return
127163
128- logger .info ("Starting event processing..." )
164+ logger .info (
165+ f"Starting event processing with { len (self .sources )} sources "
166+ f"and { len (self .outputs )} outputs..."
167+ )
129168 self ._running = True
130169
131170 # Create event processing tasks
132171 tasks = []
133- for source in self .sources :
172+ for i , source in enumerate (self .sources ):
173+ source_name = source .__class__ .__name__
174+ logger .debug (
175+ f"Creating processing task { i + 1 } /{ len (self .sources )} "
176+ f"for source: { source_name } "
177+ )
134178 task = asyncio .create_task (self ._process_source_events (source ))
135179 tasks .append (task )
136180
181+ logger .debug (
182+ f"Created { len (tasks )} event processing tasks, starting event loop..."
183+ )
184+
137185 # Wait for shutdown or task completion
138186 try :
139- await asyncio .gather (* tasks , return_exceptions = True )
187+ results = await asyncio .gather (* tasks , return_exceptions = True )
188+ logger .info (f"Event processing tasks completed with results: { results } " )
140189 except asyncio .CancelledError :
141190 logger .info ("Event processing cancelled" )
191+ except Exception as e :
192+ logger .error (f"Unexpected error in event processing: { e } " , exc_info = True )
142193 finally :
143194 self ._running = False
195+ logger .info ("Event processing loop stopped" )
144196
145197 async def _process_source_events (self , source : Any ) -> None :
146198 """Process events from a single source."""
147199 source_name = source .__class__ .__name__
148200 logger .debug (f"Starting event processing for source: { source_name } " )
149201
202+ event_count = 0
203+ last_heartbeat = asyncio .get_event_loop ().time ()
204+ heartbeat_interval = 30.0 # Log heartbeat every 30 seconds
205+
150206 try :
207+ logger .debug (f"Calling source.listen() for { source_name } ..." )
208+
151209 async for event in source .listen ():
210+ event_count += 1
211+ current_time = asyncio .get_event_loop ().time ()
212+
213+ # Periodic heartbeat to show we're alive
214+ if current_time - last_heartbeat > heartbeat_interval :
215+ logger .info (
216+ f"Event processing: { source_name } processed "
217+ f"{ event_count } events"
218+ )
219+ last_heartbeat = current_time
220+
152221 # Check for shutdown before processing
153222 if self ._shutdown_event .is_set ():
154- logger .debug (f"Shutdown requested, stopping { source_name } " )
223+ logger .info (f"Shutdown requested, stopping { source_name } " )
155224 break
156225
157- logger .debug (f"Received event from { source_name } : { event } " )
226+ logger .debug (
227+ f"Received event #{ event_count } from { source_name } : { event } "
228+ )
158229
159230 # Send event to all outputs
160231 for output in self .outputs :
232+ output_name = output .__class__ .__name__
161233 try :
234+ logger .debug (f"Sending event { event .id } to { output_name } ..." )
162235 success = await output .send_event (event )
163236 if success :
164237 logger .debug (
165- f"Successfully sent event via "
166- f"{ output .__class__ .__name__ } "
238+ f"Successfully sent event { event .id } via { output_name } "
167239 )
168240 else :
169241 logger .warning (
170- f"Failed to send event via { output . __class__ . __name__ } "
242+ f"Failed to send event { event . id } via { output_name } "
171243 )
172244 except Exception as e :
173245 logger .error (
174- f"Error sending event via { output .__class__ .__name__ } : { e } "
246+ f"Error sending event { event .id } via { output_name } : { e } " ,
247+ exc_info = True ,
175248 )
176249
177250 except Exception as e :
178- logger .error (f"Error processing events from { source_name } : { e } " )
251+ logger .error (
252+ f"Error processing events from { source_name } : { e } " , exc_info = True
253+ )
254+ raise # Re-raise to ensure task failure is visible
179255 finally :
180- logger .debug (f"Stopped processing events for source: { source_name } " )
256+ if event_count > 0 :
257+ logger .info (
258+ f"Event processing summary: { source_name } "
259+ f"processed { event_count } total events"
260+ )
261+ else :
262+ logger .debug (
263+ f"Stopped processing events for source: { source_name } "
264+ f"(no events processed)"
265+ )
181266
182267 def setup_signal_handlers (self ) -> None :
183268 """Setup signal handlers for graceful shutdown."""
@@ -211,23 +296,28 @@ async def run(self, config_path: Path) -> None:
211296 """Run the application with the given configuration."""
212297 try :
213298 # Load configuration
299+ logger .debug ("Loading configuration..." )
214300 config = self .load_config (config_path )
215301
216302 # Create plugins
303+ logger .debug ("Creating plugins..." )
217304 self .create_plugins (config )
218305
219306 if not self .sources and not self .outputs :
220307 logger .error ("No plugins configured" )
221308 return
222309
223310 # Setup signal handlers
311+ logger .debug ("Setting up signal handlers..." )
224312 self .setup_signal_handlers ()
225313
226314 # Start plugins
315+ logger .debug ("Starting plugins..." )
227316 await self .start_plugins ()
228- logger .info ("Application started successfully" )
317+ logger .info ("✓ Application started successfully" )
229318
230319 # Process events until shutdown
320+ logger .debug ("Beginning event processing..." )
231321 await self .process_events ()
232322
233323 except KeyboardInterrupt :
0 commit comments