diff --git a/include/riakc.hrl b/include/riakc.hrl
index 66bc9adb..4323f865 100644
--- a/include/riakc.hrl
+++ b/include/riakc.hrl
@@ -24,7 +24,7 @@
-define(PROTO_MAJOR, 1).
-define(PROTO_MINOR, 0).
-define(DEFAULT_PB_TIMEOUT, 60000).
--define(FIRST_RECONNECT_INTERVAL, 100).
+-define(FIRST_RECONNECT_INTERVAL, 10).
-define(MAX_RECONNECT_INTERVAL, 30000).
-type client_option() :: queue_if_disconnected |
@@ -33,7 +33,8 @@
auto_reconnect |
{auto_reconnect, boolean()} |
keepalive |
- {keepalive, boolean()}.
+ {keepalive, boolean()} |
+ {stats, non_neg_integer()}.
%% Options for starting or modifying the connection:
%% `queue_if_disconnected' when present or true will cause requests to
%% be queued while the connection is down. `auto_reconnect' when
diff --git a/src/riakc.app.src b/src/riakc.app.src
index 084bc633..f6eb8214 100644
--- a/src/riakc.app.src
+++ b/src/riakc.app.src
@@ -1,13 +1,21 @@
%% -*- erlang -*-
{application, riakc,
[{description, "Riak Client"},
- {vsn, git},
+ {vsn, "2.0"},
{applications, [
kernel,
stdlib,
riak_pb
]},
{registered, []},
+ {modules, [riakc_counter,
+ riakc_datatype,
+ riakc_flag,
+ riakc_map,
+ riakc_obj,
+ riakc_pb_socket,
+ riakc_register,
+ riakc_set]},
{env, [
%% Set default timeout for operations.
%% Individual operation timeouts can be supplied,
diff --git a/src/riakc_obj.erl b/src/riakc_obj.erl
index 14d063cf..832616ef 100644
--- a/src/riakc_obj.erl
+++ b/src/riakc_obj.erl
@@ -67,7 +67,8 @@
clear_links/1,
delete_links/2,
set_link/2,
- add_link/2
+ add_link/2,
+ is_riakc_obj/1
]).
%% Internal library use only
-export([new_obj/4,index_id_to_bin/1]).
@@ -575,6 +576,9 @@ add_link(MD, [{T, IdList} | Rest]) ->
add_link(MD2, Rest)
end.
+is_riakc_obj(#riakc_obj{}) -> true;
+is_riakc_obj(_) -> false.
+
%% @doc INTERNAL USE ONLY. Set the contents of riakc_obj to the
%% {Metadata, Value} pairs in MVs. Normal clients should use the
%% set_update_[value|metadata]() + apply_updates() method for changing
diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl
index 96a50a10..821310b7 100644
--- a/src/riakc_pb_socket.erl
+++ b/src/riakc_pb_socket.erl
@@ -33,7 +33,7 @@
-include_lib("riak_pb/include/riak_search_pb.hrl").
-include_lib("riak_pb/include/riak_yokozuna_pb.hrl").
-include_lib("riak_pb/include/riak_dt_pb.hrl").
--include("riakc.hrl").
+-include_lib("riakc/include/riakc.hrl").
-behaviour(gen_server).
-export([start_link/2, start_link/3,
@@ -42,6 +42,8 @@
set_options/2, set_options/3,
is_connected/1, is_connected/2,
ping/1, ping/2,
+ queue_len/1,
+ stats_peek/1, stats_take/1, stats_change_level/2, merge_stats/2,
get_client_id/1, get_client_id/2,
set_client_id/2, set_client_id/3,
get_server_info/1, get_server_info/2,
@@ -95,6 +97,7 @@
-deprecated({get_index,'_', eventually}).
+-type timeout2() :: timeout() | {timeout(), timeout()}.
-type ctx() :: any().
-type rpb_req() :: {tunneled, msg_id(), binary()} | atom() | tuple().
-type rpb_resp() :: atom() | tuple().
@@ -127,7 +130,7 @@
%% of the same name on the `riakc' application, for example:
%% `application:set_env(riakc, ping_timeout, 5000).'
-record(request, {ref :: reference(), msg :: rpb_req(), from, ctx :: ctx(), timeout :: timeout(),
- tref :: reference() | undefined }).
+ tref :: reference() | undefined, timestamp}).
-type portnum() :: non_neg_integer(). %% The TCP port number of the Riak node's Protocol Buffers interface
-type address() :: string() | atom() | inet:ip_address(). %% The TCP/IP host name or address of the Riak node
@@ -141,6 +144,7 @@
transport = gen_tcp :: 'gen_tcp' | 'ssl',
active :: #request{} | undefined, % active request
queue :: queue() | undefined, % queue of pending requests
+ queue_len=0 :: non_neg_integer(), % queue size
connects=0 :: non_neg_integer(), % number of successful connects
failed=[] :: [connection_failure()], % breakdown of failed connects
connect_timeout=infinity :: timeout(), % timeout of TCP connection
@@ -152,6 +156,7 @@
% certificate authentication
ssl_opts = [], % Arbitrary SSL options, see the erlang SSL
% documentation.
+ stats,
reconnect_interval=?FIRST_RECONNECT_INTERVAL :: non_neg_integer()}).
%% @private Like `gen_server:call/3', but with the timeout hardcoded
@@ -231,6 +236,18 @@ ping(Pid) ->
ping(Pid, Timeout) ->
call_infinity(Pid, {req, rpbpingreq, Timeout}).
+queue_len(Pid) ->
+ call_infinity(Pid, {check, queue_len}).
+
+stats_peek(Pid) ->
+ stats_format(call_infinity(Pid, stats_peek)).
+
+stats_take(Pid) ->
+ stats_format(call_infinity(Pid, stats_take)).
+
+stats_change_level(Pid, NewLevel) ->
+ stats_format(call_infinity(Pid, {stats_change_level, NewLevel})).
+
%% @doc Get the client id for this connection
%% @equiv get_client_id(Pid, default_timeout(get_client_id_timeout))
-spec get_client_id(pid()) -> {ok, client_id()} | {error, term()}.
@@ -276,8 +293,10 @@ get(Pid, Bucket, Key) ->
%% @doc Get bucket/key from the server specifying timeout.
%% Will return {error, notfound} if the key is not on the server.
%% @equiv get(Pid, Bucket, Key, Options, Timeout)
--spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout() | get_options()) ->
+-spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | get_options()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
+get(Pid, Bucket, Key, {T1, T2} = Timeout) when is_integer(T1), is_integer(T2) ->
+ get(Pid, Bucket, Key, [], Timeout);
get(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
get(Pid, Bucket, Key, [], Timeout);
get(Pid, Bucket, Key, Options) ->
@@ -287,7 +306,7 @@ get(Pid, Bucket, Key, Options) ->
%% unchanged will be returned when the
%% {if_modified, Vclock} option is specified and the
%% object is unchanged.
--spec get(pid(), bucket(), key(), get_options(), timeout()) ->
+-spec get(pid(), bucket(), key(), get_options(), timeout2()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
get(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
@@ -305,10 +324,12 @@ put(Pid, Obj) ->
%% @doc Put the metadata/value in the object under bucket/key with options or timeout.
%% @equiv put(Pid, Obj, Options, Timeout)
%% @see put/4
--spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout() | put_options()) ->
+-spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout2() | put_options()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
put(Pid, Obj, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
put(Pid, Obj, [], Timeout);
+put(Pid, Obj, {T1, T2} = Timeout) when is_integer(T1), is_integer(T2); Timeout =:= infinity ->
+ put(Pid, Obj, [], Timeout);
put(Pid, Obj, Options) ->
put(Pid, Obj, Options, default_timeout(put_timeout)).
@@ -323,7 +344,7 @@ put(Pid, Obj, Options) ->
%% `return_body' was specified.
%% @throws siblings
%% @end
--spec put(pid(), riakc_obj(), put_options(), timeout()) ->
+-spec put(pid(), riakc_obj(), put_options(), timeout2()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
put(Pid, Obj, Options, Timeout) ->
Content = riak_pb_kv_codec:encode_content({riakc_obj:get_update_metadata(Obj),
@@ -344,15 +365,17 @@ delete(Pid, Bucket, Key) ->
%% @doc Delete the key/value specifying timeout or options. Note that the rw quorum is deprecated, use r and w.
%% @equiv delete(Pid, Bucket, Key, Options, Timeout)
--spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout() | delete_options()) ->
+-spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | delete_options()) ->
ok | {error, term()}.
delete(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
delete(Pid, Bucket, Key, [], Timeout);
+delete(Pid, Bucket, Key, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2); Timeout =:= infinity ->
+ delete(Pid, Bucket, Key, [], Timeout);
delete(Pid, Bucket, Key, Options) ->
delete(Pid, Bucket, Key, Options, default_timeout(delete_timeout)).
%% @doc Delete the key/value with options and timeout. Note that the rw quorum is deprecated, use r and w.
--spec delete(pid(), bucket(), key(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete(pid(), bucket(), key(), delete_options(), timeout2()) -> ok | {error, term()}.
delete(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
Req = delete_options(Options, #rpbdelreq{type = T, bucket = B, key = Key}),
@@ -406,7 +429,7 @@ delete_obj(Pid, Obj, Options) ->
%% @doc Delete the riak object with options and timeout.
%% @equiv delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj), riakc_obj:vclock(Obj), Options, Timeout)
%% @see delete_vclock/6
--spec delete_obj(pid(), riakc_obj(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete_obj(pid(), riakc_obj(), delete_options(), timeout2()) -> ok | {error, term()}.
delete_obj(Pid, Obj, Options, Timeout) ->
delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj),
riakc_obj:vclock(Obj), Options, Timeout).
@@ -1100,7 +1123,7 @@ cs_bucket_fold(Pid, Bucket, Opts) when is_pid(Pid), (is_binary(Bucket) orelse
%% @doc Return the default timeout for an operation if none is provided.
%% Falls back to the default timeout.
--spec default_timeout(timeout_name()) -> timeout().
+-spec default_timeout(timeout_name()) -> timeout2().
default_timeout(OpTimeout) ->
case application:get_env(riakc, OpTimeout) of
{ok, EnvTimeout} ->
@@ -1219,6 +1242,7 @@ modify_type(Pid, Fun, BucketAndType, Key, Options) ->
init([Address, Port, Options]) ->
%% Schedule a reconnect as the first action. If the server is up then
%% the handle_info(reconnect) will run before any requests can be sent.
+ process_flag(trap_exit,true),
State = parse_options(Options, #state{address = Address,
port = Port,
queue = queue:new()}),
@@ -1228,8 +1252,8 @@ init([Address, Port, Options]) ->
{ok, State};
false ->
case connect(State) of
- {error, Reason} ->
- {stop, {tcp, Reason}};
+ {error, _Reason} ->
+ {stop, normal};
Ok ->
Ok
end
@@ -1267,32 +1291,43 @@ handle_call(is_connected, _From, State) ->
end;
handle_call({set_options, Options}, _From, State) ->
{reply, ok, parse_options(Options, State)};
+handle_call({check, queue_len}, _From, #state{queue_len = QueueLen} = State) ->
+ {reply, QueueLen, State};
+handle_call(stats_peek, _From, #state{stats = Stats} = State) ->
+ {reply, Stats, State};
+handle_call(stats_take, _From, #state{stats = Stats} = State) ->
+ {reply, Stats, State#state{stats = init_stats(Stats)}};
+handle_call({stats_change_level, NewLevel}, _From, #state{stats = Stats} = State) ->
+ {reply, Stats, State#state{stats = init_stats(NewLevel)}};
handle_call(stop, _From, State) ->
- _ = disconnect(State),
- {stop, normal, ok, State}.
+ disconnect(State, false),
+ {stop, normal, ok, State};
+handle_call(get_state, _From, State) ->
+ {reply, State, State}.
%% @private
handle_info({tcp_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client TCP error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({tcp_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client SSL error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
%% Make sure the two Sock's match. If a request timed out, but there was
%% a response queued up behind it we do not want to process it. Instead
%% it should drop through and be ignored.
-handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
+handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active, stats = Stats0})
when Proto == tcp; Proto == ssl ->
+ Stats1 = record_stat(recv, iolist_size(Data), Stats0),
<> = Data,
Resp = case Active#request.msg of
{tunneled, _MsgID} ->
@@ -1303,16 +1338,18 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
end,
NewState = case Resp of
#rpberrorresp{} ->
- NewState1 = maybe_reply(on_error(Active, Resp, State)),
+ NewState1 = maybe_reply(on_error(Active, Resp, State#state{stats = Stats1})),
dequeue_request(NewState1#state{active = undefined});
_ ->
- case process_response(Active, Resp, State) of
- {reply, Response, NewState0} ->
+ case process_response(Active, Resp, State#state{stats = Stats1}) of
+ {reply, Response, NewState0 = #state{stats = Stats2}} ->
%% Send reply and get ready for the next request - send the next request
%% if one is queued up
cancel_req_timer(Active#request.tref),
- _ = send_caller(Response, NewState0#state.active),
- dequeue_request(NewState0#state{active = undefined});
+ send_caller(Response, NewState0#state.active),
+ Stats3 = record_stat({service_time, op_timeout(Active#request.timeout), op_type(Active#request.msg), bucket_name(Active)},
+ timer:now_diff(os:timestamp(), Active#request.timestamp), Stats2),
+ dequeue_request(NewState0#state{active = undefined, stats = Stats3});
{pending, NewState0} -> %% Request is still pending - do not queue up a new one
NewActive = restart_req_timer(Active),
NewState0#state{active = NewActive}
@@ -1325,19 +1362,14 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
ok = ssl:setopts(Sock, [{active, once}])
end,
{noreply, NewState};
-handle_info({req_timeout, Ref}, State) ->
- case State#state.active of %%
- undefined ->
- {noreply, remove_queued_request(Ref, State)};
- Active ->
- case Ref == Active#request.ref of
- true -> %% Matches the current operation
- NewState = maybe_reply(on_timeout(State#state.active, State)),
- disconnect(NewState#state{active = undefined});
- false ->
- {noreply, remove_queued_request(Ref, State)}
- end
- end;
+handle_info({TimeoutTag, Ref}, #state{active = #request{ref = Ref, msg = Msg, timeout = Timeout}, stats = Stats0} = State)
+ when TimeoutTag == op_timeout; TimeoutTag == req_timeout ->
+ NewState = maybe_reply(on_timeout(State#state.active,
+ State#state{stats = record_cntr({TimeoutTag, op_timeout(Timeout), op_type(Msg), bucket_name(State#state.active)}, Stats0)})),
+ disconnect(NewState#state{active = undefined}, false);
+handle_info({TimeoutTag, Ref}, State)
+ when TimeoutTag == q_timeout; TimeoutTag == req_timeout ->
+ {noreply, remove_queued_request(Ref, State, TimeoutTag)};
handle_info(reconnect, State) ->
case connect(State) of
{ok, NewState} ->
@@ -1345,7 +1377,7 @@ handle_info(reconnect, State) ->
{error, Reason} ->
%% Update the failed count and reschedule a reconnection
NewState = State#state{failed = orddict:update_counter(Reason, 1, State#state.failed)},
- disconnect(NewState)
+ disconnect(NewState, true)
end;
handle_info(_, State) ->
{noreply, State}.
@@ -1366,14 +1398,18 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
%% @private
%% Parse options
-parse_options([], State) ->
+parse_options([], #state{stats=Stats0} = State) ->
+ Stats1 = case Stats0 of
+ undefined -> init_stats(0);
+ _ -> Stats0
+ end,
%% Once all options are parsed, make sure auto_reconnect is enabled
%% if queue_if_disconnected is enabled.
case State#state.queue_if_disconnected of
true ->
- State#state{auto_reconnect = true};
+ State#state{auto_reconnect = true, stats = Stats1};
_ ->
- State
+ State#state{stats = Stats1}
end;
parse_options([{connect_timeout, T}|Options], State) when is_integer(T) ->
parse_options(Options, State#state{connect_timeout = T});
@@ -1400,7 +1436,9 @@ parse_options([{cacertfile, File}|Options], State) ->
parse_options([{keyfile, File}|Options], State) ->
parse_options(Options, State#state{keyfile=File});
parse_options([{ssl_opts, Opts}|Options], State) ->
- parse_options(Options, State#state{ssl_opts=Opts}).
+ parse_options(Options, State#state{ssl_opts=Opts});
+parse_options([{stats, Level}|Options], State) ->
+ parse_options(Options, State#state{stats=init_stats(Level)}).
maybe_reply({reply, Reply, State}) ->
Request = State#state.active,
@@ -1432,6 +1470,8 @@ get_options([{pr, PR} | Rest], Req) ->
get_options(Rest, Req#rpbgetreq{pr = riak_pb_kv_codec:encode_quorum(PR)});
get_options([{timeout, T} | Rest], Req) when is_integer(T)->
get_options(Rest, Req#rpbgetreq{timeout = T});
+get_options([{timeout, {T1,T2}} | Rest], Req) when is_integer(T1), is_integer(T2) ->
+ get_options(Rest, Req#rpbgetreq{timeout = T2});
get_options([{timeout, _T} | _Rest], _Req) ->
erlang:error(badarg);
get_options([{if_modified, VClock} | Rest], Req) ->
@@ -1459,6 +1499,8 @@ put_options([{pw, PW} | Rest], Req) ->
put_options(Rest, Req#rpbputreq{pw = riak_pb_kv_codec:encode_quorum(PW)});
put_options([{timeout, T} | Rest], Req) when is_integer(T) ->
put_options(Rest, Req#rpbputreq{timeout = T});
+put_options([{timeout, {T1,T2}} | Rest], Req) when is_integer(T1), is_integer(T2) ->
+ put_options(Rest, Req#rpbputreq{timeout = T2});
put_options([{timeout, _T} | _Rest], _Req) ->
erlang:error(badarg);
put_options([return_body | Rest], Req) ->
@@ -1499,6 +1541,8 @@ delete_options([{dw, DW} | Rest], Req) ->
delete_options(Rest, Req#rpbdelreq{dw = riak_pb_kv_codec:encode_quorum(DW)});
delete_options([{timeout, T} | Rest], Req) when is_integer(T) ->
delete_options(Rest, Req#rpbdelreq{timeout = T});
+delete_options([{timeout, {T1,T2}} | Rest], Req) when is_integer(T1), is_integer(T2) ->
+ put_options(Rest, Req#rpbdelreq{timeout = T2});
delete_options([{timeout, _T} | _Rest], _Req) ->
erlang:error(badarg);
delete_options([{n_val, N} | Rest], Req)
@@ -1634,7 +1678,7 @@ process_response(#request{msg = #rpbdelreq{}},
process_response(#request{msg = #rpblistbucketsreq{}}=Request,
#rpblistbucketsresp{buckets = Buckets, done = undefined},
State) ->
- _ = send_caller({buckets, Buckets}, Request),
+ send_caller({buckets, Buckets}, Request),
{pending, State};
process_response(#request{msg = #rpblistbucketsreq{}},
@@ -1644,13 +1688,13 @@ process_response(#request{msg = #rpblistbucketsreq{}},
process_response(#request{msg = #rpblistkeysreq{}}=Request,
#rpblistkeysresp{done = Done, keys = Keys}, State) ->
- _ = case Keys of
- undefined ->
- ok;
- _ ->
- %% Have to directly use send_caller as may want to reply with done below.
- send_caller({keys, Keys}, Request)
- end,
+ case Keys of
+ undefined ->
+ ok;
+ _ ->
+ %% Have to directly use send_caller as may want to reply with done below.
+ send_caller({keys, Keys}, Request)
+ end,
case Done of
true ->
{reply, done, State};
@@ -1678,13 +1722,13 @@ process_response(#request{msg = #rpbsetbuckettypereq{}},
process_response(#request{msg = #rpbmapredreq{content_type = ContentType}}=Request,
#rpbmapredresp{done = Done, phase=PhaseId, response=Data}, State) ->
- _ = case Data of
- undefined ->
- ok;
- _ ->
- Response = decode_mapred_resp(Data, ContentType),
- send_caller({mapred, PhaseId, Response}, Request)
- end,
+ case Data of
+ undefined ->
+ ok;
+ _ ->
+ Response = decode_mapred_resp(Data, ContentType),
+ send_caller({mapred, PhaseId, Response}, Request)
+ end,
case Done of
true ->
{reply, done, State};
@@ -1698,7 +1742,7 @@ process_response(#request{msg = #rpbindexreq{}}, rpbindexresp, State) ->
process_response(#request{msg = #rpbindexreq{stream=true, return_terms=Terms}}=Request,
#rpbindexresp{results=Results, keys=Keys, done=Done, continuation=Cont}, State) ->
ToSend = process_index_response(Terms, Keys, Results),
- _ = send_caller(ToSend, Request),
+ send_caller(ToSend, Request),
DoneResponse = {reply, {done, Cont}, State},
case Done of
true -> DoneResponse;
@@ -1725,7 +1769,7 @@ process_response(#request{msg = #rpbcsbucketreq{bucket=Bucket}}=Request, #rpbcsb
Objects),
{ok, CObjects}
end,
- _ = send_caller(ToSend, Request),
+ send_caller(ToSend, Request),
DoneResponse = {reply, {done, Cont}, State},
case Done of
true -> DoneResponse;
@@ -1913,11 +1957,13 @@ send_mapred_req(Pid, MapRed, ClientPid) ->
new_request(Msg, From, Timeout) ->
Ref = make_ref(),
#request{ref = Ref, msg = Msg, from = From, timeout = Timeout,
- tref = create_req_timer(Timeout, Ref)}.
+ tref = create_req_timer(Timeout, Ref),
+ timestamp = os:timestamp()}.
new_request(Msg, From, Timeout, Context) ->
Ref = make_ref(),
#request{ref =Ref, msg = Msg, from = From, ctx = Context, timeout = Timeout,
- tref = create_req_timer(Timeout, Ref)}.
+ tref = create_req_timer(Timeout, Ref),
+ timestamp = os:timestamp()}.
%% @private
%% Create a request timer if desired, otherwise return undefined.
@@ -1925,6 +1971,8 @@ create_req_timer(infinity, _Ref) ->
undefined;
create_req_timer(undefined, _Ref) ->
undefined;
+create_req_timer({Msecs,_}, Ref) ->
+ erlang:send_after(Msecs, self(), {q_timeout, Ref});
create_req_timer(Msecs, Ref) ->
erlang:send_after(Msecs, self(), {req_timeout, Ref}).
@@ -1933,7 +1981,7 @@ create_req_timer(Msecs, Ref) ->
cancel_req_timer(undefined) ->
ok;
cancel_req_timer(Tref) ->
- _ = erlang:cancel_timer(Tref),
+ erlang:cancel_timer(Tref),
ok.
%% @private
@@ -1953,14 +2001,17 @@ restart_req_timer(Request) ->
%% @private
%% Connect the socket if disconnected
connect(State) when State#state.sock =:= undefined ->
- #state{address = Address, port = Port, connects = Connects} = State,
+ #state{address = Address, port = Port, connects = Connects, stats = Stats0} = State,
+ TS0 = os:timestamp(),
case gen_tcp:connect(Address, Port,
[binary, {active, once}, {packet, 4},
{keepalive, State#state.keepalive}],
State#state.connect_timeout) of
{ok, Sock} ->
+ TS1 = os:timestamp(),
State1 = State#state{sock = Sock, connects = Connects+1,
- reconnect_interval = ?FIRST_RECONNECT_INTERVAL},
+ reconnect_interval = ?FIRST_RECONNECT_INTERVAL,
+ stats = record_stat(connect, timer:now_diff(TS1, TS0), Stats0)},
case State#state.credentials of
undefined ->
{ok, State1};
@@ -2032,14 +2083,14 @@ start_auth(State=#state{credentials={User,Pass}, sock=Sock}) ->
%% @private
%% Disconnect socket if connected
-disconnect(State) ->
+disconnect(State, DelayReconnect) ->
%% Tell any pending requests we've disconnected
- _ = case State#state.active of
- undefined ->
- ok;
- Request ->
- send_caller({error, disconnected}, Request)
- end,
+ case State#state.active of
+ undefined ->
+ ok;
+ Request ->
+ send_caller({error, disconnected}, Request)
+ end,
%% Make sure the connection is really closed
case State#state.sock of
@@ -2052,13 +2103,16 @@ disconnect(State) ->
%% Decide whether to reconnect or exit
NewState = State#state{sock = undefined, active = undefined},
- case State#state.auto_reconnect of
- true ->
+ case {State#state.auto_reconnect, DelayReconnect} of
+ {true, true} ->
%% Schedule the reconnect message and return state
erlang:send_after(State#state.reconnect_interval, self(), reconnect),
{noreply, increase_reconnect_interval(NewState)};
- false ->
- {stop, disconnected, NewState}
+ {true, false} ->
+ self() ! reconnect,
+ {noreply, NewState};
+ {false, _} ->
+ {stop, normal, NewState}
end.
%% Double the reconnect interval up to the maximum
@@ -2073,16 +2127,28 @@ increase_reconnect_interval(State) ->
%% Send a request to the server and prepare the state for the response
%% @private
-send_request(Request0, State) when State#state.active =:= undefined ->
- {Request, Pkt} = encode_request_message(Request0),
+send_request(#request{ref = Ref,
+ tref = TRef,
+ timeout = Timeout} = Request0,
+ State = #state{stats = Stats0})
+ when State#state.active =:= undefined ->
+ {Request1, Pkt} = encode_request_message(Request0#request{timestamp = os:timestamp()}),
+ Stats1 = record_stat(send, iolist_size(Pkt), Stats0),
Transport = State#state.transport,
case Transport:send(State#state.sock, Pkt) of
ok ->
- maybe_reply(after_send(Request, State#state{active = Request}));
+ case Timeout of
+ {_,Msecs} ->
+ cancel_req_timer(TRef),
+ Request2 = Request1#request{tref = erlang:send_after(Msecs, self(), {op_timeout, Ref})},
+ maybe_reply(after_send(Request2, State#state{active = Request2, stats = Stats1}));
+ _ ->
+ maybe_reply(after_send(Request1, State#state{active = Request1, stats = Stats1}))
+ end;
{error, Reason} ->
error_logger:warning_msg("Socket error while sending riakc request: ~p.", [Reason]),
Transport:close(State#state.sock),
- maybe_enqueue_and_reconnect(Request, State#state{sock=undefined})
+ maybe_enqueue_and_reconnect(Request1, State#state{sock=undefined, stats = Stats1})
end.
%% Already encoded (for tunneled messages), but must provide Message Id
@@ -2107,37 +2173,51 @@ maybe_reconnect(_) -> ok.
%% If we can queue while disconnected, do so, otherwise tell the
%% caller that the socket was disconnected.
enqueue_or_reply_error(Request, #state{queue_if_disconnected=true}=State) ->
- queue_request(Request, State);
+ case Request#request.timeout of
+ {_,_} -> send_caller({error, timeout}, Request); % we've already used part of the op timeout
+ _ -> queue_request_head(Request, State)
+ end;
enqueue_or_reply_error(Request, State) ->
- _ = send_caller({error, disconnected}, Request),
+ send_caller({error, disconnected}, Request),
State.
%% Queue up a request if one is pending
%% @private
-queue_request(Request, State) ->
- State#state{queue = queue:in(Request, State#state.queue)}.
+queue_request(Request, State) -> queue_request(Request, State, in).
+queue_request_head(Request, State) -> queue_request(Request, State, in_r).
+queue_request(Request0, #state{queue_len = QLen, queue = Q, stats = Stats0} = State, Infunc) ->
+ Request1 = Request0#request{timestamp = os:timestamp()},
+ Stats1 = record_cntr({queue_len, granulate(QLen + 1)}, Stats0),
+ State#state{queue_len = QLen + 1, queue = queue:Infunc(Request1, Q), stats = Stats1}.
%% Try and dequeue request and send onto the server if one is waiting
%% @private
-dequeue_request(State) ->
+dequeue_request(#state{queue_len = QLen, stats = Stats0} = State) ->
case queue:out(State#state.queue) of
{empty, _} ->
- State;
- {{value, Request}, Q2} ->
- send_request(Request, State#state{queue = Q2})
+ State#state{active = undefined};
+ {{value, #request{timestamp = TS0, timeout = Timeout} = Request}, Q2} ->
+ Now = os:timestamp(),
+ Stats1 = record_stat({queue_time, q_timeout(Timeout)}, timer:now_diff(Now, TS0), Stats0),
+ send_request(Request#request{timestamp = Now},
+ State#state{active = undefined,
+ queue_len = QLen - 1,
+ queue = Q2,
+ stats = Stats1})
end.
%% Remove a queued request by reference - returns same queue if ref not present
%% @private
-remove_queued_request(Ref, State) ->
- L = queue:to_list(State#state.queue),
- case lists:keytake(Ref, #request.ref, L) of
+remove_queued_request(Ref, #state{queue_len = QLen, queue = Q, stats = Stats0} = State, TimeoutTag) ->
+ case lists:keytake(Ref, #request.ref, queue:to_list(Q)) of
false -> % Ref not queued up
State;
- {value, Req, L2} ->
+ {value, #request{timeout = Timeout} = Req, L2} ->
{reply, Reply, NewState} = on_timeout(Req, State),
- _ = send_caller(Reply, Req),
- NewState#state{queue = queue:from_list(L2)}
+ send_caller(Reply, Req),
+ NewState#state{queue_len = QLen - 1,
+ queue = queue:from_list(L2),
+ stats = record_cntr({TimeoutTag, q_timeout(Timeout), bucket_name(Req)}, Stats0)}
end.
%% @private
@@ -2272,6 +2352,184 @@ maybe_make_bucket_type(undefined, Bucket) ->
maybe_make_bucket_type(Type, Bucket) ->
{Type, Bucket}.
+%% ====================================================================
+%% stats
+%% ====================================================================
+
+-record(stats, {timestamp, level, dict}).
+
+record_cntr(_Key, #stats{level = 0} = Stats) ->
+ Stats;
+record_cntr(Key, #stats{dict = D} = Stats) ->
+ Stats#stats{dict = dict:update_counter({count, Key}, 1, D)}.
+
+record_stat(_Key, _Val, #stats{level = 0} = Stats) ->
+ Stats;
+record_stat(Key, Val, #stats{dict = Dict0, level = 1} = Stats) ->
+ Dict1 = dict:update_counter({count, Key}, 1, Dict0),
+ Stats#stats{dict = dict:update_counter({total, Key}, Val, Dict1)};
+record_stat(Key, Val, #stats{dict = Dict0, level = 2} = Stats) ->
+ Dict1 = dict:update_counter({count, Key}, 1, Dict0),
+ Dict2 = dict:update_counter({total, Key}, Val, Dict1),
+ Stats#stats{dict = dict:update_counter({histogram, granulate(Val), Key}, 1, Dict2)}.
+
+init_stats(#stats{level = Level}) ->
+ init_stats(Level);
+init_stats(Level) ->
+ #stats{timestamp = os:timestamp(), level = Level, dict = dict:new()}.
+
+stats_format(#stats{timestamp = TS0, level = Level, dict = Dict}) ->
+ TS1 = os:timestamp(),
+ TDiff = timer:now_diff(TS1, TS0),
+ {Cntrs, Hists} = stats_format(
+ Level, lists:sort(dict:to_list(Dict)), [],
+ [{key, count, total, lists:reverse(steps(Level))}]),
+ {{TDiff, 1}, Cntrs, Hists}.
+
+stats_format(_Level, [], CAcc, HAcc) -> {CAcc, HAcc};
+stats_format(Level, [{{count, Key}, CVal} | List], CAcc, HAcc) ->
+ case lists:keytake({total, Key}, 1, List) of
+ false -> stats_format(Level, List, [{Key, CVal} | CAcc], HAcc);
+ {value, {{total, Key}, TVal}, NewList} ->
+ {OutList, Histogram} =
+ lists:foldl(fun(I, {LAcc, XAcc}) ->
+ case lists:keytake({histogram, I, Key}, 1, LAcc) of
+ false -> {LAcc, [0 | XAcc]};
+ {value, {{_, _, Key}, HVal}, LAcc2} -> {LAcc2, [HVal | XAcc]}
+ end
+ end, {NewList, []}, steps(Level)),
+ stats_format(Level, OutList, CAcc, [{Key, CVal, TVal, Histogram} | HAcc])
+ end.
+
+granulate(0) -> 0;
+granulate(1) -> 1;
+granulate(2) -> 2;
+granulate(N) when N =< 4 -> 4;
+granulate(N) when N =< 7 -> 7;
+granulate(N) when N =< 10 -> 10;
+granulate(N) when N =< 20 -> 20;
+granulate(N) when N =< 40 -> 40;
+granulate(N) when N =< 70 -> 70;
+granulate(N) when N =< 100 -> 100;
+granulate(N) when N =< 200 -> 200;
+granulate(N) when N =< 400 -> 400;
+granulate(N) when N =< 700 -> 700;
+granulate(N) when N =< 1000 -> 1000;
+granulate(N) when N =< 2000 -> 2000;
+granulate(N) when N =< 4000 -> 4000;
+granulate(N) when N =< 7000 -> 7000;
+granulate(N) when N =< 10000 -> 10000;
+granulate(N) when N =< 20000 -> 20000;
+granulate(N) when N =< 40000 -> 40000;
+granulate(N) when N =< 70000 -> 70000;
+granulate(N) when N =< 100000 -> 100000;
+granulate(N) when N =< 200000 -> 200000;
+granulate(N) when N =< 400000 -> 400000;
+granulate(N) when N =< 700000 -> 700000;
+granulate(N) when N =< 1000000 -> 1000000;
+granulate(N) when N =< 2000000 -> 2000000;
+granulate(N) when N =< 4000000 -> 4000000;
+granulate(N) when N =< 7000000 -> 7000000;
+granulate(N) when N =< 10000000 -> 10000000;
+granulate(N) when N =< 20000000 -> 20000000;
+granulate(N) when N =< 40000000 -> 40000000;
+granulate(N) when N =< 70000000 -> 70000000;
+granulate(N) when N =< 100000000 -> 100000000;
+granulate(N) when N =< 200000000 -> 200000000;
+granulate(N) when N =< 400000000 -> 400000000;
+granulate(N) when N =< 700000000 -> 700000000;
+granulate(_) -> 1000000000.
+
+steps(2) ->
+ [1000000000,
+ 700000000, 400000000, 200000000, 100000000,
+ 70000000, 40000000, 20000000, 10000000,
+ 7000000, 4000000, 2000000, 1000000,
+ 700000, 400000, 200000, 100000,
+ 70000, 40000, 20000, 10000,
+ 7000, 4000, 2000, 1000,
+ 700, 400, 200, 100,
+ 70, 40, 20, 10,
+ 7, 4, 2, 1,
+ 0];
+steps(_) -> [].
+
+op_type(rpbpingreq ) -> ping;
+op_type(rpbgetclientidreq ) -> get_client_id;
+op_type(rpbgetserverinforeq ) -> get_server_info;
+op_type({tunneled,_} ) -> tunneled;
+op_type(#dtfetchreq{} ) -> dt_fetch;
+op_type(#dtupdatereq{} ) -> dt_update;
+op_type(#rpbcountergetreq{} ) -> counter_get;
+op_type(#rpbcounterupdatereq{} ) -> counter_update;
+op_type(#rpbcsbucketreq{} ) -> cs_bucket;
+op_type(#rpbdelreq{} ) -> del;
+op_type(#rpbgetbucketreq{} ) -> get_bucket;
+op_type(#rpbgetbuckettypereq{} ) -> get_bucket_type;
+op_type(#rpbgetreq{} ) -> get;
+op_type(#rpbindexreq{} ) -> index;
+op_type(#rpblistbucketsreq{} ) -> list_buckets;
+op_type(#rpblistkeysreq{} ) -> list_keys;
+op_type(#rpbmapredreq{} ) -> mapred;
+op_type(#rpbputreq{} ) -> put;
+op_type(#rpbresetbucketreq{} ) -> reset_bucket;
+op_type(#rpbsearchqueryreq{} ) -> search_query;
+op_type(#rpbsetbucketreq{} ) -> set_bucket;
+op_type(#rpbsetbuckettypereq{} ) -> set_bucket_type;
+op_type(#rpbsetclientidreq{} ) -> set_client_id;
+op_type(#rpbyokozunaindexdeletereq{}) -> yokozuna_index_delete;
+op_type(#rpbyokozunaindexgetreq{} ) -> yokozuna_index_get;
+op_type(#rpbyokozunaindexputreq{} ) -> yokozuna_index_put;
+op_type(#rpbyokozunaschemagetreq{} ) -> yokozuna_schema_get;
+op_type(#rpbyokozunaschemaputreq{} ) -> yokozuna_schema_put;
+op_type(_ ) -> unknown_op.
+
+op_timeout({_,OPTimeout}) -> OPTimeout;
+op_timeout(Timeout) -> Timeout.
+
+q_timeout({QTimeout,_}) -> QTimeout;
+q_timeout(Timeout) -> Timeout.
+
+bucket_name(Req) ->
+ case Req#request.msg of
+ #rpbgetreq{bucket = B} -> B;
+ #rpbputreq{bucket = B} -> B;
+ #rpbdelreq{bucket = B} -> B;
+ rpbpingreq -> ping;
+ _ -> other_bucket
+ end.
+
+merge_stats({{TAcc1, TCnt1}, Cntrs1, HistG1}, {{TAcc2, TCnt2}, Cntrs2, HistG2}) ->
+ {{TAcc1 + TAcc2, TCnt1 + TCnt2}, add_cntrs(Cntrs1, Cntrs2, []), add_hists(HistG1, HistG2, [])}.
+
+add_cntrs([], [], Acc) -> Acc;
+add_cntrs([], Cntrs2, Acc) -> lists:append([Cntrs2, Acc]);
+add_cntrs(Cntrs1, [], Acc) -> lists:append([Cntrs1, Acc]);
+add_cntrs([{Cntr, Val1} = Cntr1 | Cntrs1], Cntrs2, Acc) ->
+ case lists:keytake(Cntr, 1, Cntrs2) of
+ false ->
+ add_cntrs(Cntrs1, Cntrs2, [Cntr1 | Acc]);
+ {value, {Cntr, Val2}, NewCntrs2} ->
+ add_cntrs(Cntrs1, NewCntrs2, [{Cntr, Val1 + Val2} | Acc])
+ end.
+
+add_hists([], [], Acc) -> Acc;
+add_hists([], Hists2, Acc) -> lists:append([Hists2, Acc]);
+add_hists(Hists1, [], Acc) -> lists:append([Hists1, Acc]);
+add_hists([{key, _, _, _} = HistRec | Hists1], Hists2, Acc) ->
+ add_hists(Hists1, lists:keydelete(key, 1, Hists2), [HistRec | Acc]);
+add_hists([{Key, Cntr1, Ttl1, Hist1} = HistRec1 | Hists1], Hists2, Acc) ->
+ case lists:keytake(Key, 1, Hists2) of
+ false ->
+ add_hists(Hists1, Hists2, [HistRec1 | Acc]);
+ {value, {Key, Cntr2, Ttl2, Hist2}, NewHists2} ->
+ add_hists(Hists1, NewHists2, [{Key, Cntr1 + Cntr2, Ttl1 + Ttl2, add_hist(Hist1, Hist2, [])} | Acc])
+ end.
+
+add_hist([], [], Acc) -> lists:reverse(Acc);
+add_hist([H1 | Hist1], [H2 | Hist2], Acc) ->
+ add_hist(Hist1, Hist2, [H1 + H2 | Acc]).
+
%% ====================================================================
%% unit tests
%% ====================================================================
@@ -2472,13 +2730,15 @@ maybe_start_network() ->
ok;
{error, {already_started, _}} ->
ok;
+ {error, {{already_started, _}, _}} ->
+ ok;
X ->
X
end.
bad_connect_test() ->
%% Start with an unlikely port number
- ?assertEqual({error, {tcp, econnrefused}}, start({127,0,0,1}, 65535)).
+ ?assertEqual({error, normal}, start({127,0,0,1}, 65535)).
queue_disconnected_test() ->
%% Start with an unlikely port number
@@ -2898,7 +3158,7 @@ live_node_tests() ->
%% Would really like this in a nested {setup, blah} structure
%% but eunit does not allow
{ok, Pid} = start_link(test_ip(), test_port()),
- Pid ! {req_timeout, make_ref()},
+ Pid ! {q_timeout, make_ref()},
?assertEqual(pong, ping(Pid))
end)},
@@ -3812,4 +4072,429 @@ live_node_tests() ->
end)}
].
+timeout_no_conn_test() ->
+ {ok, Pid} = start_link(test_ip(), 65225, [auto_reconnect, queue_if_disconnected]),
+
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ % io:format(user, "RES: ~p~n", [{T,Info}]),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {150,10}), timer:sleep(1),
+ P02 = REQ(get, {100,10}), timer:sleep(1),
+ P03 = REQ(get, {150,10}), timer:sleep(1),
+ P04 = REQ(get, {100,10}), timer:sleep(1),
+ P05 = REQ(get, {150,10}), timer:sleep(1),
+ P06 = REQ(get, {100,10}), timer:sleep(1),
+ P07 = REQ(get, {150,10}), timer:sleep(1),
+ P08 = REQ(get, {100,10}), timer:sleep(1),
+ P09 = REQ(get, {150,10}), timer:sleep(1),
+ P10 = REQ(get, 20), timer:sleep(1),
+ P11 = REQ(get, 40), timer:sleep(1),
+ P12 = REQ(get, 60), timer:sleep(1),
+ P13 = REQ(get, 80), timer:sleep(1),
+ P14 = REQ(get, 20), timer:sleep(1),
+ P15 = REQ(get, 100), timer:sleep(250), 0 = queue_len(Pid),
+ P16 = REQ(get, {20,100}), timer:sleep(1),
+ P17 = REQ(get, {20,100}), timer:sleep(1),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+
+ io:format(user, " 150 TIMES: ~p ~p ~p ~p ~p~n", [T01,T03,T05,T07,T09]),
+ io:format(user, " 100 TIMES: ~p ~p ~p ~p~n", [T02,T04,T06,T08]),
+ lists:foreach(fun(T) -> true = T > 145, true = T < 159 end, [T01,T03,T05,T07,T09]),
+ lists:foreach(fun(T) -> true = T > 95, true = T < 109 end, [T02,T04,T06,T08]),
+ io:format(user, " TIMES: ~p ~p ~p ~p ~p ~p ~p ~p~n", [T10,T11,T12,T13,T14,T15,T16,T17]),
+ true = T10 > 18, true = T10 < 25,
+ true = T11 > 37, true = T11 < 45,
+ true = T12 > 55, true = T12 < 65,
+ true = T13 > 73, true = T13 < 85,
+ true = T14 > 18, true = T14 < 25,
+ true = T15 > 98, true = T15 < 105,
+ true = T16 > 18, true = T16 < 25,
+ true = T17 > 18, true = T17 < 25,
+
+ stop(Pid).
+
+timeout_conn_test() ->
+ %% Set up a dummy socket to send requests on
+ {ok, DummyServerPid, Port} = dummy_server(noreply),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]),
+ erlang:monitor(process, DummyServerPid),
+
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {100, 20}), timer:sleep(1),
+ P02 = REQ(get, {100, 20}), timer:sleep(1),
+ P03 = REQ(get, {100, 20}), timer:sleep(1),
+ P04 = REQ(get, {100, 20}), timer:sleep(1),
+ P05 = REQ(get, {100, 20}), timer:sleep(1),
+ P06 = REQ(get, {100, 20}), timer:sleep(1),
+ P07 = REQ(get, {100, 20}), timer:sleep(1),
+ P08 = REQ(get, {100, 20}), timer:sleep(1),
+ P09 = REQ(get, {100, 20}), timer:sleep(1),
+ P10 = REQ(get, {100, 20}), timer:sleep(1),
+ P11 = REQ(get, 20), timer:sleep(1),
+ P12 = REQ(get, 40), timer:sleep(1),
+ P13 = REQ(get, 60), timer:sleep(1),
+ P14 = REQ(get, 80), timer:sleep(1),
+ P15 = REQ(get, 20), timer:sleep(1),
+ P16 = REQ(get, 100), timer:sleep(200), 0 = queue_len(Pid),
+ P17 = REQ(get, {20, 100}), timer:sleep(1),
+ P18 = REQ(get, {20, 100}),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+ {T18, {error, timeout}} = RES(P18),
+
+ io:format(user, " TIMES: ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p~n",
+ [T01,T02,T03,T04,T05,T06,T07,T08,T09,T10,T11,T12,T13,T14,T15,T16,T17,T18]),
+ true = T01 > 19, true = T01 < 26,
+ true = T02 > 37, true = T02 < 49,
+ true = T03 > 55, true = T03 < 70,
+ true = T04 > 73, true = T04 < 90,
+ true = T05 > 91, true = T05 < 117,
+ lists:foreach(fun(T) -> true = T > 98, true = T < 125 end, [T06,T07,T08,T09,T10]),
+ true = T11 > 18, true = T11 < 25,
+ true = T12 > 38, true = T12 < 48,
+ true = T13 > 58, true = T13 < 70,
+ true = T14 > 78, true = T14 < 90,
+ true = T15 > 18, true = T15 < 26,
+ true = T16 > 98, true = T16 < 106,
+ true = T17 > 98, true = T17 < 106,
+ true = T18 > 18, true = T18 < 26,
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok % io:format(user, "MSG: ~p~n", [_Msg])
+ after 1 -> ok % io:format(user, "NO MSG: ~p~n", [process_info(DummyServerPid, messages)])
+ end,
+
+ stop(Pid).
+
+stats_test() ->
+ {ok, Pid} = start_link(test_ip(), test_port(), [auto_reconnect, queue_if_disconnected]),
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ % io:format(user, "RES: ~p~n", [{T,Info}]),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ Traffic = fun() ->
+ P01 = REQ(get, {100, 20}),
+ P02 = REQ(get, {100, 20}),
+ P03 = REQ(get, {100, 20}),
+ P04 = REQ(get, {100, 20}),
+ P05 = REQ(get, {100, 20}),
+ P06 = REQ(get, {100, 20}),
+ P07 = REQ(get, {100, 20}),
+ P08 = REQ(get, {100, 20}),
+ P09 = REQ(get, {100, 20}),
+ P10 = REQ(get, {100, 20}),
+ P11 = REQ(get, 20),
+ P12 = REQ(get, 40),
+ P13 = REQ(get, 60),
+ P14 = REQ(get, 80),
+ P15 = REQ(get, 20),
+ P16 = REQ(get, 100),
+ P17 = REQ(get, {20, 100}),
+ P18 = REQ(get, {20, 100}),
+
+ lists:foreach(
+ fun(P) -> RES(P) end,
+ [P01,P02,P03,P04,P05,P06,P07,P08,P09,P10,
+ P11,P12,P13,P14,P15,P16,P17,P18])
+ end,
+
+ Traffic(),
+ {_,[],[_]} = stats_take(Pid),
+
+ stats_change_level(Pid, 1),
+ Traffic(),
+ {_,CL1,HL1} = stats_take(Pid),
+
+ stats_change_level(Pid, 2),
+ Traffic(),
+ {_,CL2,HL2} = stats_take(Pid),
+
+ true = length(CL1) >= 2,
+ true = length(CL2) >= 2,
+ true = length(HL1) >= 2,
+ true = length(HL2) >= 2,
+
+ lists:foreach(fun({_,_,_,[]}) -> ok end, HL1),
+ lists:foreach(fun({_,_,_,L}) -> true = length(L) == length(steps(2)) end, HL2),
+
+ % io:format(user, "~n~n~p~n~n~p~n~n", [HL2, HL1]),
+
+ stop(Pid).
+
+
+overload_test() ->
+ {ok, DummyServerPid, Port} = dummy_server({50, <<10>>}),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected, {stats,1}]),
+ timer:sleep(50),
+ erlang:monitor(process, DummyServerPid),
+
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ Info = (catch apply(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), Info}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 4000 -> error
+ end
+ end,
+
+ TEST = fun(TO) ->
+
+ PidList = lists:foldl(fun(_,Acc) ->
+ timer:sleep(45),
+ [REQ(get, TO) | Acc]
+ end, [], lists:seq(1,200)),
+ timer:sleep(100),
+
+ ReplyList = lists:foldl(fun(RPid,Acc) ->
+ [RES(RPid) | Acc]
+ end, [], PidList),
+
+ {_,_,KL} = stats_take(Pid),
+ Reconns = case lists:keyfind(connect, 1, KL) of
+ false -> 0;
+ {_,V,_,_} -> V
+ end,
+ Replies = lists:foldl(
+ fun({error,Reply},Acc) ->
+ case lists:keyfind(Reply, 1, Acc) of
+ false -> [{Reply, 1} | Acc];
+ {Reply, C} ->
+ lists:keyreplace(Reply, 1, Acc, {Reply, C+1})
+ end
+ end, [], ReplyList),
+ TimeOuts = case lists:keyfind(timeout, 1, Replies) of
+ {_,TOV} -> TOV;
+ false -> 0
+ end,
+ NotFounds = case lists:keyfind(notfound, 1, Replies) of
+ {_,NFV} -> NFV;
+ false -> 0
+ end,
+
+ % io:format(user, "~nSTATS: ~p~n", [Stats])
+ io:format(user, " With timeout: ~p we got ~p reconnections, ~p timeouts and ~p replies~n",
+ [TO, Reconns, TimeOuts, NotFounds])
+
+ end,
+
+ stats_take(Pid),
+ TEST(60),
+ TEST({5,55}),
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok % io:format(user, "MSG: ~p~n", [_Msg])
+ after 1 -> ok % io:format(user, "NO MSG: ~p~n", [process_info(DummyServerPid, messages)])
+ end,
+
+ stop(Pid).
+
+dummy_server(Directive) ->
+ {ok, Listen} = gen_tcp:listen(0, [binary, {packet, 4}, {active, true}]),
+ {ok, Port} = inet:port(Listen),
+ Pid = spawn(?MODULE, dummy_server_loop, [{Listen, no_conn, Directive}]),
+ {ok, Pid, Port}.
+
+dummy_server_loop({Listen, no_conn, Directive}) ->
+ % case Directive of
+ % {SleepMs, _} -> timer:sleep(SleepMs div 2);
+ % _ -> ok
+ % end,
+ {ok, Sock} = gen_tcp:accept(Listen),
+ dummy_server_loop({Listen, Sock, Directive});
+dummy_server_loop({Listen, Sock, Directive}) ->
+ receive
+ stop -> ok;
+ {tcp_closed, Sock} -> dummy_server_loop({Listen, no_conn, Directive});
+ _Data ->
+ case Directive of
+ noreply -> dummy_server_loop({Listen, Sock, Directive}); % ignore requests, let them timeout
+ {SleepMs, Reply} ->
+ spawn(fun() -> timer:sleep(SleepMs), gen_tcp:send(Sock, Reply) end),
+ dummy_server_loop({Listen, Sock, Directive})
+ end
+ end.
+
+
+stats_demo() ->
+ {ok, DummyServerPid, Port} = dummy_server({5, <<10>>}),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected, {stats,2}]),
+ timer:sleep(50),
+ erlang:monitor(process, DummyServerPid),
+
+ GREQ = fun(Bkt, TO) ->
+ erlang:spawn(fun() ->
+ get(Pid, Bkt,
+ crypto:rand_bytes(1),
+ [], TO)
+ end)
+ end,
+
+ PREQ = fun(Bkt, TO) ->
+ erlang:spawn(fun() ->
+ put(Pid,
+ riakc_obj:new(Bkt,
+ crypto:rand_bytes(1),
+ crypto:rand_bytes(10)),
+ TO)
+ end)
+ end,
+
+ Traffic = fun() ->
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt1">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt1">>, {100, 20}),
+ GREQ(<<"bkt2">>, {100, 20}),
+ PREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt1">>, {100, 20}),
+ GREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt1">>, {100, 20}),
+ GREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt1">>, {100, 20}),
+ PREQ(<<"bkt2">>, {100, 20}),
+ GREQ(<<"bkt1">>, {100, 20}),
+ GREQ(<<"bkt2">>, {100, 20})
+ end,
+
+ Traffic(),
+ Traffic(),
+ Traffic(),
+ timer:sleep(500),
+ Stats = call_infinity(Pid, stats_peek),
+ io:format(user, "~n~n~p~n~n", [lists:sort(dict:to_list(Stats#stats.dict))]),
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok % io:format(user, "MSG: ~p~n", [_Msg])
+ after 1 -> ok % io:format(user, "NO MSG: ~p~n", [process_info(DummyServerPid, messages)])
+ end,
+
+ stop(Pid).
+
+all_tests() ->
+ erlang:set_cookie(node(),riak),
+ lists:foreach(
+ fun(TestFun) ->
+ io:format(user, "TEST: ~p~n", [TestFun]),
+ ok = apply(?MODULE, TestFun, [])
+ end,
+ [bad_connect_test,
+ queue_disconnected_test,
+ auto_reconnect_bad_connect_test,
+ server_closes_socket_test,
+ auto_reconnect_server_closes_socket_test,
+ dead_socket_pid_returns_to_caller_test,
+ increase_reconnect_interval_test,
+ timeout_conn_test,
+ timeout_no_conn_test,
+ stats_test,
+ overload_test
+ ]).
+
-endif.
diff --git a/src/vsn.mk b/src/vsn.mk
new file mode 100644
index 00000000..79648e17
--- /dev/null
+++ b/src/vsn.mk
@@ -0,0 +1 @@
+VSN = 2.0