diff --git a/include/yokozuna.hrl b/include/yokozuna.hrl index f9aec24c..5c9aed00 100644 --- a/include/yokozuna.hrl +++ b/include/yokozuna.hrl @@ -402,6 +402,13 @@ -define(YZ_ED_FIELD_XML, ?YZ_FIELD_XML(?YZ_ED_FIELD_S)). -define(YZ_ED_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_ed\" and @type=\"_yz_str\" and @indexed=\"true\" and @multiValued=\"false\"]"). +%% Entropy Data +-define(YZ_HA_FIELD, '_yz_ha'). +-define(YZ_HA_FIELD_S, "_yz_ha"). +-define(YZ_HA_FIELD_B, <<"_yz_ha">>). +-define(YZ_HA_FIELD_XML, ?YZ_FIELD_XML(?YZ_HA_FIELD_S)). +-define(YZ_HA_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_ha\" and @type=\"_yz_str\" and @stored=\"true\" and @multiValued=\"false\"]"). + %% First Partition Number -define(YZ_FPN_FIELD, '_yz_fpn'). -define(YZ_FPN_FIELD_S, "_yz_fpn"). diff --git a/misc/bench/schemas/fruit_schema.xml b/misc/bench/schemas/fruit_schema.xml index b6e55090..e96883a6 100644 --- a/misc/bench/schemas/fruit_schema.xml +++ b/misc/bench/schemas/fruit_schema.xml @@ -13,6 +13,9 @@ + + + diff --git a/priv/default_schema.xml b/priv/default_schema.xml index 32058ca6..0e184d86 100644 --- a/priv/default_schema.xml +++ b/priv/default_schema.xml @@ -112,6 +112,9 @@ + + + diff --git a/riak_test/yz_aae_test.erl b/riak_test/yz_aae_test.erl index 0daeafe5..8d6adf05 100644 --- a/riak_test/yz_aae_test.erl +++ b/riak_test/yz_aae_test.erl @@ -35,6 +35,7 @@ %% allow AAE to build trees and exchange rapidly {anti_entropy_build_limit, {100, 1000}}, {anti_entropy_concurrency, 8}, + {entropy_data_cursor, true}, {aae_throttle_limits, ?AAE_THROTTLE_LIMITS} ]} ]). 
diff --git a/src/yz_doc.erl b/src/yz_doc.erl index 8ee99398..d0e77960 100644 --- a/src/yz_doc.erl +++ b/src/yz_doc.erl @@ -94,7 +94,7 @@ make_doc(O, Hash, LiveSiblings, {MD, V}, FPN, Partition) -> EntropyData = gen_ed(O, Hash, Partition), Bkey = {yz_kv:get_obj_bucket(O), yz_kv:get_obj_key(O)}, Fields = make_fields({DocId, Bkey, FPN, - Partition, Vtag, EntropyData}), + Partition, Vtag, EntropyData, Hash}), ExtractedFields = extract_fields({MD, V}), Tags = extract_tags(MD), {doc, lists:append([Tags, ExtractedFields, Fields])}; @@ -120,17 +120,18 @@ encode_doc_part([$% | Rest], Acc) -> encode_doc_part([C | Rest], Acc) -> encode_doc_part(Rest, [C | Acc]). -make_fields({DocId, {Bucket, Key}, FPN, Partition, none, EntropyData}) -> +make_fields({DocId, {Bucket, Key}, FPN, Partition, none, EntropyData, Hash}) -> [{?YZ_ID_FIELD, DocId}, - {?YZ_ED_FIELD, EntropyData}, + {?YZ_ED_FIELD, EntropyData}, %% deprecated + {?YZ_HA_FIELD, base64:encode(Hash)}, {?YZ_FPN_FIELD, FPN}, {?YZ_PN_FIELD, Partition}, {?YZ_RK_FIELD, Key}, {?YZ_RT_FIELD, yz_kv:bucket_type(Bucket)}, {?YZ_RB_FIELD, yz_kv:bucket_name(Bucket)}]; -make_fields({DocId, BKey, FPN, Partition, Vtag, EntropyData}) -> - Fields = make_fields({DocId, BKey, FPN, Partition, none, EntropyData}), +make_fields({DocId, BKey, FPN, Partition, Vtag, EntropyData, Hash}) -> + Fields = make_fields({DocId, BKey, FPN, Partition, none, EntropyData, Hash}), [{?YZ_VTAG_FIELD, Vtag}|Fields]. %% @doc Get vtag for MetaData entry. diff --git a/src/yz_schema.erl b/src/yz_schema.erl index 90c294cd..aaf25063 100644 --- a/src/yz_schema.erl +++ b/src/yz_schema.erl @@ -152,6 +152,7 @@ verify_uk(Schema) -> verify_fields({ok, Schema}) -> Fields = [?YZ_ID_FIELD_XPATH, ?YZ_ED_FIELD_XPATH, + ?YZ_HA_FIELD_XPATH, ?YZ_FPN_FIELD_XPATH, ?YZ_VTAG_FIELD_XPATH, ?YZ_PN_FIELD_XPATH, diff --git a/src/yz_solr.erl b/src/yz_solr.erl index b600ffb9..da617306 100644 --- a/src/yz_solr.erl +++ b/src/yz_solr.erl @@ -57,6 +57,8 @@ {delete_data_dir, deleteDataDir}]). 
-define(FIELD_ALIASES, [{continuation, continue}, {limit, n}]). +-define(CURSOR_FIELD_ALIASES, [{continuation, cursorMark}, + {limit, rows}]). -define(QUERY(Bin), {struct, [{'query', Bin}]}). -define(LOCALHOST, "localhost"). @@ -200,6 +202,15 @@ delete(Index, Ops) -> -spec entropy_data(index_name(), ed_filter()) -> ED::entropy_data() | {error, term()}. entropy_data(Core, Filter) -> + case app_helper:get_env(?YZ_APP_NAME, entropy_data_cursor, false) of + false -> + entropy_data_request_handler(Core, Filter); + true -> + entropy_data_cursor(Core, Filter) + end. + + +entropy_data_request_handler(Core, Filter) -> Params = [{wt, json}|Filter] -- [{continuation, none}], Params2 = proplists:substitute_aliases(?FIELD_ALIASES, Params), Opts = [{response_format, binary}], @@ -216,6 +227,50 @@ entropy_data(Core, Filter) -> {error, X} end. +-define( + ED_FORMAT_STRING, + "~s/~s/select?q=_yz_pn:~p&wt=json&sort=_yz_id+asc&fl=_yz_rt,_yz_rb,_yz_rk,_yz_ha&~s" +). + +entropy_data_cursor(Core, Filter) -> + CursorMark = + case proplists:get_value(continuation, Filter) of + none -> "*"; + C -> C + end, + Params0 = proplists:delete(continuation, proplists:delete(partition, Filter)), + Params1 = [{cursorMark, CursorMark} | Params0], + Params2 = proplists:substitute_aliases(?CURSOR_FIELD_ALIASES, Params1), + Partition = proplists:get_value(partition, Filter), + URL = ?FMT( + ?ED_FORMAT_STRING, + [base_url(), Core, Partition, mochiweb_util:urlencode(Params2)] + ), + %lager:info("FDUSHIN> URL: ~s", [URL]), + Opts = [{response_format, binary}], + case ibrowse:send_req(URL, [], get, [], Opts) of + {ok, "200", _Headers, Body} -> + R = mochijson2:decode(Body), + NextCursorMark = kvc:path([<<"nextCursorMark">>], R), + %lager:info("FDUSHIN> CursorMark: ~p NextCursorMark: ~p", [CursorMark, NextCursorMark]), + More = NextCursorMark /= CursorMark, + Pairs = get_cursor_pairs(R), + make_ed(More, NextCursorMark, Pairs); + X -> + {error, X} + end. + +%% @doc Index the given `Docs'. 
+index(Core, Docs) ->
+    index(Core, Docs, []).
+
+-spec index(index_name(), list(), [delete_op()]) -> ok.
+index(Core, Docs, DelOps) ->
+    Ops = {struct,
+           [{delete, encode_delete(Op)} || Op <- DelOps] ++
+           [{add, encode_doc(D)} || D <- Docs]},
+    JSON = mochijson2:encode(Ops),
+
 index_batch(Core, Ops) ->
     ?EQC_DEBUG("index_batch: About to send entries. ~p~n", [Ops]),
     JSON = encode_json(Ops),
@@ -501,6 +556,10 @@ get_pairs(R) ->
     Docs = kvc:path([<<"response">>, <<"docs">>], R),
     [to_pair(DocStruct) || DocStruct <- Docs].
 
+%% @doc Like get_pairs/1, but for docs returned by the cursorMark query,
+%% whose field set is selected by name (fl=...) rather than by position.
+get_cursor_pairs(R) ->
+    Docs = kvc:path([<<"response">>, <<"docs">>], R),
+    [to_cursor_pair(DocStruct) || DocStruct <- Docs].
+
 %% @doc Convert a doc struct into a pair. Remove the bucket_type to match
 %% kv trees when iterating over entropy data to build yz trees.
 to_pair({struct, [{_,_Vsn},{_,<<"default">>},{_,BName},{_,Key},{_,Base64Hash}]}) ->
@@ -508,6 +567,19 @@ to_pair({struct, [{_,_Vsn},{_,BType},{_,BName},{_,Key},{_,Base64Hash}]}) ->
     {{{BType, BName},Key}, base64:decode(Base64Hash)}.
 
+%% @doc Convert a cursor doc struct into a {BKey, Hash} pair. As in
+%% to_pair/1, the bucket type is dropped when it is the default type so
+%% the resulting key matches the keys used by the KV hashtrees.
+%% NOTE(review): base64:decode/1 will badarg if _yz_ha is absent from the
+%% doc (e.g. objects indexed before this field existed) -- confirm the
+%% upgrade/reindex path before enabling entropy_data_cursor.
+to_cursor_pair({struct, Values}) when is_list(Values) ->
+    BucketType = proplists:get_value(?YZ_RT_FIELD_B, Values),
+    BucketName = proplists:get_value(?YZ_RB_FIELD_B, Values),
+    Key = proplists:get_value(?YZ_RK_FIELD_B, Values),
+    Hash = base64:decode(proplists:get_value(?YZ_HA_FIELD_B, Values)),
+    %% BUGFIX: match on the bucket *type*, not the bucket name. to_pair/1
+    %% strips the type exactly when the type is <<"default">>; matching on
+    %% the name would mis-key any bucket literally named "default" and any
+    %% bucket under the default type, breaking AAE tree comparison.
+    Bucket = case BucketType of
+                 <<"default">> ->
+                     BucketName;
+                 _ ->
+                     {BucketType, BucketName}
+             end,
+    {{Bucket, Key}, Hash}.
+
 get_doc_pairs(Resp) ->
     Docs = kvc:path([<<"docs">>], Resp),
     [to_doc_pairs(DocStruct) || DocStruct <- Docs].