
Commit dbdaaaf

Merge pull request #576 from basho/merge/2.1-into-develop

Merge 2.1 into develop

2 parents: 3e74951 + ba4dcfd

35 files changed: +876 / -375 lines

include/yokozuna.hrl

Lines changed: 8 additions & 1 deletion
@@ -160,6 +160,10 @@
                 app_helper:get_env(riak_core, platform_data_dir)++"/yz")).
 -define(YZ_TEMP_DIR, app_helper:get_env(?YZ_APP_NAME, temp_dir,
                 app_helper:get_env(riak_core, platform_data_dir)++"/yz_temp")).
+%% The request timeout for Solr calls. Defaults to 60 seconds.
+-define(YZ_SOLR_REQUEST_TIMEOUT, app_helper:get_env(?YZ_APP_NAME,
+                                                    solr_request_timeout,
+                                                    60000)).
 -define(YZ_PRIV, code:priv_dir(?YZ_APP_NAME)).
 -define(YZ_CORE_CFG_FILE, "solrconfig.xml").
 -define(YZ_INDEX_CMD, #yz_index_cmd).

@@ -237,11 +241,12 @@
                        {partition, lp()} |
                        {limit, pos_integer()}].
 -type ed_continuation() :: none | base64().
+-type ed_pairs() :: [{DocID::binary(), Hash::base64()}].

 -record(entropy_data, {
           more=false :: boolean(),
           continuation :: ed_continuation(),
-          pairs :: [{DocID::binary(), Hash::base64()}]
+          pairs :: ed_pairs()
          }).
 -type entropy_data() :: #entropy_data{}.
 -type keydiff() :: hashtree:keydiff().

@@ -288,6 +293,8 @@
 -define(ERROR(Fmt), lager:error(Fmt)).
 -define(ERROR(Fmt, Args), lager:error(Fmt, Args)).
 -define(INFO(Fmt, Args), lager:info(Fmt, Args)).
+-define(NOTICE(Fmt, Args), lager:notice(Fmt, Args)).
+-define(NOTICE(Fmt), lager:notice(Fmt)).
 -define(WARN(Fmt, Args), lager:warning(Fmt, Args)).

 %%%===================================================================
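
The new ?YZ_SOLR_REQUEST_TIMEOUT macro makes the Solr HTTP timeout configurable instead of hard-coded. A minimal sketch of how a call site might thread it into ibrowse (illustrative only; the function name and URL layout below are assumptions, since this hunk defines only the macro):

    %% Hypothetical call site: pass the macro as ibrowse's Timeout argument.
    %% ibrowse:send_req(Url, Headers, Method, Body, Options, Timeout)
    solr_select(Core, Params) ->
        URL = lists:flatten(io_lib:format("~s/~s/select?~s",
                                          [yz_solr:base_url(), Core,
                                           mochiweb_util:urlencode(Params)])),
        ibrowse:send_req(URL, [], get, [], [{response_format, binary}],
                         ?YZ_SOLR_REQUEST_TIMEOUT).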

java_src/com/basho/yokozuna/handler/EntropyData.java

Lines changed: 54 additions & 26 deletions
@@ -18,7 +18,7 @@

 import java.io.IOException;

-import org.apache.commons.codec.binary.Base64;
+import javax.xml.bind.DatatypeConverter;

 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;

@@ -117,40 +117,41 @@ public void handleRequestBody(final SolrQueryRequest req, final SolrQueryResponse rsp)
             tmp = te.next();
         }

-        String text;
-        String[] vals;
-        String docPartition;
-        String vsn;
-        String riakBType;
-        String riakBName;
-        String riakKey;
-        String hash;
         int count = 0;
         BytesRef current = null;
         final Bits liveDocs = rdr.getLiveDocs();

         while(!endOfItr(tmp) && count < n) {
             if (isLive(liveDocs, te)) {
                 current = BytesRef.deepCopyOf(tmp);
-                text = tmp.utf8ToString();
+                final String text = tmp.utf8ToString();
                 if (log.isDebugEnabled()) {
                     log.debug("text: " + text);
                 }
-                vals = text.split(" ");
+                final String [] vals = text.split(" ");

-                vsn = vals[0];
-                docPartition = vals[1];
-                riakBType = decodeBase64DocPart(vals[2]);
-                riakBName = decodeBase64DocPart(vals[3]);
-                riakKey = decodeBase64DocPart(vals[4]);
-                hash = vals[5];
+                final String docPartition = vals[1];

+                /*
+                  If the partition matches the one we are looking for,
+                  parse the version, bkey, and object hash from the
+                  entropy data field (term).
+                */
                 if (partition.equals(docPartition)) {
-                    SolrDocument tmpDoc = new SolrDocument();
+                    final String vsn = vals[0];
+
+                    final String [] decoded = decodeForVersion(vsn,
+                                                               vals[2],
+                                                               vals[3],
+                                                               vals[4]);
+
+                    final String hash = vals[5];
+
+                    final SolrDocument tmpDoc = new SolrDocument();
                     tmpDoc.addField("vsn", vsn);
-                    tmpDoc.addField("riak_bucket_type", riakBType);
-                    tmpDoc.addField("riak_bucket_name", riakBName);
-                    tmpDoc.addField("riak_key", riakKey);
+                    tmpDoc.addField("riak_bucket_type", decoded[0]);
+                    tmpDoc.addField("riak_bucket_name", decoded[1]);
+                    tmpDoc.addField("riak_key", decoded[2]);
                     tmpDoc.addField("base64_hash", hash);
                     docs.add(tmpDoc);
                     count++;

@@ -163,7 +164,8 @@ public void handleRequestBody(final SolrQueryRequest req, final SolrQueryResponse rsp)
             rsp.add("more", false);
         } else {
             rsp.add("more", true);
-            final String newCont = Base64.encodeBase64URLSafeString(current.bytes);
+            final String newCont =
+                org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString(current.bytes);
             // The continue context for next req to start where
             // this one finished.
             rsp.add("continuation", newCont);

@@ -182,7 +184,7 @@ static boolean isLive(final Bits liveDocs, final TermsEnum te) throws IOException
     }

     static BytesRef decodeCont(final String cont) {
-        final byte[] bytes = Base64.decodeBase64(cont);
+        final byte[] bytes = org.apache.commons.codec.binary.Base64.decodeBase64(cont);
         return new BytesRef(bytes);
     }

@@ -209,12 +211,38 @@ public String getSource() {
         return "TODO: implement getSource";
     }

+    /**
+       @param vsn a String vsn number referring to the item's ed handler version
+       @param riakBType riak bucket-type
+       @param riakBName riak bucket-name
+       @param riakKey riak key
+       @return a String array consisting of a Bucket Type, Bucket Name, and Riak Key
+    */
+    private String [] decodeForVersion(String vsn, String riakBType, String riakBName, String riakKey) {
+        final String [] bKeyInfo;
+        switch(Integer.parseInt(vsn)) {
+            case 1:
+                bKeyInfo = new String [] {riakBType, riakBName, riakKey};
+                break;
+            default:
+                bKeyInfo = new String []
+                    {
+                        decodeBase64DocPart(riakBType),
+                        decodeBase64DocPart(riakBName),
+                        decodeBase64DocPart(riakKey)
+                    };
+                break;
+        }
+        return bKeyInfo;
+    }
+
     /**
        @param base64EncodedVal base64 encoded string
        @return a string of decoded base64 bytes
-    */
+     */
     private String decodeBase64DocPart(String base64EncodedVal) {
-        byte[] bytes = Base64.decodeBase64(base64EncodedVal);
-        return new String(bytes);
+        return new String(DatatypeConverter.parseBase64Binary(
+            base64EncodedVal));
     }
 }
+
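
The new decodeForVersion keeps the handler compatible with both entropy-data term layouts: version 1 terms carry the bucket type, bucket name, and key as-is, while later versions base64-encode each part. For Erlang readers, the same switch could be sketched as a two-clause function (illustrative only; the yokozuna-side consumer of this response is not part of this diff):

    %% Sketch mirroring decodeForVersion/4 above; not code from this commit.
    decode_for_version(<<"1">>, BType, BName, Key) ->
        {BType, BName, Key};
    decode_for_version(_Vsn, BType, BName, Key) ->
        {base64:decode(BType), base64:decode(BName), base64:decode(Key)}.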

misc/bench/schemas/fruit_schema.xml

Lines changed: 6 additions & 0 deletions
@@ -6,6 +6,8 @@

    <field name="text" type="text_ws" indexed="true" stored="false" multiValued="true"/>

+   <field name="date_register" type="tdate" indexed="true" stored="true" multiValued="false" />
+
    <field name="_version_" type="long" indexed="true" stored="true"/>

    <!-- Entropy Data: Data related to anti-entropy -->

@@ -42,6 +44,10 @@

    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>

+   <fieldType name="date" class="solr.TrieDateField" precisionStep="0" positionIncrementGap="0"/>
+   <!-- A Trie based date field for faster date range queries and date faceting. -->
+   <fieldType name="tdate" class="solr.TrieDateField" precisionStep="6" positionIncrementGap="0"/>
+
    <!-- A text field that only splits on whitespace for exact matching of words -->
    <fieldType name="text_ws" class="solr.TextField" positionIncrementGap="100">
      <analyzer>

priv/yokozuna.schema

Lines changed: 9 additions & 0 deletions
@@ -59,3 +59,12 @@
   {datatype, directory},
   hidden
 ]}.
+
+%% @doc The timeout for ibrowse (ibrowse:send_req) requests to Solr endpoints.
+%% Defaults to 60 seconds. It will always round up to the nearest second, e.g.
+%% 1ms = 999 ms = 1s.
+{mapping, "search.solr.request_timeout", "yokozuna.solr_request_timeout", [
+  {default, "60s"},
+  {datatype, {duration, ms}},
+  hidden
+]}.
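
This cuttlefish mapping is what feeds the ?YZ_SOLR_REQUEST_TIMEOUT macro above: the riak.conf duration is converted to milliseconds and stored in the yokozuna application environment, where app_helper:get_env reads it. For example, with a hypothetical setting of search.solr.request_timeout = 90s in riak.conf, the translated value would be visible as:

    %% The 90s value is an assumed example, not a project default.
    {ok, 90000} = application:get_env(yokozuna, solr_request_timeout).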

rebar.config

Lines changed: 2 additions & 2 deletions
@@ -14,8 +14,8 @@
         {git, "git://github.com/etrepum/kvc.git", {tag, "v1.5.0"}}},
        {riak_kv, ".*",
         {git, "git://github.com/basho/riak_kv.git", {branch, "develop"}}},
-       {ibrowse, "4.0.2",
-        {git, "git://github.com/cmullaparthi/ibrowse.git", {tag, "v4.0.2"}}}
+       {ibrowse, "4.0.1",
+        {git, "git://github.com/cmullaparthi/ibrowse.git", {tag, "v4.0.1"}}}
 ]}.

 {pre_hooks, [{compile, "./tools/grab-solr.sh"}]}.

riak_test/aae_test.erl

Lines changed: 38 additions & 2 deletions
@@ -105,6 +105,9 @@ aae_run(Cluster, Bucket, Index) ->

     verify_no_repair_for_non_indexed_data(Cluster, Bucket, PBConns),

+    verify_count_and_repair_after_error_value(Cluster, Bucket, Index,
+                                              PBConns),
+
     yz_rt:close_pb_conns(PBConns),

     verify_no_repair_after_restart(Cluster),

@@ -321,6 +324,39 @@ verify_no_repair_for_non_indexed_data(Cluster, {BType, _Bucket}, PBConns) ->
 verify_no_repair_for_non_indexed_data(_Cluster, _Bucket, _PBConns) ->
     ok.

+-spec verify_count_and_repair_after_error_value([node()], bucket(),
+                                                index_name(), [pid()])
+                                               -> ok.
+verify_count_and_repair_after_error_value(Cluster, {BType, _Bucket}, Index,
+                                          PBConns) ->
+    lager:info("verify total count and repair occurred for failed-to-index (bad) data"),
+    Bucket = {BType, Index},
+
+    %% 1. write KV data to non-indexed bucket
+    Conn = yz_rt:select_random(PBConns),
+    lager:info("write 1 bad search field to bucket ~p", [Bucket]),
+    Obj = riakc_obj:new(Bucket, <<"akey_bad_data">>, <<"{\"date_register\":3333}">>,
+                        "application/json"),
+
+    ok = riakc_pb_socket:put(Conn, Obj),
+
+    %% 2. setup tracing to count repair calls
+    ok = yz_rt:count_calls(Cluster, ?REPAIR_MFA),
+
+    %% 3. wait for full exchange round
+    ok = yz_rt:wait_for_full_exchange_round(Cluster, now()),
+    ok = yz_rt:stop_tracing(),
+
+    %% 4. verify repair count is 0
+    ?assertEqual(?N * 1, yz_rt:get_call_count(Cluster, ?REPAIR_MFA)),
+
+    %% 5. verify count after expiration
+    verify_exchange_after_expire(Cluster, Index),
+
+    ok;
+verify_count_and_repair_after_error_value(_Cluster, _Bucket, _Index, _PBConns) ->
+    ok.
+
 verify_num_match(Cluster, Index, Num) ->
     verify_num_match(yokozuna, Cluster, Index, Num).

@@ -331,10 +367,10 @@ verify_num_match(Type, Cluster, Index, Num) ->
                 if Type =:= solr ->
                         Shards = [{N, yz_rt:node_solr_port(N)} || N <- Cluster],
                         yz_rt:search_expect(Type, {Host, yz_rt:node_solr_port(Node)},
-                                            Index, "text", "apricot", Shards, Num);
+                                            Index, "*", "*", Shards, Num);
                    true ->
                         yz_rt:search_expect(Type, HP,
-                                            Index, "text", "apricot", Num)
+                                            Index, "*", "*", Num)
                 end
         end,
     yz_rt:wait_until(Cluster, F).
riak_test/intercepts/yz_solr_intercepts.erl

Lines changed: 19 additions & 0 deletions
@@ -1,6 +1,25 @@
 -module(yz_solr_intercepts).
 -compile(export_all).

+-type index_name() :: binary().
+
+-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).
+
+-spec slow_cores() -> {ok, []}.
 slow_cores() ->
     timer:sleep(6000),
     {ok, []}.
+
+-spec entropy_data_cant_complete(index_name(), list()) -> {error, term()}.
+entropy_data_cant_complete(Core, Filter) ->
+    Params = [{wt, json}|Filter] -- [{continuation, none}],
+    Params2 = proplists:substitute_aliases([{continuation, continue},
+                                            {limit,n}], Params),
+    Opts = [{response_format, binary}],
+    URL = ?FMT("~s/~s/entropy_data?~s",
+               [yz_solr:base_url(), Core, mochiweb_util:urlencode(Params2)]),
+    case ibrowse:send_req(URL, [], get, [], Opts, 0) of
+        Error ->
+            {error, Error}
+    end.
+
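The new intercept forces yz_solr:entropy_data/2 to fail fast by issuing the request with a 0 ms ibrowse timeout and wrapping whatever comes back in an error tuple. A sketch of how a test might install it, assuming riak_test's rt_intercept:add/2 (this wiring is not part of the diff):

    %% Hypothetical test setup: swap the intercept in for yz_solr:entropy_data/2.
    install_entropy_failure(Node) ->
        rt_intercept:add(Node, {yz_solr,
                                [{{entropy_data, 2}, entropy_data_cant_complete}]}).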

riak_test/yokozuna_essential.erl

Lines changed: 1 addition & 2 deletions
@@ -60,8 +60,7 @@ confirm() ->
     setup_indexing(Cluster, PBConns, YZBenchDir),
     verify_non_existent_index(Cluster, <<"froot">>),
     {0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS),
-    %% wait for soft-commit
-    timer:sleep(1100),
+    yz_rt:commit(Cluster, ?INDEX),
     Ref = async_query(Cluster, YZBenchDir),
     %% Verify data exists before running join
     timer:sleep(30000),
