Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .dialyzer_ignore.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[
# If we compile with another @storage_impl lib/kafka_batcher/temp_storage.ex:33 become reachable
{"lib/kafka_batcher/temp_storage.ex", :guard_fail, 30}
{"lib/kafka_batcher/temp_storage.ex", :guard_fail, 32}
]
51 changes: 28 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ A library to increase the throughput of producing messages (coming one at a time
def start(_type, _args) do
children = [
# Describe the child spec
KafkaBatcher.Supervisor
{KafkaBatcher.Supervisor, Application.fetch_env!(:kafka_batcher, :default_client)}
]

opts = [strategy: :one_for_one, name: MyApp.Supervisor, max_restarts: 3, max_seconds: 5]
Expand All @@ -36,32 +36,36 @@ A library to increase the throughput of producing messages (coming one at a time
Config example:

```elixir
config :kafka_batcher, KafkaBatcher.Collector1, topic_name: "topic1"
config :kafka_batcher, KafkaBatcher.Collector2, topic_name: "topic2"
config :kafka_batcher, KafkaBatcher.Collector3, topic_name: "topic3"
config :kafka_batcher, KafkaBatcher.Collector4, topic_name: "topic4"
config :kafka_batcher, KafkaBatcher.Collector5, topic_name: "topic5"

config :kafka_batcher, collectors:
[
KafkaBatcher.Collector1,
KafkaBatcher.Collector2,
KafkaBatcher.Collector3,
KafkaBatcher.Collector4,
KafkaBatcher.Collector5
]
config :kafka_batcher, :default_client, [
{KafkaBatcher.Collector1, [topic_name: "topic1"]},
{KafkaBatcher.Collector2, [topic_name: "topic2"]},
{KafkaBatcher.Collector3, [topic_name: "topic3"]},
{KafkaBatcher.Collector4, [topic_name: "topic4"]},
{KafkaBatcher.Collector5, [topic_name: "topic5"]}
]

config :kafka_batcher, :kafka,
endpoints: "localhost:9092",
# in case you use SASL
# sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"},
# ssl: true,
telemetry: true,
allow_topic_auto_creation: false,
config :kafka_batcher, :default_client,
collectors: [
KafkaBatcher.Collector1,
KafkaBatcher.Collector2,
KafkaBatcher.Collector3,
KafkaBatcher.Collector4,
KafkaBatcher.Collector5
],
kafka: [
client_name: :default_client,
endpoints: "localhost:9092",
# in case you use SASL
# sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"},
# ssl: true,
telemetry: true,
allow_topic_auto_creation: false
],
kafka_topic_aliases: %{
"real_topic_name1" => "incoming-events",
"real_topic_name2" => "special-topic"
}
},
kafka_metric_opts: []

# In case you use KafkaEx, you need to disable default worker to avoid crashes
config :kafka_ex, :disable_default_worker, true
Expand All @@ -88,6 +92,7 @@ Available parameters:
* `:ssl` - optional parameter. `:ssl` must be a `boolean()`. Defaults to `false`.
* `:min_delay` - optional parameter. Sets a minimal delay before sending events. This parameter allows you to increase the maximum throughput when you receive more messages (in terms of count per second) than you anticipated when choosing the `batch_size` parameter.
* `:max_batch_bytesize` - optional parameter. Allows setting a limit on the maximum batch size. Defaults to 1_000_000 bytes.
* `:client_name` - client name that is used to start a producer. Included in metrics namespace. Defaults to `:kafka_producer_client`.

**Important:** The size of one message should not exceed `max_batch_bytesize` setting. If you need to work with large messages you must increase `max_batch_bytesize` value and value of Kafka topic setting `max.message.bytes` as well.

Expand Down
4 changes: 3 additions & 1 deletion lib/kafka_batcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ defmodule KafkaBatcher do
"""
defstruct key: "", value: "", headers: []

@type t :: %MessageObject{key: binary(), value: map() | binary(), headers: list()}
@type key :: binary()
@type value :: map() | binary()
@type t :: %MessageObject{key: key(), value: value(), headers: list()}
end
end
103 changes: 59 additions & 44 deletions lib/kafka_batcher/accumulator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@ defmodule KafkaBatcher.Accumulator do
See details how it works in KafkaBatcher.Accumulator.State module
"""

alias KafkaBatcher.{Accumulator.State, MessageObject, TempStorage}
alias KafkaBatcher.{
Accumulator,
Accumulator.State,
DataStreamSpec,
MessageObject,
Producers,
TempStorage
}

alias KafkaBatcher.Behaviours.Collector, as: CollectorBehaviour

@error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier)
Expand All @@ -14,25 +22,34 @@ defmodule KafkaBatcher.Accumulator do
use GenServer
require Logger

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: reg_name(args))
@spec start_link(DataStreamSpec.t()) :: GenServer.on_start()
def start_link(%DataStreamSpec{} = data_stream_spec) do
GenServer.start_link(
__MODULE__,
data_stream_spec,
name: reg_name(data_stream_spec)
)
end

@doc "Returns a specification to start this module under a supervisor"
def child_spec(args) do
{accumulator_mod, args} = Keyword.pop(args, :accumulator_mod, __MODULE__)

@spec child_spec(DataStreamSpec.t()) :: Supervisor.child_spec()
def child_spec(%DataStreamSpec{} = data_stream_spec) do
%{
id: reg_name(args),
start: {accumulator_mod, :start_link, [args]}
id: reg_name(data_stream_spec),
start: {
DataStreamSpec.get_accumulator_mod(data_stream_spec),
:start_link,
[data_stream_spec]
}
}
end

@doc """
Finds appropriate Accumulator process by topic & partition and dispatches `event` to it
"""
def add_event(%MessageObject{} = event, topic_name, partition \\ nil) do
GenServer.call(reg_name(topic_name: topic_name, partition: partition), {:add_event, event})
@spec add_event(MessageObject.t(), DataStreamSpec.t()) :: :ok | {:error, term()}
def add_event(%MessageObject{} = event, %DataStreamSpec{} = data_stream_spec) do
GenServer.call(reg_name(data_stream_spec), {:add_event, event})
catch
_, _reason ->
Logger.warning("KafkaBatcher: Couldn't get through to accumulator")
Expand All @@ -43,15 +60,17 @@ defmodule KafkaBatcher.Accumulator do
## Callbacks
##
@impl GenServer
def init(args) do
def init(%DataStreamSpec{} = data_stream_spec) do
Process.flag(:trap_exit, true)
state = build_state(args)

topic_name = DataStreamSpec.get_topic_name(data_stream_spec)
partition = DataStreamSpec.get_partition(data_stream_spec)

Logger.debug("""
KafkaBatcher: Accumulator process started: topic #{state.topic_name} partition #{state.partition} pid #{inspect(self())}
KafkaBatcher: Accumulator process started: topic #{topic_name} partition #{partition} pid #{inspect(self())}
""")

{:ok, state}
{:ok, %State{data_stream_spec: data_stream_spec}}
end

@impl GenServer
Expand All @@ -78,7 +97,8 @@ defmodule KafkaBatcher.Accumulator do
{:noreply, new_state}

{:error, _reason, new_state} ->
state.collector.set_lock()
collector_mod = DataStreamSpec.get_collector_mod(state.data_stream_spec)
collector_mod.set_lock()
{:noreply, new_state}
end
end
Expand All @@ -92,7 +112,7 @@ defmodule KafkaBatcher.Accumulator do
def handle_info(term, state) do
Logger.warning("""
KafkaBatcher: Unknown message #{inspect(term)} to #{__MODULE__}.handle_info/2.
Current state: #{inspect(drop_sensitive(state))}
Current state: #{inspect(state)}
""")

{:noreply, state}
Expand All @@ -104,12 +124,14 @@ defmodule KafkaBatcher.Accumulator do
end

@impl GenServer
def format_status(_reason, [pdict, state]) do
[pdict, drop_sensitive(state)]
end

defp drop_sensitive(%State{config: config} = state) do
%State{state | config: Keyword.drop(config, [:sasl])}
def format_status(_reason, [pdict, %State{} = state]) do
[
pdict,
%State{
state
| data_stream_spec: DataStreamSpec.drop_sensitive(state.data_stream_spec)
}
]
end

defp cleanup(%{pending_messages: [], messages_to_produce: []}) do
Expand All @@ -122,7 +144,11 @@ defmodule KafkaBatcher.Accumulator do
end

defp set_cleanup_timer_if_not_exists(%State{cleanup_timer_ref: nil} = state) do
ref = :erlang.start_timer(state.max_wait_time, self(), :cleanup)
%DataStreamSpec{
accumulator_config: %Accumulator.Config{max_wait_time: max_wait_time}
} = state.data_stream_spec

ref = :erlang.start_timer(max_wait_time, self(), :cleanup)
%State{state | cleanup_timer_ref: ref}
end

Expand Down Expand Up @@ -161,37 +187,26 @@ defmodule KafkaBatcher.Accumulator do

@spec produce_list(messages :: [CollectorBehaviour.event()], state :: State.t()) :: :ok | {:error, any()}
defp produce_list(messages, state) when is_list(messages) do
@producer.produce_list(messages, state.topic_name, state.partition, state.config)
@producer.produce_list(state.config, messages, state.topic_name, state.partition)
catch
_, reason ->
{:error, reason}
end

defp build_state(args) do
config = Keyword.fetch!(args, :config)

%State{
topic_name: Keyword.fetch!(args, :topic_name),
partition: Keyword.get(args, :partition),
config: config,
batch_flusher: Keyword.fetch!(config, :batch_flusher),
batch_size: Keyword.fetch!(config, :batch_size),
max_wait_time: Keyword.fetch!(config, :max_wait_time),
min_delay: Keyword.fetch!(config, :min_delay),
max_batch_bytesize: Keyword.fetch!(config, :max_batch_bytesize),
collector: Keyword.fetch!(args, :collector)
}
end
defp reg_name(%DataStreamSpec{} = data_stream_spec) do
%DataStreamSpec{
producer_config: %Producers.Config{client_name: client_name}
} = data_stream_spec

defp reg_name(args) do
topic_name = Keyword.fetch!(args, :topic_name)
topic_name = DataStreamSpec.get_topic_name(data_stream_spec)
partition = DataStreamSpec.get_partition(data_stream_spec)

case Keyword.get(args, :partition) do
case partition do
nil ->
:"#{__MODULE__}.#{topic_name}"
:"#{__MODULE__}.#{client_name}.#{topic_name}"

partition ->
:"#{__MODULE__}.#{topic_name}.#{partition}"
:"#{__MODULE__}.#{client_name}.#{topic_name}.#{partition}"
end
end
end
70 changes: 70 additions & 0 deletions lib/kafka_batcher/accumulator/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule KafkaBatcher.Accumulator.Config do
  @moduledoc false
  alias KafkaBatcher.Config.BadConfigError

  @type t :: %__MODULE__{
          collector_mod: module(),
          topic_name: String.t(),
          partition: pos_integer() | nil,
          batch_flusher: module(),
          max_wait_time: pos_integer(),
          batch_size: pos_integer(),
          min_delay: non_neg_integer(),
          max_batch_bytesize: pos_integer(),
          max_accumulator_restarts: pos_integer(),
          accumulator_mod: module()
        }

  @enforce_keys [:collector_mod, :topic_name]
  defstruct @enforce_keys ++
              [
                :partition,
                batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher,
                max_wait_time: 1_000,
                batch_size: 10,
                min_delay: 0,
                max_batch_bytesize: 1_000_000,
                max_accumulator_restarts: 100,
                accumulator_mod: KafkaBatcher.Accumulator
              ]

  # Every key `build!/1` accepts and `to_kwlist/1` emits.
  # Keep in sync with the fields declared in `defstruct` above.
  @known_keys [
    :collector_mod,
    :topic_name,
    :partition,
    :batch_flusher,
    :max_wait_time,
    :batch_size,
    :min_delay,
    :max_batch_bytesize,
    :max_accumulator_restarts,
    :accumulator_mod
  ]

  @doc """
  Converts the config struct back into a keyword list containing all fields,
  in the order of `@known_keys`.
  """
  @spec to_kwlist(t()) :: Keyword.t()
  def to_kwlist(%__MODULE__{} = config) do
    Enum.map(@known_keys, fn key -> {key, Map.fetch!(config, key)} end)
  end

  @doc """
  Builds a `#{inspect(__MODULE__)}` struct from `opts`.

  Unknown keys are ignored; known keys (including `:partition`, which was
  previously dropped) override the struct defaults.

  Raises `KafkaBatcher.Config.BadConfigError` when a required key
  (`#{inspect(@enforce_keys)}`) is missing.
  """
  @spec build!(opts :: Keyword.t()) :: t()
  def build!(opts) do
    opts
    |> Keyword.take(@known_keys)
    |> then(&struct!(__MODULE__, &1))
  rescue
    # struct!/2 raises ArgumentError when an @enforce_keys field is absent;
    # re-raise as the library's config error, preserving the stacktrace.
    ArgumentError ->
      reraise(
        BadConfigError,
        "Accumulator config failed: missing required opts: #{inspect(@enforce_keys)}",
        __STACKTRACE__
      )
  end
end
Loading