Skip to content
Open
95 changes: 95 additions & 0 deletions src/batch_processor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
%% ----------------------------------------------------------------------------------------
%% @author Hritik Soni <hritik.s@greyorange.sg>
%% @end
%% ----------------------------------------------------------------------------------------

-module(batch_processor).

-behaviour(gen_statem).

%% API
-export([
start_link/1
]).

%% gen_statem callbacks
-export([init/1, callback_mode/0,
terminate/3]).

-export([ready/3]).

-record(state, {
batch_proc_fun = undefined,
flush_threshold
}).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Starts a batch_processor worker linked to the calling process.
%% Args is a proplist; recognised keys are batch_proc_fun (required, the
%% fun applied to each collected batch) and flush_threshold (optional,
%% see init/1 for the default).
start_link(Args) ->
    gen_statem:start_link(?MODULE, Args, []).

%%%===================================================================
%%% gen_statem callbacks
%%%==================================================================

%% @private
%% @doc gen_statem callback mode: one callback function per state name
%% (the only state implemented here is ready/3).
callback_mode() ->
    state_functions.

%% @private
%% @doc Initializes the worker: reads the batch function and flush
%% threshold from the Args proplist and enters the 'ready' state.
init(Args) ->
    %% Keep message data off the main heap so a congested mailbox does not
    %% slow down GC of the worker's own heap.
    process_flag(message_queue_data, off_heap),
    State = #state{
        batch_proc_fun  = proplists:get_value(batch_proc_fun, Args),
        flush_threshold = proplists:get_value(flush_threshold, Args, 1000000)
    },
    {ok, ready, State}.

%% @doc 'ready' state: the first incoming info message opens a batch;
%% further messages are merged in by receive_and_merge/3 until the batch
%% is full or the collection window expires, then the configured batch
%% function is applied to the whole batch.
ready(info, Data, #state{batch_proc_fun = ProcFun} = State) ->
    %% Monotonic clock so the collection window is immune to wall-clock
    %% (NTP) time warps.
    StartTime = erlang:monotonic_time(millisecond),
    Batch = receive_and_merge([Data], StartTime, State),
    ProcFun(Batch),
    keep_state_and_data.

%% @doc Discards every message currently queued in the caller's mailbox.
%% Returns ok once the mailbox is drained ('after 0' means: do not block
%% waiting for more messages).
drop_mq() ->
    receive
        _Discarded ->
            drop_mq()
    after 0 ->
        ok
    end.

%% @doc Clears the whole mailbox when the backlog exceeds Threshold
%% messages. A backlog that large means the producer is far ahead of the
%% consumer, so the pending messages are dropped wholesale rather than
%% processed late.
flush_if_necessary(Threshold) ->
    {message_queue_len, QueueLen} = process_info(self(), message_queue_len),
    if
        QueueLen > Threshold -> drop_mq();
        true -> ok
    end.

%% @private
%% @doc Collects messages into a batch for up to 1000 ms or 500 entries,
%% whichever comes first. Returns the accumulated batch with the most
%% recently received message first.
%%
%% The /3 entry keeps the shape used by ready/3; it delegates to a /4
%% variant that threads an explicit element count, so the batch-full
%% check is O(1) per message instead of calling length/1 (O(n)) in a
%% guard on every recursion.
receive_and_merge(AccData, StartTime, State) ->
    receive_and_merge(AccData, StartTime, State, length(AccData)).

receive_and_merge(AccData, _StartTime, State, Count) when Count >= 500 ->
    %% Batch is full. If the backlog is past the flush threshold, drop it
    %% rather than falling ever further behind.
    flush_if_necessary(State#state.flush_threshold),
    AccData;
receive_and_merge(AccData, StartTime, State, Count) ->
    Now = erlang:monotonic_time(millisecond),
    TimeLeft = 1000 - (Now - StartTime),
    if
        TimeLeft =< 0 ->
            AccData;
        true ->
            receive
                Data ->
                    receive_and_merge([Data | AccData], StartTime, State, Count + 1)
            after TimeLeft ->
                AccData
            end
    end.

%% @private
%% @doc gen_statem terminate callback. The worker holds no external
%% resources, so there is nothing to clean up.
terminate(_Reason, _StateName, _State) ->
    ok.
67 changes: 66 additions & 1 deletion src/influxdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
query/3,
query/4,
write/2,
write/3
write/3,
write_async/2,
write_async/3,
get_batch_processing_fun/0
]).
-export_type([
config/0,
Expand Down Expand Up @@ -79,6 +82,29 @@ default_url_query(#{database := Database}) ->
default_url_query(#{}) ->
#{"epoch" => precision(nanosecond)}.

%% @private
%% @doc Resolves the poolboy pool name to use for database Db (a string).
%% When the calling process's application has a dedicated pool configured
%% for Db under the influxdb 'app_pools' env, returns that pool's
%% registered name (<app>_<Db>_influxdb_pool); otherwise falls back to
%% the shared 'influxdb_pool'.
%%
%% Flattened from three nested case expressions into successive map
%% lookups with empty-map defaults — every "not configured" branch falls
%% through to the shared pool exactly as before.
get_pool_name(Db) ->
    AppName =
        case application:get_application() of
            {ok, App} -> App;
            _ -> undefined
        end,
    AppPools = application:get_env(influxdb, app_pools, #{}),
    AppSpec = maps:get(AppName, AppPools, #{}),
    DbSpecMap = maps:get(influxdb_pool, AppSpec, #{}),
    case maps:is_key(Db, DbSpecMap) of
        true ->
            %% The atom already exists: influxdb_sup creates it when it
            %% starts the per-database pools, so no atom-table growth here.
            list_to_atom(atom_to_list(AppName) ++ "_" ++ Db ++ "_influxdb_pool");
        false ->
            influxdb_pool
    end.

precision(hour) -> "h";
precision(minute) -> "m";
Expand Down Expand Up @@ -121,3 +147,42 @@ write(#{host := Host, port := Port, username := Username, password := Password,
}),
Body = influxdb_line_encoding:encode(Measurements),
influxdb_http:post(write, Url, Username, Password, "application/octet-stream", Body, Timeout).

%% @doc Asynchronously writes Measurements using Config with no extra
%% options; equivalent to write_async(Config, Measurements, #{}).
write_async(Config, Measurements) ->
    write_async(Config, Measurements, maps:new()).

%% @doc Asynchronously writes Measurements: encodes them to line protocol
%% and hands the result to a random available batch_processor worker of
%% the pool resolved for Config's database. Options: get_worker_timeout
%% (ms, default 5000) bounds the pool lookup; the remaining options ride
%% along in the worker message and are interpreted by the batch fun.
%%
%% Raises error(no_available_workers) when the pool reports an empty
%% worker list — previously this surfaced as an opaque badarg from
%% rand:uniform(0).
write_async(Config, Measurements, Options) ->
    GetWorkerTimeout = maps:get(get_worker_timeout, Options, 5000),
    Db = maps:get(database, Config, undefined),
    AvailWorkers = gen_server:call(get_pool_name(Db), get_avail_workers, GetWorkerTimeout),
    EncodedMeasurements = influxdb_line_encoding:encode(Measurements),
    case AvailWorkers of
        [] ->
            error(no_available_workers);
        _ ->
            Worker = lists:nth(rand:uniform(length(AvailWorkers)), AvailWorkers),
            Worker ! {Config, EncodedMeasurements, Options}
    end.

%% @doc Returns the fun a batch_processor worker applies to each batch.
%% Each batch element is a {Config, EncodedMeasurements, Options} tuple
%% as sent by write_async/3. Connection settings and URL options are
%% taken from the FIRST element only — this assumes every element of a
%% batch targets the same host/database, which holds because workers are
%% selected per-database pool (see get_pool_name/1).
get_batch_processing_fun() ->
    fun(Batch) ->
        {Config, _Measurement, Options} = hd(Batch),
        #{host := Host, port := Port, username := Username,
          password := Password, database := Database} = Config,
        Timeout = maps:get(timeout, Options, infinity),
        Url = influxdb_uri:encode(#{
            scheme => "http",
            host => Host,
            port => Port,
            path => "/write",
            query => maps:fold(fun
                (precision, Value, Acc) -> maps:put("precision", precision(Value), Acc);
                (retention_policy, Value, Acc) -> maps:put("rp", Value, Acc);
                (_Key, _Value, Acc) -> Acc
            end, #{"db" => Database}, Options)
        }),
        %% Build the request body as a nested iolist rather than with
        %% `EncodedMeasurements ++ BodyAcc`: O(1) per element instead of
        %% O(|accumulator|) (accidental O(n^2)), and it also works when
        %% the encoder yields a binary, which `++` would reject. The HTTP
        %% client flattens iolists, so the bytes on the wire are the same.
        FinalBody =
            lists:foldl(
                fun({_Config, EncodedMeasurements, _Options}, BodyAcc) ->
                    [EncodedMeasurements | BodyAcc]
                end,
                [],
                Batch),
        influxdb_http:post(write, Url, Username, Password,
            "application/octet-stream", FinalBody, Timeout)
    end.
77 changes: 73 additions & 4 deletions src/influxdb_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,76 @@ start_link() ->

%% @hidden
init([]) ->
    BatchProcessFun = influxdb:get_batch_processing_fun(),
    DefaultArgs = [{name, {local, influxdb_pool}},
                   {worker_module, batch_processor},
                   {size, 5}, {max_overflow, 10},
                   {batch_proc_fun, BatchProcessFun}],
    PoolSpec = {influxdb_pool, {poolboy, start_link, [DefaultArgs]},
                permanent, 2000, worker, [poolboy]},
    AppPools = application:get_env(influxdb, app_pools, #{}),
    %% One extra pool per configured {App, Db}, named <app>_<db>_influxdb_pool.
    ExtraPoolSpecs =
        maps:fold(
            fun(App, AppSpec, SpecAcc) ->
                case maps:get(influxdb_pool, AppSpec, undefined) of
                    undefined -> SpecAcc;
                    DbSpecMap ->
                        maps:fold(
                            fun(Db, DbSpec, DbSpecAcc) ->
                                PoolName = list_to_atom(atom_to_list(App) ++ "_" ++ Db ++ "_influxdb_pool"),
                                %% poolboy reads its options with
                                %% proplists:get_value, which returns the
                                %% FIRST occurrence — so the per-db DbSpec
                                %% must come BEFORE the size defaults for
                                %% its {size,_}/{max_overflow,_} entries
                                %% to take effect (they were previously
                                %% appended after the defaults and were
                                %% silently ignored). name/worker_module
                                %% stay first so config cannot override
                                %% them.
                                PoolArgs = [{name, {local, PoolName}},
                                            {worker_module, batch_processor},
                                            {batch_proc_fun, BatchProcessFun}]
                                           ++ DbSpec
                                           ++ [{size, 5}, {max_overflow, 10}],
                                [{PoolName, {poolboy, start_link, [PoolArgs]},
                                  permanent, 2000, worker, [poolboy]} | DbSpecAcc]
                            end,
                            SpecAcc,
                            DbSpecMap)
                end
            end,
            [],
            AppPools),
    RestartStrategy = {one_for_one, 10, 10},
    {ok, {RestartStrategy, [PoolSpec | ExtraPoolSpecs]}}.


-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

%% Verifies that init/1 emits one child spec per configured {App, Db}
%% pool. Returns a LIST of test objects: the previous version returned
%% only the last ?_assertEqual, so the mhs_gmc assertion was built and
%% then discarded without ever running.
init_test_() ->
    AppPools = #{
        mhs => #{
            influxdb_pool => #{
                "gmc" => [
                    {size, 5},
                    {max_overflow, 5}
                ],
                "GreyOrange" => [
                    {size, 5},
                    {max_overflow, 5}
                ]
            }
        },
        pf => #{
            influxdb_pool => #{
                "GreyOrange" => [
                    {size, 5},
                    {max_overflow, 5}
                ]
            }
        }
    },
    application:set_env(influxdb, app_pools, AppPools),
    {ok, {_, PoolSpecList}} = influxdb_sup:init([]),
    TestResult1 = lists:keyfind('mhs_gmc_influxdb_pool', 1, PoolSpecList),
    TestResult2 = lists:keyfind('pf_GreyOrange_influxdb_pool', 1, PoolSpecList),
    [?_assertNotEqual(false, TestResult1),
     ?_assertNotEqual(false, TestResult2)].

-endif.