Skip to content

Commit c8572d1

Browse files
committed
MB-67857: Avoid menelaus_cbauth_worker message queue growing
Because we synchronously notify cbauth instances of creds info, and the creds info is generated outside the worker process, we can end up with a queue of updates filling up the menelaus_cbauth_worker process. The obvious solution would be to simply generate the creds info within the worker process, such that we could instead flush all notify messages each time we process one, avoiding the message queue backing up. The issue with this is that it would mean duplicating the management of creds info in each worker process, which is currently handled in just menelaus_cbauth. Instead, it is simpler just to flush the notify messages when we process one, while retaining the info from the last message. This avoids the message queue backing up, ensuring that each time we notify cbauth, it is with the latest info that the process received. Change-Id: I3c3b15dd7b81f005524a24d0614895a6e8fcb79f Reviewed-on: https://review.couchbase.org/c/ns_server/+/231636 Well-Formed: Build Bot <[email protected]> Reviewed-by: Timofey Barmin <[email protected]> Well-Formed: Restriction Checker Tested-by: Peter Searby <[email protected]>
1 parent 4e19e68 commit c8572d1

File tree

2 files changed

+68
-6
lines changed

2 files changed

+68
-6
lines changed

src/menelaus_cbauth.erl

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,49 @@ cbauth_stats_t() ->
771771
end),
772772
[] = stats().
773773

774+
cbauth_many_notify_t() ->
775+
%% Wait for initial notify to be handled
776+
meck:wait(1, menelaus_cbauth_worker, notify, ['_', '_'],
777+
?LONG_TIMEOUT),
778+
meck:wait(1, json_rpc_connection, perform_call,
779+
['_', "AuthCacheSvc.UpdateDB", '_', '_'], ?LONG_TIMEOUT),
780+
%% Ensure that the next perform_call doesn't immediately return
781+
meck:expect(json_rpc_connection, perform_call,
782+
fun (_, "AuthCacheSvc.UpdateDB", _, _) ->
783+
receive return_from_call -> {ok, true}
784+
after ?LONG_TIMEOUT -> error(timeout) end;
785+
(_Label, _Call, _EJsonArg, _Opts) ->
786+
{ok, true}
787+
end),
788+
%% Send three updates, ensuring three notify calls with different values:
789+
%% - The first is to get the worker stuck in the above meck:expect
790+
%% - The second is to provide a value, which we want to become stale
791+
%% - The third will be the new value, which should get used, instead of the
792+
%% stale value, after the first update is processed
793+
lists:foreach(
794+
fun (N) ->
795+
fake_ns_config:update_snapshot(rest, [{port, N}]),
796+
meck:wait(N, menelaus_cbauth_worker, notify, ['_', '_'],
797+
?LONG_TIMEOUT)
798+
end, [2, 3, 4]),
799+
%% Extract value from next update
800+
meck:expect(json_rpc_connection, perform_call,
801+
fun (_, "AuthCacheSvc.UpdateDB", {Info}, _) ->
802+
[{Node}] = proplists:get_value(nodes, Info, []),
803+
[undefined, Port] = proplists:get_value(ports, Node),
804+
%% Expect the last value, not the stale value
805+
?assertEqual(4, Port),
806+
{ok, true};
807+
(_Label, _Call, _EJsonArg, _Opts) ->
808+
{ok, true}
809+
end),
810+
%% Release worker
811+
WorkerPid = whereis(list_to_atom("menelaus_cbauth_worker-" ++ ?LABEL)),
812+
WorkerPid ! return_from_call,
813+
%% Wait for both the stuck and final call to return
814+
meck:wait(3, json_rpc_connection, perform_call,
815+
['_', "AuthCacheSvc.UpdateDB", '_', '_'], ?LONG_TIMEOUT).
816+
774817
trigger_notification(ns_node_disco_events) ->
775818
%% To avoid needing to trick ns_node_disco into recognising fake nodes, just
776819
%% make the node list different by removing the only node.
@@ -823,6 +866,7 @@ cbauth_test_() ->
823866
{foreach, fun setup_t/0, fun teardown_t/1,
824867
[{"cbauth init test", fun cbauth_init_t/0},
825868
{"cbauth sync test", fun cbauth_sync_t/0},
826-
{"cbauth stats test", fun cbauth_stats_t/0}
869+
{"cbauth stats test", fun cbauth_stats_t/0},
870+
{"cbauth many notify test", fun cbauth_many_notify_t/0}
827871
| cbauth_notify_tests()]}.
828872
-endif.

src/menelaus_cbauth_worker.erl

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,18 @@ handle_info(heartbeat, State = #state{label = Label, connection = Pid,
8282
ok ->
8383
{noreply, State}
8484
end;
85-
handle_info({notify, InfoHidden}, State = #state{label = Label,
86-
connection = Pid,
87-
version = Version}) ->
88-
Info = ?UNHIDE(InfoHidden),
85+
handle_info({notify, PotentiallyStaleInfo},
86+
State = #state{label = Label, connection = Pid,
87+
version = Version}) ->
88+
LatestInfoHidden = receive_latest_notify(PotentiallyStaleInfo),
89+
LatestInfo = ?UNHIDE(LatestInfoHidden),
8990
Method = case Version of
9091
internal ->
9192
"AuthCacheSvc.UpdateDB";
9293
_ ->
9394
"AuthCacheSvc.UpdateDBExt"
9495
end,
95-
case invoke_no_return_method(Label, Method, Pid, Info) of
96+
case invoke_no_return_method(Label, Method, Pid, LatestInfo) of
9697
error ->
9798
terminate_jsonrpc_connection(Label, Pid),
9899
misc:wait_for_process(Pid, infinity),
@@ -136,6 +137,23 @@ send_heartbeat(Label, Pid) ->
136137
?log_debug("Skip heartbeat for label ~p", [Label])
137138
end.
138139

140+
receive_latest_notify(Info) ->
141+
receive_latest_notify(Info, 0).
142+
143+
receive_latest_notify(Info, N) ->
144+
%% Handle any remaining notify messages, to ensure that the mailbox is
145+
%% cleared of redundant messages
146+
receive
147+
{notify, NewInfo} -> receive_latest_notify(NewInfo, N + 1)
148+
after
149+
0 ->
150+
case N of
151+
0 -> ok;
152+
_ -> ?log_debug("Skipped ~p old notify entries", [N])
153+
end,
154+
Info
155+
end.
156+
139157
invoke_no_return_method(Label, Method, Pid, Info) ->
140158
case perform_call(Label, Method, Pid, {Info}, false) of
141159
{ok, Res} when Res =:= true orelse Res =:= null ->

0 commit comments

Comments
 (0)