Skip to content

Commit a8deb75

Browse files
Compaction: more logging, a test
1 parent 4b281fc commit a8deb75

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

src/rabbit_msg_store.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,6 +2068,8 @@ do_combine_files(SourceSummary, DestinationSummary,
20682068
{#file_summary.file_size, TotalValidData}]),
20692069

20702070
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
2071+
rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes",
2072+
[Source, Destination, Reclaimed]),
20712073
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
20722074
safe_file_delete_fun(Source, Dir, FileHandlesEts).
20732075

test/queue_parallel_SUITE.erl

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ groups() ->
6060
[
6161
{parallel_tests, [],
6262
[
63-
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
64-
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
63+
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
64+
trigger_message_store_compaction]},
65+
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
66+
trigger_message_store_compaction]},
6567
{quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
6668
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
6769
{quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
@@ -327,6 +329,27 @@ subscribe_and_multiple_ack(Config) ->
327329
rabbit_ct_client_helpers:close_channel(Ch),
328330
ok.
329331

332+
trigger_message_store_compaction(Config) ->
333+
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
334+
QName = ?config(queue_name, Config),
335+
declare_queue(Ch, Config, QName),
336+
337+
N = 12000,
338+
[publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)],
339+
wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]),
340+
341+
AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N),
342+
ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags),
343+
344+
[amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag,
345+
multiple = false}) || Tag <- ToAck],
346+
347+
%% give compaction a moment to start in and finish
348+
timer:sleep(5000),
349+
amqp_channel:cast(Ch, #'queue.purge'{queue = QName}),
350+
rabbit_ct_client_helpers:close_channel(Ch),
351+
ok.
352+
330353
subscribe_and_requeue_multiple_nack(Config) ->
331354
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
332355
QName = ?config(queue_name, Config),

0 commit comments

Comments
 (0)