diff --git a/include/yokozuna.hrl b/include/yokozuna.hrl index 3213ee36..cd485dd9 100644 --- a/include/yokozuna.hrl +++ b/include/yokozuna.hrl @@ -389,6 +389,13 @@ -define(YZ_ED_FIELD_XML, ?YZ_FIELD_XML(?YZ_ED_FIELD_S)). -define(YZ_ED_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_ed\" and @type=\"_yz_str\" and @indexed=\"true\" and @multiValued=\"false\"]"). +%% Hash +-define(YZ_HA_FIELD, '_yz_ha'). +-define(YZ_HA_FIELD_S, "_yz_ha"). +-define(YZ_HA_FIELD_B, <<"_yz_ha">>). +-define(YZ_HA_FIELD_XML, ?YZ_FIELD_XML(?YZ_HA_FIELD_S)). +-define(YZ_HA_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_ha\" and @type=\"binary\" and @indexed=\"false\" and @multiValued=\"false\"]"). + %% First Partition Number -define(YZ_FPN_FIELD, '_yz_fpn'). -define(YZ_FPN_FIELD_S, "_yz_fpn"). diff --git a/priv/default_schema.xml b/priv/default_schema.xml index 2f987b0a..e52d0ded 100644 --- a/priv/default_schema.xml +++ b/priv/default_schema.xml @@ -111,6 +111,9 @@ + + + diff --git a/src/yz_solr.erl b/src/yz_solr.erl index eb57371a..db6e5926 100644 --- a/src/yz_solr.erl +++ b/src/yz_solr.erl @@ -33,7 +33,9 @@ {delete_index, deleteIndex}, {delete_data_dir, deleteDataDir}]). -define(FIELD_ALIASES, [{continuation, continue}, - {limit, n}]). + {limit,n}]). +-define(CURSOR_FIELD_ALIASES, [{continuation, cursorMark}, + {limit, rows}]). -define(QUERY(Bin), {struct, [{'query', Bin}]}). -define(LOCALHOST, "localhost"). @@ -174,6 +176,15 @@ delete(Index, Ops) -> -spec entropy_data(index_name(), ed_filter()) -> ED::entropy_data() | {error, term()}. entropy_data(Core, Filter) -> + case app_helper:get_env(?YZ_APP_NAME, entropy_data_cursor, false) of + false -> + entropy_data_request_handler(Core, Filter); + true -> + entropy_data_cursor(Core, Filter) + end. 
+
+
+entropy_data_request_handler(Core, Filter) ->
     Params = [{wt, json}|Filter] -- [{continuation, none}],
     Params2 = proplists:substitute_aliases(?FIELD_ALIASES, Params),
     Opts = [{response_format, binary}],
@@ -190,6 +201,41 @@ entropy_data(Core, Filter) ->
             {error, X}
     end.
 
+%% @doc Fetch one page of entropy data for a partition from Solr's
+%% `select' handler, paging through the results with a cursor.
+%%
+%% The `continuation' entry of `Filter' is `none' for the first page, or
+%% the cursor returned with the previous page.
+-spec entropy_data_cursor(index_name(), ed_filter()) ->
+                                 ED::entropy_data() | {error, term()}.
+entropy_data_cursor(Core, Filter) ->
+    %% Solr reads the cursor from the `cursorMark' request parameter;
+    %% `nextCursorMark' only appears in responses. "*" opens a new cursor.
+    CursorMark =
+        case proplists:get_value(continuation, Filter) of
+            none -> <<"*">>;
+            Any -> iolist_to_binary(Any)
+        end,
+    Params = [{cursorMark, CursorMark} |
+              proplists:delete(continuation, proplists:delete(partition, Filter))],
+    Params2 = proplists:substitute_aliases(?CURSOR_FIELD_ALIASES, Params),
+    URL = ?FMT("~s/~s/select?q=_yz_pn:~p&wt=json&sort=score+desc,_yz_id+asc&fl=_yz_rt,_yz_rb,_yz_rk,_yz_ha&~s",
+               [base_url(), Core, proplists:get_value(partition, Filter), mochiweb_util:urlencode(Params2)]),
+    Opts = [{response_format, binary}],
+    case ibrowse:send_req(URL, [], get, [], Opts) of
+        {ok, "200", _Headers, Body} ->
+            R = mochijson2:decode(Body),
+            %% Solr signals the last page by returning the cursorMark it
+            %% was sent, so a missing or unchanged mark means we are done.
+            NextMark = kvc:path([<<"nextCursorMark">>], R),
+            More = NextMark =/= [] andalso NextMark =/= CursorMark,
+            Cursor = get_continuation_cursor(More, R),
+            Pairs = get_cursor_pairs(R),
+            make_ed(More, Cursor, Pairs);
+        X ->
+            {error, X}
+    end.
+
 %% @doc Index the given `Docs'.
 index(Core, Docs) ->
     index(Core, Docs, []).
@@ -467,10 +513,23 @@ get_continuation(false, _R) ->
 get_continuation(true, R) ->
     kvc:path([<<"continuation">>], R).
 
+%% @doc Extract the cursor for the next page from a decoded `select'
+%% response, or `none' when there are no more pages.
+get_continuation_cursor(false, _R) ->
+    none;
+get_continuation_cursor(true, R) ->
+    kvc:path([<<"nextCursorMark">>], R).
+
 get_pairs(R) ->
     Docs = kvc:path([<<"response">>, <<"docs">>], R),
     [to_pair(DocStruct) || DocStruct <- Docs].
 
+%% @doc Convert the docs of a decoded `select' response into entropy
+%% data pairs.
+get_cursor_pairs(R) ->
+    Docs = kvc:path([<<"response">>, <<"docs">>], R),
+    [to_cursor_pair(DocStruct) || DocStruct <- Docs].
+
 %% @doc Convert a doc struct into a pair. Remove the bucket_type to match
 %% kv trees when iterating over entropy data to build yz trees.
 to_pair({struct, [{_,_Vsn},{_,<<"default">>},{_,BName},{_,Key},{_,Base64Hash}]}) ->
@@ -478,6 +537,21 @@ to_pair({struct, [{_,_Vsn},{_,<<"default">>},{_,BName},{_,Key},{_,Base64Hash}]})
 to_pair({struct, [{_,_Vsn},{_,BType},{_,BName},{_,Key},{_,Base64Hash}]}) ->
     {{{BType, BName},Key}, base64:decode(Base64Hash)}.
 
+%% @doc Convert a doc struct returned by the `select' handler into a pair.
+%% As in to_pair/1, the bucket type is dropped when it is the default type.
+to_cursor_pair({struct, Values}) when is_list(Values) ->
+    BucketType = proplists:get_value(?YZ_RT_FIELD_B, Values),
+    BucketName = proplists:get_value(?YZ_RB_FIELD_B, Values),
+    Key = proplists:get_value(?YZ_RK_FIELD_B, Values),
+    Hash = base64:decode(proplists:get_value(?YZ_HA_FIELD_B, Values)),
+    Bucket = case BucketType of
+                 <<"default">> ->
+                     BucketName;
+                 _ ->
+                     {BucketType, BucketName}
+             end,
+    {{Bucket, Key}, Hash}.
+
 get_doc_pairs(Resp) ->
     Docs = kvc:path([<<"docs">>], Resp),
     [to_doc_pairs(DocStruct) || DocStruct <- Docs].
@@ -494,4 +568,4 @@ make_ed(More, Continuation, Pairs) ->
 
 -spec shard_frag(index_name(), {string(), string()}) -> string().
 shard_frag(Core, {Host, Port}) ->
-    ?FMT("~s:~s"++?SOLR_HOST_CONTEXT++"/~s", [Host, Port, Core]).
\ No newline at end of file
+    ?FMT("~s:~s"++?SOLR_HOST_CONTEXT++"/~s", [Host, Port, Core]).