Skip to content

Commit 95df513

Browse files
author
Gleb Ivanov
committed
Extract PipelineUnit and move configs to separate structs
1 parent b415fa0 commit 95df513

File tree

24 files changed

+941
-549
lines changed

24 files changed

+941
-549
lines changed

.dialyzer_ignore.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[
22
# If we compile with another @storage_impl lib/kafka_batcher/temp_storage.ex:33 become reachable
3-
{"lib/kafka_batcher/temp_storage.ex", :guard_fail, 30}
3+
{"lib/kafka_batcher/temp_storage.ex", :guard_fail, 32}
44
]

lib/kafka_batcher.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ defmodule KafkaBatcher do
6767
"""
6868
defstruct key: "", value: "", headers: []
6969

70-
@type t :: %MessageObject{key: binary(), value: map() | binary(), headers: list()}
70+
@type key :: binary()
71+
@type value :: map() | binary()
72+
@type t :: %MessageObject{key: key(), value: value(), headers: list()}
7173
end
7274
end

lib/kafka_batcher/accumulator.ex

Lines changed: 58 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,15 @@ defmodule KafkaBatcher.Accumulator do
55
See details how it works in KafkaBatcher.Accumulator.State module
66
"""
77

8-
alias KafkaBatcher.{Accumulator.State, MessageObject, TempStorage}
8+
alias KafkaBatcher.{
9+
Accumulator,
10+
Accumulator.State,
11+
MessageObject,
12+
PipelineUnit,
13+
Producers,
14+
TempStorage
15+
}
16+
917
alias KafkaBatcher.Behaviours.Collector, as: CollectorBehaviour
1018

1119
@error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier)
@@ -14,25 +22,34 @@ defmodule KafkaBatcher.Accumulator do
1422
use GenServer
1523
require Logger
1624

17-
def start_link(args) do
18-
GenServer.start_link(__MODULE__, args, name: reg_name(args))
25+
@spec start_link(PipelineUnit.t()) :: GenServer.on_start()
26+
def start_link(%PipelineUnit{} = pipeline_unit) do
27+
GenServer.start_link(
28+
__MODULE__,
29+
pipeline_unit,
30+
name: reg_name(pipeline_unit)
31+
)
1932
end
2033

2134
@doc "Returns a specification to start this module under a supervisor"
22-
def child_spec(args) do
23-
{accumulator_mod, args} = Keyword.pop(args, :accumulator_mod, __MODULE__)
24-
35+
@spec child_spec(PipelineUnit.t()) :: Supervisor.child_spec()
36+
def child_spec(%PipelineUnit{} = pipeline_unit) do
2537
%{
26-
id: reg_name(args),
27-
start: {accumulator_mod, :start_link, [args]}
38+
id: reg_name(pipeline_unit),
39+
start: {
40+
PipelineUnit.get_accumulator_mod(pipeline_unit),
41+
:start_link,
42+
[pipeline_unit]
43+
}
2844
}
2945
end
3046

3147
@doc """
3248
Finds appropriate Accumulator process by topic & partition and dispatches `event` to it
3349
"""
34-
def add_event(%MessageObject{} = event, topic_name, partition \\ nil) do
35-
GenServer.call(reg_name(topic_name: topic_name, partition: partition), {:add_event, event})
50+
@spec add_event(MessageObject.t(), PipelineUnit.t()) :: :ok | {:error, term()}
51+
def add_event(%MessageObject{} = event, %PipelineUnit{} = pipeline_unit) do
52+
GenServer.call(reg_name(pipeline_unit), {:add_event, event})
3653
catch
3754
_, _reason ->
3855
Logger.warning("KafkaBatcher: Couldn't get through to accumulator")
@@ -43,15 +60,17 @@ defmodule KafkaBatcher.Accumulator do
4360
## Callbacks
4461
##
4562
@impl GenServer
46-
def init(args) do
63+
def init(%PipelineUnit{} = pipeline_unit) do
4764
Process.flag(:trap_exit, true)
48-
state = build_state(args)
65+
66+
topic_name = PipelineUnit.get_topic_name(pipeline_unit)
67+
partition = PipelineUnit.get_partition(pipeline_unit)
4968

5069
Logger.debug("""
51-
KafkaBatcher: Accumulator process started: topic #{state.topic_name} partition #{state.partition} pid #{inspect(self())}
70+
KafkaBatcher: Accumulator process started: topic #{topic_name} partition #{partition} pid #{inspect(self())}
5271
""")
5372

54-
{:ok, state}
73+
{:ok, %State{pipeline_unit: pipeline_unit}}
5574
end
5675

5776
@impl GenServer
@@ -78,7 +97,7 @@ defmodule KafkaBatcher.Accumulator do
7897
{:noreply, new_state}
7998

8099
{:error, _reason, new_state} ->
81-
state.collector.set_lock()
100+
PipelineUnit.get_collector(state.pipeline_unit).set_lock()
82101
{:noreply, new_state}
83102
end
84103
end
@@ -92,7 +111,7 @@ defmodule KafkaBatcher.Accumulator do
92111
def handle_info(term, state) do
93112
Logger.warning("""
94113
KafkaBatcher: Unknown message #{inspect(term)} to #{__MODULE__}.handle_info/2.
95-
Current state: #{inspect(drop_sensitive(state))}
114+
Current state: #{inspect(state)}
96115
""")
97116

98117
{:noreply, state}
@@ -104,12 +123,14 @@ defmodule KafkaBatcher.Accumulator do
104123
end
105124

106125
@impl GenServer
107-
def format_status(_reason, [pdict, state]) do
108-
[pdict, drop_sensitive(state)]
109-
end
110-
111-
defp drop_sensitive(%State{config: config} = state) do
112-
%State{state | config: Keyword.drop(config, [:sasl])}
126+
def format_status(_reason, [pdict, %State{} = state]) do
127+
[
128+
pdict,
129+
%State{
130+
state
131+
| pipeline_unit: PipelineUnit.drop_sensitive(state.pipeline_unit)
132+
}
133+
]
113134
end
114135

115136
defp cleanup(%{pending_messages: [], messages_to_produce: []}) do
@@ -122,7 +143,11 @@ defmodule KafkaBatcher.Accumulator do
122143
end
123144

124145
defp set_cleanup_timer_if_not_exists(%State{cleanup_timer_ref: nil} = state) do
125-
ref = :erlang.start_timer(state.max_wait_time, self(), :cleanup)
146+
%PipelineUnit{
147+
accumulator_config: %Accumulator.Config{max_wait_time: max_wait_time}
148+
} = state.pipeline_unit
149+
150+
ref = :erlang.start_timer(max_wait_time, self(), :cleanup)
126151
%State{state | cleanup_timer_ref: ref}
127152
end
128153

@@ -161,37 +186,26 @@ defmodule KafkaBatcher.Accumulator do
161186

162187
@spec produce_list(messages :: [CollectorBehaviour.event()], state :: State.t()) :: :ok | {:error, any()}
163188
defp produce_list(messages, state) when is_list(messages) do
164-
@producer.produce_list(messages, state.topic_name, state.partition, state.config)
189+
@producer.produce_list(state.config, messages, state.topic_name, state.partition)
165190
catch
166191
_, reason ->
167192
{:error, reason}
168193
end
169194

170-
defp build_state(args) do
171-
config = Keyword.fetch!(args, :config)
172-
173-
%State{
174-
topic_name: Keyword.fetch!(args, :topic_name),
175-
partition: Keyword.get(args, :partition),
176-
config: config,
177-
batch_flusher: Keyword.fetch!(config, :batch_flusher),
178-
batch_size: Keyword.fetch!(config, :batch_size),
179-
max_wait_time: Keyword.fetch!(config, :max_wait_time),
180-
min_delay: Keyword.fetch!(config, :min_delay),
181-
max_batch_bytesize: Keyword.fetch!(config, :max_batch_bytesize),
182-
collector: Keyword.fetch!(args, :collector)
183-
}
184-
end
195+
defp reg_name(%PipelineUnit{} = pipeline_unit) do
196+
%PipelineUnit{
197+
producer_config: %Producers.Config{client_name: client_name}
198+
} = pipeline_unit
185199

186-
defp reg_name(args) do
187-
topic_name = Keyword.fetch!(args, :topic_name)
200+
topic_name = PipelineUnit.get_topic_name(pipeline_unit)
201+
partition = PipelineUnit.get_partition(pipeline_unit)
188202

189-
case Keyword.get(args, :partition) do
203+
case partition do
190204
nil ->
191-
:"#{__MODULE__}.#{topic_name}"
205+
:"#{__MODULE__}.#{client_name}.#{topic_name}"
192206

193207
partition ->
194-
:"#{__MODULE__}.#{topic_name}.#{partition}"
208+
:"#{__MODULE__}.#{client_name}.#{topic_name}.#{partition}"
195209
end
196210
end
197211
end
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
defmodule KafkaBatcher.Accumulator.Config do
  @moduledoc false

  # Batching configuration for a single accumulator process.
  #
  # NOTE: `partition` is `non_neg_integer() | nil` — Kafka partitions are
  # 0-indexed, so partition 0 must be representable. This also matches the
  # `partition` type used by `KafkaBatcher.Accumulator.State`.
  @type t :: %__MODULE__{
          collector: module(),
          topic_name: String.t(),
          partition: non_neg_integer() | nil,
          batch_flusher: module(),
          max_wait_time: pos_integer(),
          batch_size: pos_integer(),
          min_delay: non_neg_integer(),
          max_batch_bytesize: pos_integer(),
          max_accumulator_restarts: pos_integer(),
          accumulator_mod: module()
        }

  @enforce_keys [:collector, :topic_name]
  defstruct @enforce_keys ++
              [
                :partition,
                batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher,
                max_wait_time: 1_000,
                batch_size: 10,
                min_delay: 0,
                max_batch_bytesize: 1_000_000,
                max_accumulator_restarts: 100,
                accumulator_mod: KafkaBatcher.Accumulator
              ]

  # Converts the struct back into a keyword list for callers that still
  # consume config as `Keyword.t()`.
  @spec to_kwlist(t()) :: Keyword.t()
  def to_kwlist(%__MODULE__{} = config) do
    [
      collector: config.collector,
      topic_name: config.topic_name,
      partition: config.partition,
      batch_flusher: config.batch_flusher,
      max_wait_time: config.max_wait_time,
      batch_size: config.batch_size,
      min_delay: config.min_delay,
      max_batch_bytesize: config.max_batch_bytesize,
      max_accumulator_restarts: config.max_accumulator_restarts,
      accumulator_mod: config.accumulator_mod
    ]
  end

  # Builds a config struct from a keyword list of options, raising if the
  # enforced keys (`:collector`, `:topic_name`) are missing.
  #
  # NOTE(review): `:partition` is intentionally absent from `Keyword.take/2`
  # below, so `build!/1` can never set it even though the struct has the
  # field — presumably it is assigned later per-partition; TODO confirm
  # against the callers before adding it here.
  @spec build!(opts :: Keyword.t()) :: t()
  def build!(opts) do
    opts
    |> Keyword.take([
      :collector,
      :topic_name,
      :batch_flusher,
      :max_wait_time,
      :batch_size,
      :min_delay,
      :max_batch_bytesize,
      :max_accumulator_restarts,
      :accumulator_mod
    ])
    |> then(&struct!(__MODULE__, &1))
  end
end

lib/kafka_batcher/accumulator/state.ex

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,46 +11,37 @@ defmodule KafkaBatcher.Accumulator.State do
1111
* timer expired (in case when a few events arrived timer helps to control that the max waiting time is not exceeded)
1212
"""
1313

14-
alias KafkaBatcher.{Accumulator.State, MessageObject}
14+
alias KafkaBatcher.{
15+
Accumulator,
16+
Accumulator.State,
17+
MessageObject,
18+
PipelineUnit
19+
}
20+
1521
@error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier)
1622

1723
@type t :: %State{
18-
topic_name: binary(),
19-
partition: non_neg_integer() | nil,
20-
config: Keyword.t(),
24+
pipeline_unit: KafkaBatcher.PipelineUnit.t(),
2125
pending_messages: list(),
2226
last_produced_at: non_neg_integer(),
23-
batch_flusher: atom(),
24-
batch_size: non_neg_integer(),
25-
max_wait_time: non_neg_integer(),
26-
min_delay: non_neg_integer(),
27-
max_batch_bytesize: non_neg_integer(),
2827
batch_bytesize: non_neg_integer(),
2928
pending_messages_count: non_neg_integer(),
30-
producer_config: Keyword.t(),
3129
messages_to_produce: list(),
3230
cleanup_timer_ref: reference() | nil,
33-
status: atom(),
34-
collector: atom() | nil
31+
status: atom()
3532
}
3633

37-
defstruct topic_name: nil,
38-
partition: nil,
39-
config: [],
40-
pending_messages: [],
41-
last_produced_at: 0,
42-
batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher,
43-
batch_size: 0,
44-
max_wait_time: 0,
45-
min_delay: 0,
46-
max_batch_bytesize: 0,
47-
batch_bytesize: 0,
48-
pending_messages_count: 0,
49-
producer_config: [],
50-
messages_to_produce: [],
51-
cleanup_timer_ref: nil,
52-
status: :continue,
53-
collector: nil
34+
@enforce_keys [:pipeline_unit]
35+
defstruct @enforce_keys ++
36+
[
37+
pending_messages: [],
38+
last_produced_at: 0,
39+
batch_bytesize: 0,
40+
pending_messages_count: 0,
41+
messages_to_produce: [],
42+
cleanup_timer_ref: nil,
43+
status: :continue
44+
]
5445

5546
@spec add_new_message(State.t(), MessageObject.t(), non_neg_integer()) :: State.t()
5647
def add_new_message(%State{} = state, %MessageObject{key: key, value: value} = event, now) do
@@ -88,15 +79,22 @@ defmodule KafkaBatcher.Accumulator.State do
8879
end
8980

9081
defp consider_max_bytesize(%State{status: :continue, batch_bytesize: batch_bytesize} = state, new_message) do
82+
%PipelineUnit{
83+
accumulator_config: %Accumulator.Config{max_batch_bytesize: max_batch_bytesize}
84+
} = state.pipeline_unit
85+
86+
topic_name = PipelineUnit.get_topic_name(state.pipeline_unit)
87+
partition = PipelineUnit.get_partition(state.pipeline_unit)
88+
9189
message_size = :erlang.external_size(new_message)
9290

93-
case batch_bytesize + message_size >= state.max_batch_bytesize do
94-
true when message_size >= state.max_batch_bytesize ->
91+
case batch_bytesize + message_size >= max_batch_bytesize do
92+
true when message_size >= max_batch_bytesize ->
9593
@error_notifier.report(
9694
type: "KafkaBatcherProducerError",
9795
message: """
98-
event#produce topic=#{state.topic_name} partition=#{state.partition}.
99-
Message size #{inspect(message_size)} exceeds limit #{inspect(state.max_batch_bytesize)}
96+
event#produce topic=#{topic_name} partition=#{partition}.
97+
Message size #{inspect(message_size)} exceeds limit #{inspect(max_batch_bytesize)}
10098
"""
10199
)
102100

@@ -111,7 +109,11 @@ defmodule KafkaBatcher.Accumulator.State do
111109
end
112110

113111
defp consider_max_size_and_wait_time(%State{status: :continue} = state, now) do
114-
if state.pending_messages_count >= state.batch_size and now - state.last_produced_at >= state.min_delay do
112+
%PipelineUnit{
113+
accumulator_config: %Accumulator.Config{batch_size: batch_size, min_delay: min_delay}
114+
} = state.pipeline_unit
115+
116+
if state.pending_messages_count >= batch_size and now - state.last_produced_at >= min_delay do
115117
mark_as_ready(state)
116118
else
117119
state
@@ -121,7 +123,11 @@ defmodule KafkaBatcher.Accumulator.State do
121123
defp consider_max_size_and_wait_time(%State{status: :ready} = state, _), do: state
122124

123125
defp consider_istant_flush(%State{status: :continue} = state, key, value) do
124-
if state.batch_flusher.flush?(key, value) do
126+
%PipelineUnit{
127+
accumulator_config: %Accumulator.Config{batch_flusher: batch_flusher}
128+
} = state.pipeline_unit
129+
130+
if batch_flusher.flush?(key, value) do
125131
mark_as_ready(state)
126132
else
127133
state

0 commit comments

Comments
 (0)