@@ -1070,4 +1070,322 @@ mod tests {
10701070 reactor. stop ( ) ;
10711071 handle. abort ( ) ;
10721072 }
1073+
1074+ // --- Configurable test handler for circuit breaker / error strategy tests ---
1075+
1076+ struct ConfigurableTestHandler {
1077+ id : HandlerId ,
1078+ name : String ,
1079+ filter : EventFilter ,
1080+ call_count : Arc < AtomicU64 > ,
1081+ should_fail : bool ,
1082+ priority : HandlerPriority ,
1083+ error_strategy : ErrorStrategy ,
1084+ }
1085+
1086+ #[ async_trait]
1087+ impl EventHandler for ConfigurableTestHandler {
1088+ fn metadata ( & self ) -> HandlerMetadata {
1089+ HandlerMetadata {
1090+ id : self . id ,
1091+ name : self . name . clone ( ) ,
1092+ filter : EventFilter {
1093+ categories : self . filter . categories . clone ( ) ,
1094+ min_severity : self . filter . min_severity ,
1095+ goal_id : self . filter . goal_id ,
1096+ task_id : self . filter . task_id ,
1097+ payload_types : self . filter . payload_types . clone ( ) ,
1098+ custom_predicate : None ,
1099+ } ,
1100+ priority : self . priority ,
1101+ error_strategy : self . error_strategy ,
1102+ }
1103+ }
1104+
1105+ async fn handle ( & self , _event : & UnifiedEvent , _ctx : & HandlerContext ) -> Result < Reaction , String > {
1106+ self . call_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
1107+ if self . should_fail {
1108+ Err ( "test failure" . to_string ( ) )
1109+ } else {
1110+ Ok ( Reaction :: None )
1111+ }
1112+ }
1113+ }
1114+
1115+ fn make_sequenced_event ( category : EventCategory , seq : u64 ) -> UnifiedEvent {
1116+ let mut event = make_test_event ( category) ;
1117+ event. sequence = SequenceNumber ( seq) ;
1118+ event
1119+ }
1120+
1121+ // --- Handler that records execution order for priority tests ---
1122+
1123+ struct OrderTrackingHandler {
1124+ id : HandlerId ,
1125+ name : String ,
1126+ priority : HandlerPriority ,
1127+ execution_order : Arc < tokio:: sync:: Mutex < Vec < String > > > ,
1128+ }
1129+
1130+ #[ async_trait]
1131+ impl EventHandler for OrderTrackingHandler {
1132+ fn metadata ( & self ) -> HandlerMetadata {
1133+ HandlerMetadata {
1134+ id : self . id ,
1135+ name : self . name . clone ( ) ,
1136+ filter : EventFilter :: default ( ) ,
1137+ priority : self . priority ,
1138+ error_strategy : ErrorStrategy :: LogAndContinue ,
1139+ }
1140+ }
1141+
1142+ async fn handle ( & self , _event : & UnifiedEvent , _ctx : & HandlerContext ) -> Result < Reaction , String > {
1143+ let mut order = self . execution_order . lock ( ) . await ;
1144+ order. push ( self . name . clone ( ) ) ;
1145+ Ok ( Reaction :: None )
1146+ }
1147+ }
1148+
1149+ #[ tokio:: test]
1150+ async fn test_circuit_breaker_trips_after_threshold ( ) {
1151+ let bus = Arc :: new ( EventBus :: new ( EventBusConfig :: default ( ) ) ) ;
1152+ let config = ReactorConfig {
1153+ circuit_breaker_threshold : 2 ,
1154+ circuit_breaker_window_secs : 600 ,
1155+ circuit_breaker_cooldown_secs : 300 ,
1156+ ..Default :: default ( )
1157+ } ;
1158+ let reactor = EventReactor :: new ( bus. clone ( ) , config) ;
1159+
1160+ let call_count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
1161+ let handler = Arc :: new ( ConfigurableTestHandler {
1162+ id : HandlerId :: new ( ) ,
1163+ name : "cb-test" . to_string ( ) ,
1164+ filter : EventFilter :: default ( ) ,
1165+ call_count : call_count. clone ( ) ,
1166+ should_fail : true ,
1167+ priority : HandlerPriority :: NORMAL ,
1168+ error_strategy : ErrorStrategy :: CircuitBreak ,
1169+ } ) ;
1170+
1171+ reactor. register ( handler) . await ;
1172+ let handle = reactor. start ( ) ;
1173+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
1174+
1175+ // Publish 3 events with unique sequence numbers
1176+ for i in 1 ..=3 {
1177+ bus. publish ( make_sequenced_event ( EventCategory :: Task , 100 + i) ) . await ;
1178+ tokio:: time:: sleep ( Duration :: from_millis ( 150 ) ) . await ;
1179+ }
1180+
1181+ // After 2 failures the CB trips; the 3rd event should be skipped
1182+ let count = call_count. load ( Ordering :: Relaxed ) ;
1183+ assert_eq ! ( count, 2 , "Handler should be called exactly 2 times before CB trips, got {}" , count) ;
1184+
1185+ reactor. stop ( ) ;
1186+ handle. abort ( ) ;
1187+ }
1188+
1189+ #[ tokio:: test]
1190+ async fn test_circuit_breaker_auto_resets_after_cooldown ( ) {
1191+ let bus = Arc :: new ( EventBus :: new ( EventBusConfig :: default ( ) ) ) ;
1192+ let config = ReactorConfig {
1193+ circuit_breaker_threshold : 1 ,
1194+ circuit_breaker_cooldown_secs : 1 ,
1195+ circuit_breaker_window_secs : 600 ,
1196+ ..Default :: default ( )
1197+ } ;
1198+ let reactor = EventReactor :: new ( bus. clone ( ) , config) ;
1199+
1200+ let call_count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
1201+ let handler = Arc :: new ( ConfigurableTestHandler {
1202+ id : HandlerId :: new ( ) ,
1203+ name : "cb-reset-test" . to_string ( ) ,
1204+ filter : EventFilter :: default ( ) ,
1205+ call_count : call_count. clone ( ) ,
1206+ should_fail : true ,
1207+ priority : HandlerPriority :: NORMAL ,
1208+ error_strategy : ErrorStrategy :: CircuitBreak ,
1209+ } ) ;
1210+
1211+ reactor. register ( handler) . await ;
1212+ let handle = reactor. start ( ) ;
1213+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
1214+
1215+ // First event: handler fails → CB trips (threshold=1)
1216+ bus. publish ( make_sequenced_event ( EventCategory :: Task , 200 ) ) . await ;
1217+ tokio:: time:: sleep ( Duration :: from_millis ( 150 ) ) . await ;
1218+ assert_eq ! ( call_count. load( Ordering :: Relaxed ) , 1 ) ;
1219+
1220+ // Second event while CB is tripped: handler should NOT be called
1221+ bus. publish ( make_sequenced_event ( EventCategory :: Task , 201 ) ) . await ;
1222+ tokio:: time:: sleep ( Duration :: from_millis ( 150 ) ) . await ;
1223+ assert_eq ! ( call_count. load( Ordering :: Relaxed ) , 1 , "Handler should not be called while CB is tripped" ) ;
1224+
1225+ // Wait for cooldown to expire (1s cooldown + margin)
1226+ tokio:: time:: sleep ( Duration :: from_millis ( 1200 ) ) . await ;
1227+
1228+ // Third event after cooldown: CB auto-resets → handler called again
1229+ bus. publish ( make_sequenced_event ( EventCategory :: Task , 202 ) ) . await ;
1230+ tokio:: time:: sleep ( Duration :: from_millis ( 150 ) ) . await ;
1231+ assert_eq ! ( call_count. load( Ordering :: Relaxed ) , 2 , "Handler should be called again after CB cooldown" ) ;
1232+
1233+ reactor. stop ( ) ;
1234+ handle. abort ( ) ;
1235+ }
1236+
1237+ #[ tokio:: test]
1238+ async fn test_dedup_skips_duplicate_sequence ( ) {
1239+ // Note: EventBus::publish assigns monotonically increasing sequence numbers,
1240+ // so duplicate sequences cannot occur through normal publish. The reactor's
1241+ // VecDeque-based dedup protects against duplicates during lag recovery
1242+ // (broadcast channel overflow). We test this by using an InMemoryEventStore:
1243+ // pre-populate the store with events, publish the same events through the bus
1244+ // (which assigns them new sequences), then trigger replay_missed_events()
1245+ // which uses the store's original sequences. The watermark-based replay
1246+ // skips events already processed.
1247+ use crate :: services:: event_store:: InMemoryEventStore ;
1248+
1249+ let store = Arc :: new ( InMemoryEventStore :: new ( ) ) ;
1250+ let bus = Arc :: new ( EventBus :: new ( EventBusConfig :: default ( ) ) ) ;
1251+ let config = ReactorConfig :: default ( ) ;
1252+ let reactor = EventReactor :: new ( bus. clone ( ) , config) . with_store ( store. clone ( ) ) ;
1253+
1254+ let call_count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
1255+ let handler = Arc :: new ( TestHandler {
1256+ id : HandlerId :: new ( ) ,
1257+ name : "dedup-test" . to_string ( ) ,
1258+ filter : EventFilter {
1259+ categories : vec ! [ EventCategory :: Task ] ,
1260+ ..Default :: default ( )
1261+ } ,
1262+ call_count : call_count. clone ( ) ,
1263+ should_fail : false ,
1264+ } ) ;
1265+
1266+ reactor. register ( handler) . await ;
1267+
1268+ // Pre-populate the store with an event at sequence 10
1269+ let mut stored_event = make_test_event ( EventCategory :: Task ) ;
1270+ stored_event. sequence = SequenceNumber ( 10 ) ;
1271+ store. append ( & stored_event) . await . unwrap ( ) ;
1272+
1273+ // Set watermark to 0 so replay would try to replay seq=10
1274+ store. set_watermark ( "dedup-test" , SequenceNumber ( 0 ) ) . await . unwrap ( ) ;
1275+
1276+ let handle = reactor. start ( ) ;
1277+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
1278+
1279+ // Publish an event normally — EventBus assigns it seq=0
1280+ bus. publish ( make_test_event ( EventCategory :: Task ) ) . await ;
1281+ tokio:: time:: sleep ( Duration :: from_millis ( 150 ) ) . await ;
1282+ assert_eq ! ( call_count. load( Ordering :: Relaxed ) , 1 , "First event should be processed" ) ;
1283+
1284+ // Now call replay_missed_events — it replays seq=10 from store
1285+ // The handler should be called because seq=10 is not in the dedup set
1286+ // and watermark is 0 (below seq=10)
1287+ let replayed = reactor. replay_missed_events ( ) . await . unwrap ( ) ;
1288+ assert ! ( replayed > 0 , "Should have replayed at least one event" ) ;
1289+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
1290+
1291+ // Handler should have been called twice: once from publish, once from replay
1292+ let count = call_count. load ( Ordering :: Relaxed ) ;
1293+ assert_eq ! ( count, 2 , "Handler should be called for both published and replayed events, got {}" , count) ;
1294+
1295+ reactor. stop ( ) ;
1296+ handle. abort ( ) ;
1297+ }
1298+
1299+ #[ tokio:: test]
1300+ async fn test_handlers_execute_in_priority_order ( ) {
1301+ let bus = Arc :: new ( EventBus :: new ( EventBusConfig :: default ( ) ) ) ;
1302+ let reactor = EventReactor :: new ( bus. clone ( ) , ReactorConfig :: default ( ) ) ;
1303+
1304+ let execution_order = Arc :: new ( tokio:: sync:: Mutex :: new ( Vec :: new ( ) ) ) ;
1305+
1306+ // Register handlers in reverse priority order (LOW first) to verify sorting
1307+ let low_handler = Arc :: new ( OrderTrackingHandler {
1308+ id : HandlerId :: new ( ) ,
1309+ name : "low" . to_string ( ) ,
1310+ priority : HandlerPriority :: LOW ,
1311+ execution_order : execution_order. clone ( ) ,
1312+ } ) ;
1313+ let normal_handler = Arc :: new ( OrderTrackingHandler {
1314+ id : HandlerId :: new ( ) ,
1315+ name : "normal" . to_string ( ) ,
1316+ priority : HandlerPriority :: NORMAL ,
1317+ execution_order : execution_order. clone ( ) ,
1318+ } ) ;
1319+ let system_handler = Arc :: new ( OrderTrackingHandler {
1320+ id : HandlerId :: new ( ) ,
1321+ name : "system" . to_string ( ) ,
1322+ priority : HandlerPriority :: SYSTEM ,
1323+ execution_order : execution_order. clone ( ) ,
1324+ } ) ;
1325+
1326+ // Register in deliberately wrong order
1327+ reactor. register ( low_handler) . await ;
1328+ reactor. register ( normal_handler) . await ;
1329+ reactor. register ( system_handler) . await ;
1330+
1331+ let handle = reactor. start ( ) ;
1332+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
1333+
1334+ bus. publish ( make_sequenced_event ( EventCategory :: Task , 300 ) ) . await ;
1335+ tokio:: time:: sleep ( Duration :: from_millis ( 200 ) ) . await ;
1336+
1337+ let order = execution_order. lock ( ) . await ;
1338+ assert_eq ! ( order. len( ) , 3 , "All 3 handlers should have been called, got {}" , order. len( ) ) ;
1339+ assert_eq ! ( order[ 0 ] , "system" , "SYSTEM priority should execute first" ) ;
1340+ assert_eq ! ( order[ 1 ] , "normal" , "NORMAL priority should execute second" ) ;
1341+ assert_eq ! ( order[ 2 ] , "low" , "LOW priority should execute last" ) ;
1342+
1343+ reactor. stop ( ) ;
1344+ handle. abort ( ) ;
1345+ }
1346+
1347+ #[ tokio:: test]
1348+ async fn test_log_and_continue_does_not_trip_circuit_breaker ( ) {
1349+ let bus = Arc :: new ( EventBus :: new ( EventBusConfig :: default ( ) ) ) ;
1350+ let config = ReactorConfig {
1351+ circuit_breaker_threshold : 1 ,
1352+ circuit_breaker_window_secs : 600 ,
1353+ circuit_breaker_cooldown_secs : 300 ,
1354+ ..Default :: default ( )
1355+ } ;
1356+ let reactor = EventReactor :: new ( bus. clone ( ) , config) ;
1357+
1358+ let call_count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
1359+ let handler = Arc :: new ( ConfigurableTestHandler {
1360+ id : HandlerId :: new ( ) ,
1361+ name : "log-continue-test" . to_string ( ) ,
1362+ // Use Task filter to avoid matching HandlerError reaction events
1363+ // (which have category Orchestrator and would cause cascading calls)
1364+ filter : EventFilter {
1365+ categories : vec ! [ EventCategory :: Task ] ,
1366+ ..Default :: default ( )
1367+ } ,
1368+ call_count : call_count. clone ( ) ,
1369+ should_fail : true ,
1370+ priority : HandlerPriority :: NORMAL ,
1371+ error_strategy : ErrorStrategy :: LogAndContinue ,
1372+ } ) ;
1373+
1374+ reactor. register ( handler) . await ;
1375+ let handle = reactor. start ( ) ;
1376+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
1377+
1378+ // Publish 3 events — all should reach the handler despite failures
1379+ // because LogAndContinue doesn't trip the circuit breaker
1380+ for i in 1 ..=3 {
1381+ bus. publish ( make_sequenced_event ( EventCategory :: Task , 400 + i) ) . await ;
1382+ tokio:: time:: sleep ( Duration :: from_millis ( 150 ) ) . await ;
1383+ }
1384+
1385+ let count = call_count. load ( Ordering :: Relaxed ) ;
1386+ assert_eq ! ( count, 3 , "LogAndContinue handler should be called all 3 times, got {}" , count) ;
1387+
1388+ reactor. stop ( ) ;
1389+ handle. abort ( ) ;
1390+ }
10731391}
0 commit comments