-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathbatch_processor.erl
More file actions
95 lines (79 loc) · 2.51 KB
/
batch_processor.erl
File metadata and controls
95 lines (79 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
%% ----------------------------------------------------------------------------------------
%% @author Hritik Soni <hritik.s@greyorange.sg>
%% @end
%% ----------------------------------------------------------------------------------------
-module(batch_processor).
-behaviour(gen_statem).
%% API
-export([
start_link/1
]).
%% gen_server callbacks
-export([init/1, callback_mode/0,
terminate/3]).
-export([ready/3]).
-record(state, {
batch_proc_fun = undefined,
flush_threshold
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Starts the server
start_link(Args) ->
gen_statem:start_link(?MODULE, Args, []).
%%%===================================================================
%%% gen_statem callbacks
%%%==================================================================
callback_mode() ->
state_functions.
%% @private
%% @doc Initializes the server
init(Args) ->
%% Don't store data in main heap to improve performance during congestion
process_flag(message_queue_data, off_heap),
BatchProcessFun = proplists:get_value(batch_proc_fun, Args),
FlushThreshold = proplists:get_value(flush_threshold, Args, 1000000),
{ok, ready, #state{batch_proc_fun = BatchProcessFun, flush_threshold = FlushThreshold}}.
ready(info, Data, State) ->
ProcFun = State#state.batch_proc_fun,
%% Use erlang:monotonic_time() to avoid time warps
StartTime = erlang:monotonic_time(millisecond),
NewData = receive_and_merge([Data], StartTime, State),
ProcFun(NewData),
keep_state_and_data.
drop_mq() ->
receive
_Data ->
drop_mq()
after 0 ->
ok
end.
flush_if_necessary(T) ->
{_, L} = process_info(self(), message_queue_len),
case L > T of
true -> drop_mq();
false -> ok
end.
receive_and_merge(AccData, _StartTime, State) when length(AccData) >= 500 ->
flush_if_necessary(State#state.flush_threshold),
AccData;
receive_and_merge(AccData, StartTime, State) ->
Time = erlang:monotonic_time(millisecond),
TimeLeft = 1000 - (Time - StartTime),
if
TimeLeft =< 0 ->
AccData;
true ->
receive
Data ->
receive_and_merge([Data | AccData], StartTime, State)
after
TimeLeft ->
AccData
end
end.
%% @private
%% @doc Opposite of init.
terminate(_Reason, _StateName, _State) ->
ok.