Commit ad4b84e

Merge remote-tracking branch 'couchbase/trinity' into phoenix

* couchbase/trinity:
  MB-67857: Avoid menelaus_cbauth_worker message queue growing
  MB-67857: Basic unit tests for menelaus_cbauth

Change-Id: If61a27dab6211af0f5bb4c6b4be4ab7ada68ef6f

2 parents 5bc8eed + c8572d1

File tree

4 files changed: +332 -14 lines changed


apps/ns_server/src/menelaus_cbauth.erl

Lines changed: 232 additions & 1 deletion
@@ -37,7 +37,12 @@
 -include("ns_common.hrl").
 -include_lib("ns_common/include/cut.hrl").
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
 -define(VERSION_1, "v1").
+-define(WORKER_SYNC_TIMEOUT, 5000).
 
 handle_rpc_connect(?VERSION_1, Label, Req) ->
     case ns_config_auth:is_system_provisioned() of
@@ -76,6 +81,9 @@ sync() ->
     sync(node()).
 
 sync(Node) ->
+    sync(Node, ?WORKER_SYNC_TIMEOUT).
+
+sync(Node, WorkerSyncTimeout) ->
     InternalConnections =
         gen_server:call({?MODULE, Node}, get_internal_connections, infinity),
     %% If the above call succeeds, but a worker exits before we make the below
@@ -86,7 +94,7 @@ sync(Node) ->
     %% caller
     misc:parallel_map(
       fun (Pid) ->
-              try menelaus_cbauth_worker:sync(Pid)
+              try menelaus_cbauth_worker:sync(Pid, WorkerSyncTimeout)
              catch exit:{noproc, _} ->
                      ?log_error("Process ~p no longer exists", [Pid])
              end
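
The new arity threads a caller-supplied timeout down to each worker's gen_server:call/3, so an overrunning worker now surfaces as an exit with reason {timeout, _} rather than an unbounded wait. A hypothetical caller-side sketch (the 1000 ms bound and the error tuple are illustrative, not part of this change):

    %% Bound the wait at 1 second instead of the default
    %% ?WORKER_SYNC_TIMEOUT (5000 ms); a timed-out gen_server:call
    %% exits the caller, so trap it explicitly if that is not desired.
    try menelaus_cbauth:sync(node(), 1000) of
        Results -> {ok, Results}
    catch
        exit:{timeout, _} -> {error, cbauth_sync_timeout}
    end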
@@ -643,3 +651,226 @@ service_to_label(xdcr) ->
     "goxdcr-cbauth";
 service_to_label(Service) ->
     atom_to_list(Service) ++ "-cbauth".
+
+-ifdef(TEST).
+-define(SERVICE, "<service>").
+-define(LABEL, "<service>-cbauth").
+-define(HEARTBEAT_TIME_S, 1).
+%% Short timeout for waiting to confirm that something doesn't immediately
+%% happen, without waiting so long that it significantly increases test duration
+-define(SHORT_TIMEOUT, 500).
+%% Timeout for things that should eventually happen, but not necessarily
+%% immediately
+-define(LONG_TIMEOUT, 60_000).
+
+setup_t() ->
+    meck:expect(config_profile, get,
+                fun () ->
+                        ?DEFAULT_EMPTY_PROFILE_FOR_TESTS
+                end),
+
+    fake_ns_config:setup(),
+    fake_chronicle_kv:setup(),
+    %% Test setups return a map of pids for later shutdown in the teardown
+    PidMap = mock_helpers:setup_mocks([json_rpc_events,
+                                       ns_node_disco_events,
+                                       user_storage_events,
+                                       ssl_service_events,
+                                       json_rpc_connection_sup,
+                                       ns_ssl_services_setup,
+                                       menelaus_users,
+                                       ns_secrets,
+                                       testconditions]),
+
+    %% Set config values for a few keys, since these are needed for greater
+    %% coverage, and to avoid errors
+    fake_chronicle_kv:update_snapshot(#{nodes_wanted => [node()],
+                                        bucket_names => []}),
+    fake_ns_config:update_snapshot([{rest, [{port, 8091}]},
+                                    {rest_creds, placeholder},
+                                    {memcached, [{admin_user, "user"},
+                                                 {admin_pass, "pass"}]}]),
+
+    meck:new(menelaus_cbauth_worker, [passthrough]),
+
+    {ok, Pid} = menelaus_cbauth:start_link(),
+    start_fake_json_rpc_connection(?LABEL),
+    PidMap#{?MODULE => Pid}.
+
+teardown_t(PidMap) ->
+    Name = list_to_atom("json_rpc_connection-" ++ ?LABEL),
+    erlang:unregister(Name),
+    mock_helpers:shutdown_processes(PidMap),
+    fake_chronicle_kv:teardown(),
+    fake_ns_config:teardown(),
+    meck:unload().
+
+start_fake_json_rpc_connection(Label) ->
+    Name = list_to_atom("json_rpc_connection-" ++ Label),
+    Pid = self(),
+    true = erlang:register(Name, Pid),
+    meck:expect(json_rpc_connection, perform_call,
+                fun (_Label, _Call, _EJsonArg, _Opts) ->
+                        {ok, true}
+                end),
+    gen_event:notify(json_rpc_events,
+                     {started, Label, [internal,
+                                       {heartbeat, ?HEARTBEAT_TIME_S}],
+                      self()}).
+
+cbauth_init_t() ->
+    %% UpdateDB gets called once almost immediately
+    meck:wait(json_rpc_connection, perform_call,
+              ['_', "AuthCacheSvc.UpdateDB", '_', '_'], ?LONG_TIMEOUT),
+    %% Heartbeat gets called once
+    meck:wait(json_rpc_connection, perform_call,
+              ['_', "AuthCacheSvc.Heartbeat", '_', '_'],
+              2_000 * ?HEARTBEAT_TIME_S),
+    %% Heartbeat gets called again
+    meck:reset(json_rpc_connection),
+    meck:wait(json_rpc_connection, perform_call,
+              ['_', "AuthCacheSvc.Heartbeat", '_', '_'],
+              2_000 * ?HEARTBEAT_TIME_S),
+    %% UpdateDB isn't called immediately again
+    ?assertError(timeout,
+                 meck:wait(json_rpc_connection, perform_call,
+                           ['_', "AuthCacheSvc.UpdateDB", '_', '_'],
+                           ?SHORT_TIMEOUT)).
+
+cbauth_sync_t() ->
+    %% UpdateDB gets called once immediately
+    meck:wait(json_rpc_connection, perform_call,
+              ['_', "AuthCacheSvc.UpdateDB", '_', '_'],
+              ?LONG_TIMEOUT),
+    [ok] = sync(),
+    %% Ensure that the next perform_call doesn't immediately return until after
+    %% the sync call would time out
+    meck:expect(json_rpc_connection, perform_call,
+                fun (_, "AuthCacheSvc.UpdateDB", _, _) ->
+                        timer:sleep(2 * ?SHORT_TIMEOUT);
+                    (_, _, _, _) ->
+                        ok
+                end),
+    meck:reset(menelaus_cbauth_worker),
+    %% Force an update by updating the snapshot
+    fake_ns_config:update_snapshot(rest, [{port, 8092}]),
+    %% Wait for the update to start being handled
+    meck:wait(menelaus_cbauth_worker, notify, ['_', '_'], ?LONG_TIMEOUT),
+    %% Sync with timeout half the perform_call sleep time, to ensure it gets hit
+    ?assertExit({timeout, _}, sync(node(), ?SHORT_TIMEOUT)).
+
+cbauth_stats_t() ->
+    meck:expect(json_rpc_connection, perform_call,
+                fun (_Label, "AuthCacheSvc.GetStats", _EJsonArg, _Opts) ->
+                        {ok, {[ok]}};
+                    (_Label, _Call, _EJsonArg, _Opts) ->
+                        {ok, true}
+                end),
+    [{<<?SERVICE>>, ok}] = stats(),
+    meck:expect(json_rpc_connection, perform_call,
+                fun (_Label, "AuthCacheSvc.GetStats", _EJsonArg, _Opts) ->
+                        {error, error};
+                    (_Label, _Call, _EJsonArg, _Opts) ->
+                        {ok, true}
+                end),
+    [] = stats().
+
+cbauth_many_notify_t() ->
+    %% Wait for initial notify to be handled
+    meck:wait(1, menelaus_cbauth_worker, notify, ['_', '_'],
+              ?LONG_TIMEOUT),
+    meck:wait(1, json_rpc_connection, perform_call,
+              ['_', "AuthCacheSvc.UpdateDB", '_', '_'], ?LONG_TIMEOUT),
+    %% Ensure that the next perform_call doesn't immediately return
+    meck:expect(json_rpc_connection, perform_call,
+                fun (_, "AuthCacheSvc.UpdateDB", _, _) ->
+                        receive return_from_call -> {ok, true}
+                        after ?LONG_TIMEOUT -> error(timeout) end;
+                    (_Label, _Call, _EJsonArg, _Opts) ->
+                        {ok, true}
+                end),
+    %% Send three updates, ensuring three notify calls with different values:
+    %% - The first is to get the worker stuck in the above meck:expect
+    %% - The second is to provide a value, which we want to become stale
+    %% - The third will be the new value, which should get used, instead of the
+    %%   stale value, after the first update is processed
+    lists:foreach(
+      fun (N) ->
+              fake_ns_config:update_snapshot(rest, [{port, N}]),
+              meck:wait(N, menelaus_cbauth_worker, notify, ['_', '_'],
+                        ?LONG_TIMEOUT)
+      end, [2, 3, 4]),
+    %% Extract value from next update
+    meck:expect(json_rpc_connection, perform_call,
+                fun (_, "AuthCacheSvc.UpdateDB", {Info}, _) ->
+                        [{Node}] = proplists:get_value(nodes, Info, []),
+                        [undefined, Port] = proplists:get_value(ports, Node),
+                        %% Expect the last value, not the stale value
+                        ?assertEqual(4, Port),
+                        {ok, true};
+                    (_Label, _Call, _EJsonArg, _Opts) ->
+                        {ok, true}
+                end),
+    %% Release worker
+    WorkerPid = whereis(list_to_atom("menelaus_cbauth_worker-" ++ ?LABEL)),
+    WorkerPid ! return_from_call,
+    %% Wait for both the stuck and final call to return
+    meck:wait(3, json_rpc_connection, perform_call,
+              ['_', "AuthCacheSvc.UpdateDB", '_', '_'], ?LONG_TIMEOUT).
+
+trigger_notification(ns_node_disco_events) ->
+    %% To avoid needing to trick ns_node_disco into recognising fake nodes, just
+    %% make the node list different by removing the only node.
+    %% Note, this tests ns_node_disco_events as well as chronicle_kv, as the
+    %% event handler for chronicle_compat_events in this module does not cover
+    %% the nodes_wanted key (although the ns_node_disco handler does cover this
+    %% key, hence how this tests the ns_node_disco_events handler)
+    fake_chronicle_kv:update_snapshot(nodes_wanted, []);
+trigger_notification(ns_config_events) ->
+    %% This is just an arbitrary key in ns_config that we use for the cbauth
+    %% info, that is covered by the chronicle_compat_events handler
+    fake_ns_config:update_snapshot(rest, [{port, 8092}]);
+trigger_notification(chronicle_kv) ->
+    %% While the bucket_names key isn't subscribed to, the collections key is,
+    %% and we need to update both for the info to be updated
+    fake_chronicle_kv:update_snapshot(
+      #{bucket_names => ["test"],
+        {bucket, "test", collections} => [{uid, 0}],
+        {bucket, "test", props} => [{type, membase}]});
+trigger_notification(user_storage_events) ->
+    meck:expect(menelaus_users, get_users_version, 0, {1, 0}),
+    %% It isn't worth the complexity to trigger a user version change through
+    %% less artificial means, so just manually trigger the event
+    gen_event:notify(user_storage_events, event);
+trigger_notification(ssl_service_events) ->
+    %% It isn't worth the complexity to trigger an ssl_service_event through
+    %% less artificial means, so just manually trigger the event
+    gen_event:notify(ssl_service_events, client_cert_changed).
+
+cbauth_notify_tests() ->
+    %% Check that expected events cause notifications
+    [{"cbauth notify " ++ atom_to_list(EventManager) ++ " test",
+      fun () ->
+              %% UpdateDB gets called once almost immediately
+              meck:wait(json_rpc_connection, perform_call,
+                        ['_', "AuthCacheSvc.UpdateDB", '_', '_'],
+                        ?LONG_TIMEOUT),
+              meck:reset(json_rpc_connection),
+              trigger_notification(EventManager),
+              meck:wait(json_rpc_connection, perform_call,
+                        ['_', "AuthCacheSvc.UpdateDB", '_', '_'],
+                        ?LONG_TIMEOUT)
+      end} || EventManager <- [ns_node_disco_events,
+                               ns_config_events,
+                               chronicle_kv,
+                               user_storage_events,
+                               ssl_service_events]].
+
+cbauth_test_() ->
+    {foreach, fun setup_t/0, fun teardown_t/1,
+     [{"cbauth init test", fun cbauth_init_t/0},
+      {"cbauth sync test", fun cbauth_sync_t/0},
+      {"cbauth stats test", fun cbauth_stats_t/0},
+      {"cbauth many notify test", fun cbauth_many_notify_t/0}
+      | cbauth_notify_tests()]}.
+-endif.
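
With TEST defined at compile time, the new suite runs through the standard eunit entry point; from an Erlang shell with the module on the code path, something like:

    %% Runs the cbauth_test_() generator; [verbose] prints each test name.
    eunit:test(menelaus_cbauth, [verbose]).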

apps/ns_server/src/menelaus_cbauth_worker.erl

Lines changed: 26 additions & 8 deletions
@@ -12,7 +12,7 @@
 
 -behaviour(gen_server).
 
--export([start_monitor/4, notify/2, collect_stats/1, sync/1,
+-export([start_monitor/4, notify/2, collect_stats/1, sync/2,
          strip_cbauth_suffix/1]).
 
 -export([init/1, handle_call/3, handle_cast/2,
@@ -50,8 +50,8 @@ notify(Pid, Info) ->
 collect_stats(Pid) ->
     gen_server:call(Pid, collect_stats).
 
-sync(Pid) ->
-    gen_server:call(Pid, sync).
+sync(Pid, Timeout) ->
+    gen_server:call(Pid, sync, Timeout).
 
 init([Label, Version, Pid, Params]) ->
     MRef = erlang:monitor(process, Pid),
@@ -82,17 +82,18 @@ handle_info(heartbeat, State = #state{label = Label, connection = Pid,
         ok ->
             {noreply, State}
     end;
-handle_info({notify, InfoHidden}, State = #state{label = Label,
-                                                 connection = Pid,
-                                                 version = Version}) ->
-    Info = ?UNHIDE(InfoHidden),
+handle_info({notify, PotentiallyStaleInfo},
+            State = #state{label = Label, connection = Pid,
+                           version = Version}) ->
+    LatestInfoHidden = receive_latest_notify(PotentiallyStaleInfo),
+    LatestInfo = ?UNHIDE(LatestInfoHidden),
     Method = case Version of
                  internal ->
                      "AuthCacheSvc.UpdateDB";
                  _ ->
                      "AuthCacheSvc.UpdateDBExt"
              end,
-    case invoke_no_return_method(Label, Method, Pid, Info) of
+    case invoke_no_return_method(Label, Method, Pid, LatestInfo) of
         error ->
             terminate_jsonrpc_connection(Label, Pid),
             misc:wait_for_process(Pid, infinity),
@@ -136,6 +137,23 @@ send_heartbeat(Label, Pid) ->
             ?log_debug("Skip heartbeat for label ~p", [Label])
     end.
 
+receive_latest_notify(Info) ->
+    receive_latest_notify(Info, 0).
+
+receive_latest_notify(Info, N) ->
+    %% Handle any remaining notify messages, to ensure that the mailbox is
+    %% cleared of redundant messages
+    receive
+        {notify, NewInfo} -> receive_latest_notify(NewInfo, N + 1)
+    after
+        0 ->
+            case N of
+                0 -> ok;
+                _ -> ?log_debug("Skipped ~p old notify entries", [N])
+            end,
+            Info
+    end.
+
 invoke_no_return_method(Label, Method, Pid, Info) ->
     case perform_call(Label, Method, Pid, {Info}, false) of
         {ok, Res} when Res =:= true orelse Res =:= null ->
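
receive_latest_notify/2 above is an instance of a common Erlang idiom: a zero-timeout selective receive that collapses a burst of queued messages of the same shape into the newest one, so a slow consumer performs one expensive call per burst rather than one per message, and the mailbox can no longer grow without bound. A minimal self-contained sketch of the same idiom (the module and function names here are illustrative, not part of the commit):

    -module(drain_example).
    -export([loop/1]).

    %% A toy consumer: {notify, Info} messages may arrive faster than
    %% Handler can run, so drain the queue and keep only the latest.
    loop(Handler) ->
        receive
            {notify, Info} ->
                Latest = drain(Info),
                Handler(Latest),
                loop(Handler)
        end.

    %% Zero-timeout receive: consume any already-queued notifies,
    %% returning the newest payload without ever blocking.
    drain(Info) ->
        receive
            {notify, Newer} -> drain(Newer)
        after 0 ->
            Info
        end.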
