|
| 1 | +%% ------------------------------------------------------------------- |
| 2 | +%% |
| 3 | +%% Copyright (c) 2016 Basho Technologies, Inc. |
| 4 | +%% |
| 5 | +%% This file is provided to you under the Apache License, |
| 6 | +%% Version 2.0 (the "License"); you may not use this file |
| 7 | +%% except in compliance with the License. You may obtain |
| 8 | +%% a copy of the License at |
| 9 | +%% |
| 10 | +%% http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | +%% |
| 12 | +%% Unless required by applicable law or agreed to in writing, |
| 13 | +%% software distributed under the License is distributed on an |
| 14 | +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | +%% KIND, either express or implied. See the License for the |
| 16 | +%% specific language governing permissions and limitations |
| 17 | +%% under the License. |
| 18 | +%% |
| 19 | +%% ------------------------------------------------------------------- |
| 20 | +%%% @doc r_t to test hll datatypes across a riak cluster |
| 21 | + |
| 22 | +-module(test_hll). |
| 23 | + |
| 24 | +-export([confirm/0]). |
| 25 | + |
| 26 | +-include_lib("eunit/include/eunit.hrl"). |
| 27 | + |
| 28 | +-define(HLL_TYPE1, <<"hlls1">>). |
| 29 | +-define(HLL_TYPE2, <<"hlls2">>). |
| 30 | +-define(HLL_TYPE3, <<"hlls3">>). |
| 31 | +-define(BUCKET1, {?HLL_TYPE1, <<"testbucket1">>}). |
| 32 | +-define(BUCKET2, {?HLL_TYPE2, <<"testbucket2">>}). |
| 33 | +-define(DEFAULT_P, 14). |
| 34 | +-define(SET_P, 16). |
| 35 | +-define(BAD_P, 1). |
| 36 | +-define(P_SETTING, hll_precision). |
| 37 | +-define(KEY, <<"flipit&reverseit">>). |
| 38 | +-define(CONFIG, |
| 39 | + [ |
| 40 | + {riak_core, |
| 41 | + [{ring_creation_size, 8}, |
| 42 | + {anti_entropy_build_limit, {100, 1000}}, |
| 43 | + {anti_entropy_concurrency, 8}, |
| 44 | + {handoff_concurrency, 16}] |
| 45 | + } |
| 46 | + ]). |
| 47 | + |
| 48 | +confirm() -> |
| 49 | + %% Configure cluster. |
| 50 | + Nodes = [N1, N2, N3, N4] = rt:build_cluster(4, ?CONFIG), |
| 51 | + |
| 52 | + NodeRand1A = rt:select_random([N1, N2]), |
| 53 | + NodeRand1B = rt:select_random([N1, N2]), |
| 54 | + NodeRand2A = rt:select_random([N3, N4]), |
| 55 | + NodeRand2B = rt:select_random([N3, N4]), |
| 56 | + |
| 57 | + lager:info("Create PB/HTTP Clients from first two nodes, and then" |
| 58 | + " the second two nodes, as we'll partition Nodes 1 & 2 from" |
| 59 | + " Nodes 3 & 4 later"), |
| 60 | + |
| 61 | + %% Create PB connection. |
| 62 | + PBC1 = rt:pbc(NodeRand1A), |
| 63 | + PBC2 = rt:pbc(NodeRand2A), |
| 64 | + riakc_pb_socket:set_options(PBC1, [queue_if_disconnected]), |
| 65 | + riakc_pb_socket:set_options(PBC2, [queue_if_disconnected]), |
| 66 | + |
| 67 | + %% Create HTTPC connection. |
| 68 | + HttpC1 = rt:httpc(NodeRand1B), |
| 69 | + HttpC2 = rt:httpc(NodeRand2B), |
| 70 | + |
| 71 | + ok = rt:create_activate_and_wait_for_bucket_type(Nodes, |
| 72 | + ?HLL_TYPE1, |
| 73 | + [{datatype, hll}, |
| 74 | + {?P_SETTING, ?SET_P}]), |
| 75 | + |
| 76 | + ok = rt:create_activate_and_wait_for_bucket_type(Nodes, |
| 77 | + ?HLL_TYPE2, |
| 78 | + [{datatype, hll}]), |
| 79 | + |
| 80 | + lager:info("Create a bucket-type w/ a HLL datatype and a bad HLL precision" |
| 81 | + " - This should throw an error"), |
| 82 | + ?assertError({badmatch, {error, [{hll_precision, _}]}}, |
| 83 | + rt:create_activate_and_wait_for_bucket_type(Nodes, |
| 84 | + ?HLL_TYPE3, |
| 85 | + [{datatype, hll}, |
| 86 | + {?P_SETTING, |
| 87 | + ?BAD_P}])), |
| 88 | + |
| 89 | + pb_tests(PBC1, PBC2, riakc_pb_socket, ?BUCKET1, Nodes), |
| 90 | + http_tests(HttpC1, HttpC2, rhc, ?BUCKET2, Nodes), |
| 91 | + |
| 92 | + %% Stop PB connections. |
| 93 | + riakc_pb_socket:stop(PBC1), |
| 94 | + riakc_pb_socket:stop(PBC2), |
| 95 | + |
| 96 | + pass. |
| 97 | + |
| 98 | +http_tests(C1, C2, CMod, Bucket, Nodes) -> |
| 99 | + lager:info("HTTP CLI TESTS: Create new Hll DT"), |
| 100 | + |
| 101 | + add_tests(C1, CMod, Bucket), |
| 102 | + |
| 103 | + HllSet0 = get_hll(C1, CMod, Bucket), |
| 104 | + check_precision_and_reduce_test(C1, CMod, Bucket, ?DEFAULT_P, HllSet0), |
| 105 | + |
| 106 | + partition_write_heal(C1, C2, CMod, Bucket, Nodes), |
| 107 | + |
| 108 | + HllSet1 = get_hll(C1, CMod, Bucket), |
| 109 | + check_precision_and_reduce_invalid_test(C1, CMod, Bucket, ?DEFAULT_P - 1, |
| 110 | + HllSet1), |
| 111 | + |
| 112 | + ok. |
| 113 | + |
| 114 | +pb_tests(C1, C2, CMod, Bucket, Nodes) -> |
| 115 | + lager:info("PB CLI TESTS: Create new Hll DT"), |
| 116 | + |
| 117 | + add_tests(C1, CMod, Bucket), |
| 118 | + |
| 119 | + HllSet0 = get_hll(C1, CMod, Bucket), |
| 120 | + check_precision_and_reduce_test(C1, CMod, Bucket, ?SET_P, HllSet0), |
| 121 | + |
| 122 | + partition_write_heal(C1, C2, CMod, Bucket, Nodes), |
| 123 | + |
| 124 | + HllSet1 = get_hll(C1, CMod, Bucket), |
| 125 | + check_precision_and_reduce_invalid_test(C1, CMod, Bucket, ?SET_P - 1, HllSet1), |
| 126 | + |
| 127 | + ok. |
| 128 | + |
| 129 | +add_tests(C, CMod, Bucket) -> |
| 130 | + S0 = riakc_hll:new(), |
| 131 | + |
| 132 | + add_element(C, CMod, Bucket, S0, <<"OH">>), |
| 133 | + {ok, S1} = CMod:fetch_type(C, Bucket, ?KEY), |
| 134 | + ?assertEqual(riakc_hll:value(S1), 1), |
| 135 | + |
| 136 | + add_elements(C, CMod, Bucket, S1, [<<"C">>, <<"A">>, <<"P">>]), |
| 137 | + {ok, S2} = CMod:fetch_type(C, Bucket, ?KEY), |
| 138 | + ?assertEqual(riakc_hll:value(S2), 4), |
| 139 | + |
| 140 | + add_redundant_element(C, CMod, Bucket, S2, <<"OH">>), |
| 141 | + {ok, S3} = CMod:fetch_type(C, Bucket, ?KEY), |
| 142 | + ?assertEqual(riakc_hll:value(S3), 4). |
| 143 | + |
| 144 | +partition_write_heal(C1, C2, CMod, Bucket, Nodes) -> |
| 145 | + lager:info("Partition cluster in two to force merge."), |
| 146 | + [N1, N2, N3, N4] = Nodes, |
| 147 | + PartInfo = rt:partition([N1, N2], [N3, N4]), |
| 148 | + |
| 149 | + try |
| 150 | + lager:info("Write to one side of the partition"), |
| 151 | + {ok, S0} = CMod:fetch_type(C1, Bucket, ?KEY), |
| 152 | + add_element(C1, CMod, Bucket, S0, <<"OH hello there">>), |
| 153 | + {ok, S1} = CMod:fetch_type(C1, Bucket, ?KEY), |
| 154 | + ?assertEqual(riakc_hll:value(S1), 5), |
| 155 | + |
| 156 | + lager:info("Write to the other side of the partition"), |
| 157 | + {ok, S2} = CMod:fetch_type(C2, Bucket, ?KEY), |
| 158 | + add_element(C2, CMod, Bucket, S2, <<"Riak 1.4.eva">>), |
| 159 | + {ok, S3} = CMod:fetch_type(C2, Bucket, ?KEY), |
| 160 | + ?assertEqual(riakc_hll:value(S3), 5), |
| 161 | + |
| 162 | + lager:info("Heal") |
| 163 | + after |
| 164 | + ok = rt:heal(PartInfo) |
| 165 | + end, |
| 166 | + |
| 167 | + ok = rt:wait_until_no_pending_changes(Nodes), |
| 168 | + ok = rt:wait_until_transfers_complete(Nodes), |
| 169 | + |
| 170 | + lager:info("Once healed, check both sides for the correct, merged value"), |
| 171 | + {ok, S4} = CMod:fetch_type(C1, Bucket, ?KEY), |
| 172 | + ?assertEqual(riakc_hll:value(S4), 6), |
| 173 | + {ok, S5} = CMod:fetch_type(C2, Bucket, ?KEY), |
| 174 | + ?assertEqual(riakc_hll:value(S5), 6). |
| 175 | + |
| 176 | +get_hll(C, CMod, Bucket) -> |
| 177 | + {ok, Obj} =CMod:get(C, Bucket, ?KEY), |
| 178 | + {ok, CRDT} = riak_kv_crdt:from_binary(riakc_obj:get_value(Obj)), |
| 179 | + {_, _, _, HllSet} = CRDT, |
| 180 | + HllSet. |
| 181 | + |
| 182 | +add_element(C, CMod, Bucket, S, Elem) -> |
| 183 | + lager:info("Add element to HLL DT"), |
| 184 | + CMod:update_type( |
| 185 | + C, Bucket, ?KEY, |
| 186 | + riakc_hll:to_op( |
| 187 | + riakc_hll:add_element(Elem, S))). |
| 188 | + |
| 189 | +add_elements(C, CMod, Bucket, S, Elems) -> |
| 190 | + lager:info("Add multiple elements to HLL DT"), |
| 191 | + CMod:update_type( |
| 192 | + C, Bucket, ?KEY, |
| 193 | + riakc_hll:to_op( |
| 194 | + riakc_hll:add_elements(Elems, S))). |
| 195 | + |
| 196 | +add_redundant_element(C, CMod, Bucket, S, Elem) -> |
| 197 | + lager:info("Add redundant element to HLL DT by calling" |
| 198 | + " add_element/3 again"), |
| 199 | + add_element(C, CMod, Bucket, S, Elem). |
| 200 | + |
| 201 | +check_precision_and_reduce_test(C, CMod, Bucket, ExpP, HllSet) -> |
| 202 | + {ok, Props0} = CMod:get_bucket(C, Bucket), |
| 203 | + ?assertEqual(proplists:get_value(?P_SETTING, Props0), ExpP), |
| 204 | + ?assertEqual(riak_kv_hll:precision(HllSet), ExpP), |
| 205 | + ok = CMod:set_bucket(C, Bucket, [{?P_SETTING, ExpP - 1}]), |
| 206 | + {ok, Props1} = CMod:get_bucket(C, Bucket), |
| 207 | + ?assertEqual(proplists:get_value(?P_SETTING, Props1), ExpP - 1). |
| 208 | + |
| 209 | +check_precision_and_reduce_invalid_test(C, CMod, Bucket, ExpP, HllSet) -> |
| 210 | + lager:info("HLL's can be reduced, but never increased.\n" |
| 211 | + " Test to make sure we don't allow invalid values."), |
| 212 | + |
| 213 | + ?assertEqual(riak_kv_hll:precision(HllSet), ExpP), |
| 214 | + {error, _} = CMod:set_bucket(C, Bucket, [{?P_SETTING, ExpP + 1}]), |
| 215 | + {ok, Props} = CMod:get_bucket(C, Bucket), |
| 216 | + ?assertEqual(proplists:get_value(?P_SETTING, Props), ExpP), |
| 217 | + ?assertEqual(riak_kv_hll:precision(HllSet), ExpP). |
0 commit comments