@@ -85,7 +85,10 @@ groups() ->
8585 lazy_queue_v2
8686 ]},
8787 {classic_queue_regressions , [], [
88- reg_v1_full_recover_only_journal
88+ reg_v1_full_recover_only_journal ,
89+ reg_v1_no_del_jif ,
90+ reg_v1_no_del_idx ,
91+ reg_v1_no_del_idx_unclean
8992 ]}
9093 ].
9194
@@ -1191,6 +1194,244 @@ do_reg_v1_full_recover_only_journal(Config) ->
11911194
11921195 Res15 = cmd_restart_vhost_clean (St14 ),
11931196 true = postcondition (St14 , {call , undefined , cmd_restart_vhost_clean , [St14 ]}, Res15 ),
1194- _ = next_state (St14 , Res15 , {call , undefined , cmd_restart_vhost_clean , [St14 ]}),
1197+ St15 = next_state (St14 , Res15 , {call , undefined , cmd_restart_vhost_clean , [St14 ]}),
1198+
1199+ cmd_teardown_queue (St15 ),
11951200
11961201 true .
1202+
1203+ % % The following reg_v1_no_del_* cases test when a classic queue has a
1204+ % % published message before an upgrade to 3.10. In that case there is
1205+ % % no delivery marker in the v1 queue index.
1206+
1207+ % % After upgrade to 3.10 there is a published message in the journal file.
1208+ % % Consuming and acknowledging the message should work fine.
1209+ reg_v1_no_del_jif (Config ) ->
1210+ try
1211+ true = rabbit_ct_broker_helpers :rpc (
1212+ Config , 0 , ? MODULE , do_reg_v1_no_del_jif , [Config ])
1213+ catch exit :{exception , Reason } ->
1214+ exit (Reason )
1215+ end .
1216+
1217+ do_reg_v1_no_del_jif (Config ) ->
1218+ St0 = # cq {name = prop_classic_queue_v1 , mode = lazy , version = 1 ,
1219+ config = minimal_config (Config )},
1220+
1221+ Res1 = cmd_setup_queue (St0 ),
1222+ St3 = St0 # cq {amq = Res1 },
1223+
1224+ {St4 , Ch } = cmd (cmd_channel_open , St3 , []),
1225+
1226+ % % Simulate pre-3.10.0 behaviour by making deliver a noop
1227+ ok = meck :new (rabbit_queue_index , [passthrough ]),
1228+ ok = meck :expect (rabbit_queue_index , deliver , fun (_ , State ) -> State end ),
1229+
1230+ {St5 , _Res5 } = cmd (cmd_channel_publish , St4 , [Ch , 4 , _Persistent = 2 , _NotMandatory = false , _NoExpiration = undefined ]),
1231+
1232+ % % Enforce syncing journal to disk
1233+ % % (Not strictly necessary as vhost restart also triggers a sync)
1234+ % % At this point there should be a publish entry in the journal and no segment files
1235+ rabbit_amqqueue :pid_of (St5 # cq .amq ) ! timeout ,
1236+
1237+ {SyncTime , ok } = timer :tc (fun () -> meck :wait (rabbit_queue_index , sync , '_' , 1000 ) end ),
1238+ ct :pal (" wait for sync took ~p ms" , [SyncTime div 1000 ]),
1239+
1240+ % % Simulate RabbitMQ version upgrade by a clean vhost restart
1241+ % % (also reset delivery to normal operation)
1242+ ok = meck :delete (rabbit_queue_index , deliver , 2 ),
1243+ {St10 , _ } = cmd (cmd_restart_vhost_clean , St5 , []),
1244+
1245+ meck :reset (rabbit_queue_index ),
1246+
1247+ % % Consume the message and acknowledge it
1248+ % % The queue index should not crash when finding a pub+ack but no_del in the journal
1249+ % % (It used to crash in `action_to_entry/3' with a case_clause)
1250+ {St6 , _Tag } = cmd (cmd_channel_consume , St10 , [Ch ]),
1251+ receive SomeMsg -> self () ! SomeMsg
1252+ after 5000 -> ct :fail (no_message_consumed )
1253+ end ,
1254+ {St7 , _Msg = # amqp_msg {}} = cmd (cmd_channel_receive_and_ack , St6 , [Ch ]),
1255+
1256+ % % enforce syncing journal to disk
1257+ rabbit_amqqueue :pid_of (St7 # cq .amq ) ! timeout ,
1258+
1259+ {SyncTime2 , ok } = timer :tc (fun () -> meck :wait (rabbit_queue_index , sync , '_' , 1000 ) end ),
1260+ ct :pal (" wait for sync took ~p ms" , [SyncTime2 div 1000 ]),
1261+
1262+ validate_and_teaddown (St7 ).
1263+
1264+ % % After upgrade to 3.10 there is a published message in a segment file.
1265+ % % Consuming and acknowledging the message inserts an ack entry in the journal file.
1266+ % % A subsequent restart (of the queue/vhost/node) should work fine.
1267+ reg_v1_no_del_idx (Config ) ->
1268+ try
1269+ true = rabbit_ct_broker_helpers :rpc (
1270+ Config , 0 , ? MODULE , do_reg_v1_no_del_idx , [Config ])
1271+ catch exit :{exception , Reason } ->
1272+ exit (Reason )
1273+ end .
1274+
1275+ do_reg_v1_no_del_idx (Config ) ->
1276+ St0 = # cq {name = prop_classic_queue_v1 , mode = lazy , version = 1 ,
1277+ config = minimal_config (Config )},
1278+
1279+ Res1 = cmd_setup_queue (St0 ),
1280+ St3 = St0 # cq {amq = Res1 },
1281+
1282+ {St4 , Ch } = cmd (cmd_channel_open , St3 , []),
1283+
1284+ % % Simulate pre-3.10.0 behaviour by making deliver a noop
1285+ ok = meck :new (rabbit_queue_index , [passthrough ]),
1286+ ok = meck :expect (rabbit_queue_index , deliver , fun (_ , State ) -> State end ),
1287+
1288+ ok = meck :new (rabbit_variable_queue , [passthrough ]),
1289+
1290+ {St5 , _Res5 } = cmd (cmd_channel_publish , St4 , [Ch , 4 , _Persistent = 2 , _NotMandatory = false , _NoExpiration = undefined ]),
1291+
1292+ % % Wait for the queue process to get hibernated
1293+ % % handle_pre_hibernate syncs and flushes the journal
1294+ % % At this point there should be a publish entry in the segment file and an empty journal
1295+ {Time , ok } = timer :tc (fun () -> meck :wait (rabbit_variable_queue , handle_pre_hibernate , '_' , 10000 ) end ),
1296+ ct :pal (" wait for hibernate took ~p ms" , [Time div 1000 ]),
1297+ ok = meck :unload (rabbit_variable_queue ),
1298+
1299+ % % Simulate RabbitMQ version upgrade by a clean vhost restart
1300+ % % (also reset delivery to normal operation)
1301+ ok = meck :delete (rabbit_queue_index , deliver , 2 ),
1302+ {St10 , _ } = cmd (cmd_restart_vhost_clean , St5 , []),
1303+
1304+ % % Consume the message and acknowledge it
1305+ {St6 , _Tag } = cmd (cmd_channel_consume , St10 , [Ch ]),
1306+ receive SomeMsg -> self () ! SomeMsg
1307+ after 5000 -> ct :fail (no_message_consumed )
1308+ end ,
1309+ {St7 , _Msg = # amqp_msg {}} = cmd (cmd_channel_receive_and_ack , St6 , [Ch ]),
1310+
1311+ meck :reset (rabbit_queue_index ),
1312+
1313+ % % enforce syncing journal to disk
1314+ % % At this point there should be a publish entry in the segment file and an ack in the journal
1315+ rabbit_amqqueue :pid_of (St7 # cq .amq ) ! timeout ,
1316+ {SyncTime , ok } = timer :tc (fun () -> meck :wait (rabbit_queue_index , sync , '_' , 1000 ) end ),
1317+ ct :pal (" wait for sync took ~p ms" , [SyncTime div 1000 ]),
1318+
1319+ meck :reset (rabbit_queue_index ),
1320+
1321+ % % Another clean vhost restart
1322+ % % The queue index should not crash when finding a pub in a
1323+ % % segment, an ack in the journal, but no_del
1324+ % % (It used to crash in `segment_plus_journal1/2' with a function_clause)
1325+ catch cmd (cmd_restart_vhost_clean , St7 , []),
1326+
1327+ {ReadTime , ok } = timer :tc (fun () -> meck :wait (rabbit_queue_index , read , '_' , 1000 ) end ),
1328+ ct :pal (" wait for queue read took ~p ms" , [ReadTime div 1000 ]),
1329+
1330+ validate_and_teaddown (St7 ).
1331+
1332+ % % After upgrade to 3.10 there is a published message in a segment file.
1333+ % % Consuming and acknowledging the message inserts an ack entry in the journal file.
1334+ % % The recovery after a subsequent unclean shutdown (of the queue/vhost/node) should work fine.
1335+ reg_v1_no_del_idx_unclean (Config ) ->
1336+ try
1337+ true = rabbit_ct_broker_helpers :rpc (
1338+ Config , 0 , ? MODULE , do_reg_v1_no_del_idx_unclean , [Config ])
1339+ catch exit :{exception , Reason } ->
1340+ exit (Reason )
1341+ end .
1342+
1343+ do_reg_v1_no_del_idx_unclean (Config ) ->
1344+ St0 = # cq {name = prop_classic_queue_v1 , mode = lazy , version = 1 ,
1345+ config = minimal_config (Config )},
1346+
1347+ Res1 = cmd_setup_queue (St0 ),
1348+ St3 = St0 # cq {amq = Res1 },
1349+
1350+ {St4 , Ch } = cmd (cmd_channel_open , St3 , []),
1351+
1352+ % % Simulate pre-3.10.0 behaviour by making deliver a noop
1353+ ok = meck :new (rabbit_queue_index , [passthrough ]),
1354+ ok = meck :expect (rabbit_queue_index , deliver , fun (_ , State ) -> State end ),
1355+
1356+ ok = meck :new (rabbit_variable_queue , [passthrough ]),
1357+
1358+ {St5 , _Res5 } = cmd (cmd_channel_publish , St4 , [Ch , 4 , _Persistent = 2 , _NotMandatory = false , _NoExpiration = undefined ]),
1359+
1360+ % % Wait for the queue process to get hibernated
1361+ % % handle_pre_hibernate syncs and flushes the journal
1362+ % % At this point there should be a publish entry in the segment file and an empty journal
1363+ {Time , ok } = timer :tc (fun () -> meck :wait (rabbit_variable_queue , handle_pre_hibernate , '_' , 10000 ) end ),
1364+ ct :pal (" wait for hibernate took ~p ms" , [Time div 1000 ]),
1365+ ok = meck :unload (rabbit_variable_queue ),
1366+
1367+ % % Simulate RabbitMQ version upgrade by a clean vhost restart
1368+ % % (also reset delivery to normal operation)
1369+ ok = meck :delete (rabbit_queue_index , deliver , 2 ),
1370+ {St10 , _ } = cmd (cmd_restart_vhost_clean , St5 , []),
1371+
1372+ % % Consume the message and acknowledge it
1373+ {St6 , _Tag } = cmd (cmd_channel_consume , St10 , [Ch ]),
1374+ receive SomeMsg -> self () ! SomeMsg
1375+ after 5000 -> ct :fail (no_message_consumed )
1376+ end ,
1377+ meck :reset (rabbit_queue_index ),
1378+ {St7 , _Msg = # amqp_msg {}} = cmd (cmd_channel_receive_and_ack , St6 , [Ch ]),
1379+
1380+ % % (need to ensure that the queue processed the ack before triggering the sync)
1381+ {AckTime , ok } = timer :tc (fun () -> meck :wait (rabbit_queue_index , ack , '_' , 1000 ) end ),
1382+ ct :pal (" wait for ack took ~p ms" , [AckTime div 1000 ]),
1383+
1384+ % % enforce syncing journal to disk
1385+ % % At this point there should be a publish entry in the segment file and an ack in the journal
1386+ rabbit_amqqueue :pid_of (St7 # cq .amq ) ! timeout ,
1387+ {SyncTime , ok } = timer :tc (fun () -> meck :wait (rabbit_queue_index , sync , '_' , 1000 ) end ),
1388+ ct :pal (" wait for sync took ~p ms" , [SyncTime div 1000 ]),
1389+
1390+ meck :reset (rabbit_queue_index ),
1391+
1392+ % % Recovery after unclean queue shutdown
1393+ % % The queue index should not crash when finding a pub in a
1394+ % % segment, an ack in the journal, but no_del
1395+ % % (It used to crash in `journal_minus_segment1/2' with a function_clause)
1396+ {St20 , _ } = cmd (cmd_restart_queue_dirty , St7 , []),
1397+
1398+ {RecoverTime , ok } = timer :tc (fun () -> meck :wait (rabbit_queue_index , recover , '_' , 1000 ) end ),
1399+ ct :pal (" wait for queue recover took ~p ms" , [RecoverTime div 1000 ]),
1400+
1401+ validate_and_teaddown (St20 ).
1402+
1403+ cmd (CmdName , StIn , ExtraArgs ) ->
1404+ Res0 = apply (? MODULE , CmdName , [StIn | ExtraArgs ]),
1405+ true = postcondition (StIn , {call , undefined , CmdName , [StIn | ExtraArgs ]}, Res0 ),
1406+ StOut = next_state (StIn , Res0 , {call , undefined , CmdName , [StIn | ExtraArgs ]}),
1407+ {StOut , Res0 }.
1408+
1409+ validate_and_teaddown (St ) ->
1410+ try
1411+ case meck :validate (rabbit_queue_index ) of
1412+ true ->
1413+ true ;
1414+ false ->
1415+ FailedCalls =
1416+ [Hist || Hist = {_CallerPid , _MFA , _Class , _Reason , _ST }
1417+ <- meck :history (rabbit_queue_index )],
1418+ ct :pal (" Failed call(s) to rabbit_queue_index:~n~p " , [FailedCalls ]),
1419+
1420+ {_ , _ , _ , _ , [{_M , F , _A , _Loc }|_ ]} = hd (FailedCalls ),
1421+ ct :fail ({queue_index_crashed , F })
1422+ end
1423+ after
1424+ ok = meck :unload (rabbit_queue_index ),
1425+ safe_teardown_queue (St )
1426+ end .
1427+
1428+ safe_teardown_queue (St ) ->
1429+ try cmd_teardown_queue (St )
1430+ catch _ :_ ->
1431+ % % It is possible that asking a queue process in cyclic
1432+ % % crashing to stop fails.
1433+ VHostDir = rabbit_vhost :msg_store_dir_path (<<" /" >>),
1434+ [ok = file :delete (QIFile )
1435+ || QIFile <- filelib :wildcard (filename :join (VHostDir , " queues/*/*" ))],
1436+ cmd_teardown_queue (St )
1437+ end .
0 commit comments