@@ -321,6 +321,10 @@ impl RewardSetEventPayload {
321
321
}
322
322
}
323
323
324
+ #[ cfg( test) ]
325
+ static TEST_EVENT_OBSERVER_SKIP_SEND_PAYLOAD : std:: sync:: Mutex < Option < bool > > =
326
+ std:: sync:: Mutex :: new ( None ) ;
327
+
324
328
impl EventObserver {
325
329
fn init_db ( db_path : & str ) -> Result < Connection , db_error > {
326
330
let conn = Connection :: open ( db_path) ?;
@@ -377,6 +381,16 @@ impl EventObserver {
377
381
}
378
382
379
383
fn process_pending_payloads ( conn : & Connection ) {
384
+ #[ cfg( test) ]
385
+ if TEST_EVENT_OBSERVER_SKIP_SEND_PAYLOAD
386
+ . lock ( )
387
+ . unwrap ( )
388
+ . unwrap_or ( false )
389
+ {
390
+ warn ! ( "Fault injection: skipping retry of payload" ) ;
391
+ return ;
392
+ }
393
+
380
394
let pending_payloads = match Self :: get_pending_payloads ( conn) {
381
395
Ok ( payloads) => payloads,
382
396
Err ( e) => {
@@ -2171,4 +2185,102 @@ mod test {
2171
2185
rx. recv_timeout ( Duration :: from_secs ( 5 ) )
2172
2186
. expect ( "Server did not receive request in time" ) ;
2173
2187
}
2188
+
2189
+ #[ test]
2190
+ fn test_send_payload_with_db_force_restart ( ) {
2191
+ let port = get_random_port ( ) ;
2192
+ let timeout = Duration :: from_secs ( 3 ) ;
2193
+ let dir = tempdir ( ) . unwrap ( ) ;
2194
+ let working_dir = dir. path ( ) . to_path_buf ( ) ;
2195
+
2196
+ // Set up a channel to notify when the server has processed the request
2197
+ let ( tx, rx) = channel ( ) ;
2198
+
2199
+ info ! ( "Starting mock server on port {}" , port) ;
2200
+ // Start a mock server in a separate thread
2201
+ let server = Server :: http ( format ! ( "127.0.0.1:{}" , port) ) . unwrap ( ) ;
2202
+ thread:: spawn ( move || {
2203
+ let mut attempt = 0 ;
2204
+ let mut _request_holder = None ;
2205
+ while let Ok ( mut request) = server. recv ( ) {
2206
+ attempt += 1 ;
2207
+ match attempt {
2208
+ 1 => {
2209
+ debug ! ( "Mock server received request attempt 1" ) ;
2210
+ // Do not reply, forcing the sender to timeout and retry,
2211
+ // but don't drop the request or it will receive a 500 error,
2212
+ _request_holder = Some ( request) ;
2213
+ }
2214
+ 2 => {
2215
+ debug ! ( "Mock server received request attempt 2" ) ;
2216
+
2217
+ // Verify the payload
2218
+ let mut payload = String :: new ( ) ;
2219
+ request. as_reader ( ) . read_to_string ( & mut payload) . unwrap ( ) ;
2220
+ let expected_payload = r#"{"key":"value"}"# ;
2221
+ assert_eq ! ( payload, expected_payload) ;
2222
+
2223
+ // Simulate a successful response on the second attempt
2224
+ let response = Response :: from_string ( "HTTP/1.1 200 OK" ) ;
2225
+ request. respond ( response) . unwrap ( ) ;
2226
+ }
2227
+ 3 => {
2228
+ debug ! ( "Mock server received request attempt 3" ) ;
2229
+
2230
+ // Verify the payload
2231
+ let mut payload = String :: new ( ) ;
2232
+ request. as_reader ( ) . read_to_string ( & mut payload) . unwrap ( ) ;
2233
+ let expected_payload = r#"{"key":"value2"}"# ;
2234
+ assert_eq ! ( payload, expected_payload) ;
2235
+
2236
+ // Simulate a successful response on the second attempt
2237
+ let response = Response :: from_string ( "HTTP/1.1 200 OK" ) ;
2238
+ request. respond ( response) . unwrap ( ) ;
2239
+
2240
+ // When we receive attempt 3 (message 1, re-sent message 1, message 2),
2241
+ // notify the test that the request was processed successfully
2242
+ tx. send ( ( ) ) . unwrap ( ) ;
2243
+ break ;
2244
+ }
2245
+ _ => panic ! ( "Unexpected request attempt" ) ,
2246
+ }
2247
+ }
2248
+ } ) ;
2249
+
2250
+ let observer = EventObserver :: new (
2251
+ Some ( working_dir. clone ( ) ) ,
2252
+ format ! ( "127.0.0.1:{}" , port) ,
2253
+ timeout,
2254
+ ) ;
2255
+
2256
+ let payload = json ! ( { "key" : "value" } ) ;
2257
+ let payload2 = json ! ( { "key" : "value2" } ) ;
2258
+
2259
+ // Disable retrying so that it sends the payload only once
2260
+ // and that payload will be ignored by the test server.
2261
+ TEST_EVENT_OBSERVER_SKIP_SEND_PAYLOAD
2262
+ . lock ( )
2263
+ . unwrap ( )
2264
+ . replace ( true ) ;
2265
+
2266
+ info ! ( "Sending payload 1" ) ;
2267
+
2268
+ // Send the payload
2269
+ observer. send_payload ( & payload, "/test" ) ;
2270
+
2271
+ // Re-enable retrying
2272
+ TEST_EVENT_OBSERVER_SKIP_SEND_PAYLOAD
2273
+ . lock ( )
2274
+ . unwrap ( )
2275
+ . replace ( false ) ;
2276
+
2277
+ info ! ( "Sending payload 2" ) ;
2278
+
2279
+ // Send another payload
2280
+ observer. send_payload ( & payload2, "/test" ) ;
2281
+
2282
+ // Wait for the server to process the requests
2283
+ rx. recv_timeout ( Duration :: from_secs ( 5 ) )
2284
+ . expect ( "Server did not receive request in time" ) ;
2285
+ }
2174
2286
}
0 commit comments