Skip to content

Commit facf2c7

Browse files
authored
Merge pull request #1135 from basho/dr-wait-for-crdt-results
Wait for CRDT search results, rather than failing after the first attempt.
2 parents de73a49 + 61c1ea8 commit facf2c7

File tree

2 files changed

+57
-41
lines changed

2 files changed

+57
-41
lines changed

src/yokozuna_rt.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
check_exists/2,
2727
clear_trees/1,
2828
commit/2,
29+
drain_solrqs/1,
2930
expire_trees/1,
3031
gen_keys/1,
3132
host_entries/1,
@@ -455,6 +456,13 @@ commit(Nodes, Index) ->
455456
rpc:multicall(Nodes, yz_solr, commit, [Index]),
456457
ok.
457458

459+
-spec drain_solrqs(node() | cluster()) -> ok.
460+
drain_solrqs(Cluster) when is_list(Cluster) ->
461+
[drain_solrqs(Node) || Node <- Cluster];
462+
drain_solrqs(Node) ->
463+
rpc:call(Node, yz_solrq_drain_mgr, drain, []),
464+
ok.
465+
458466
-spec override_schema(pid(), [node()], index_name(), schema_name(), string()) ->
459467
{ok, [node()]}.
460468
override_schema(Pid, Cluster, Index, Schema, RawUpdate) ->

tests/yz_crdt.erl

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -59,59 +59,67 @@ confirm() ->
5959
?KEY,
6060
riakc_map:to_op(Map2)),
6161

62+
yokozuna_rt:drain_solrqs(Nodes),
6263
yokozuna_rt:commit(Nodes, ?INDEX),
63-
6464
%% Perform simple queries, check for register, set fields.
65-
{ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
65+
ok = rt:wait_until(
66+
fun() ->
67+
validate_search_results(Pid)
68+
end),
69+
%% Stop PB connection.
70+
riakc_pb_socket:stop(Pid),
71+
72+
pass.
73+
74+
validate_search_results(Pid) ->
75+
try
76+
{ok, {search_results, Results1a, _, _}} = riakc_pb_socket:search(
6677
Pid, ?INDEX, <<"name_register:Chris*">>),
67-
lager:info("Search name_register:Chris*: ~p~n", [Results1a]),
68-
?assertEqual(length(Results1a), 1),
69-
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)),
70-
list_to_binary(?KEY)),
71-
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)),
72-
<<"thing">>),
73-
74-
{ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search(
78+
lager:info("Search name_register:Chris*: ~p~n", [Results1a]),
79+
?assertEqual(length(Results1a), 1),
80+
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results1a)),
81+
list_to_binary(?KEY)),
82+
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results1a)),
83+
<<"thing">>),
84+
85+
{ok, {search_results, Results2a, _, _}} = riakc_pb_socket:search(
7586
Pid, ?INDEX, <<"interests_set:thing*">>),
76-
lager:info("Search interests_set:thing*: ~p~n", [Results2a]),
77-
?assertEqual(length(Results2a), 1),
78-
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)),
79-
list_to_binary(?KEY)),
80-
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)),
81-
<<"thing">>),
82-
83-
{ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search(
87+
lager:info("Search interests_set:thing*: ~p~n", [Results2a]),
88+
?assertEqual(length(Results2a), 1),
89+
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results2a)),
90+
list_to_binary(?KEY)),
91+
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results2a)),
92+
<<"thing">>),
93+
94+
{ok, {search_results, Results3a, _, _}} = riakc_pb_socket:search(
8495
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
85-
lager:info("Search testbucket: ~p~n", [Results3a]),
86-
?assertEqual(length(Results3a), 1),
87-
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)),
88-
list_to_binary(?KEY)),
89-
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)),
90-
<<"thing">>),
91-
92-
%% Redo queries and check if results are equal
93-
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
96+
lager:info("Search testbucket: ~p~n", [Results3a]),
97+
?assertEqual(length(Results3a), 1),
98+
?assertEqual(?GET(<<"name_register">>, ?GET(?INDEX, Results3a)),
99+
list_to_binary(?KEY)),
100+
?assertEqual(?GET(<<"interests_set">>, ?GET(?INDEX, Results3a)),
101+
<<"thing">>),
102+
103+
%% Redo queries and check if results are equal
104+
{ok, {search_results, Results1b, _, _}} = riakc_pb_socket:search(
94105
Pid, ?INDEX, <<"name_register:Chris*">>),
95-
?assertEqual(number_of_fields(Results1a),
96-
number_of_fields(Results1b)),
106+
?assertEqual(number_of_fields(Results1a),
107+
number_of_fields(Results1b)),
97108

98-
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
109+
{ok, {search_results, Results2b, _, _}} = riakc_pb_socket:search(
99110
Pid, ?INDEX, <<"interests_set:thing*">>),
100-
?assertEqual(number_of_fields(Results2a),
101-
number_of_fields(Results2b)),
111+
?assertEqual(number_of_fields(Results2a),
112+
number_of_fields(Results2b)),
102113

103-
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
114+
{ok, {search_results, Results3b, _, _}} = riakc_pb_socket:search(
104115
Pid, ?INDEX, <<"_yz_rb:testbucket">>),
105116
?assertEqual(number_of_fields(Results3a),
106117
number_of_fields(Results3b)),
107-
108-
%% Stop PB connection.
109-
riakc_pb_socket:stop(Pid),
110-
111-
%% Clean cluster.
112-
rt:clean_cluster(Nodes),
113-
114-
pass.
118+
true
119+
catch Err:Reason ->
120+
lager:info("Waiting for CRDT search results to converge. Error was ~p.", [{Err, Reason}]),
121+
false
122+
end.
115123

116124
%% @private
117125
number_of_fields(Resp) ->

0 commit comments

Comments
 (0)