From 99ea0a9f35a0e7c90d9af5b2a69fc7642dbcdb06 Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Wed, 24 Dec 2025 17:19:41 +0300 Subject: [PATCH 1/5] Extract PipelineUnit and move configs to separate structs --- .dialyzer_ignore.exs | 2 +- lib/kafka_batcher.ex | 4 +- lib/kafka_batcher/accumulator.ex | 102 +++--- lib/kafka_batcher/accumulator/config.ex | 62 ++++ lib/kafka_batcher/accumulator/state.ex | 76 ++-- .../accumulators_pool_supervisor.ex | 43 ++- lib/kafka_batcher/behaviours/producer.ex | 13 +- lib/kafka_batcher/collector.ex | 61 ++-- lib/kafka_batcher/collector/config.ex | 44 +++ lib/kafka_batcher/collector/implementation.ex | 78 ++-- lib/kafka_batcher/collector/state.ex | 71 ++-- lib/kafka_batcher/config.ex | 337 +++++------------- lib/kafka_batcher/connection_manager.ex | 97 +++-- lib/kafka_batcher/pipeline_unit.ex | 98 +++++ lib/kafka_batcher/pipeline_unit/validator.ex | 30 ++ lib/kafka_batcher/producers/base.ex | 40 +-- lib/kafka_batcher/producers/config.ex | 71 ++++ .../producers/config/brod_config.ex | 91 +++++ lib/kafka_batcher/producers/kaffe.ex | 35 +- lib/kafka_batcher/producers/kafka_ex.ex | 18 +- lib/kafka_batcher/prom_ex/plugins/kafka.ex | 42 ++- lib/kafka_batcher/supervisor.ex | 57 ++- lib/kafka_batcher/temp_storage.ex | 4 +- test/support/producers/test_producer.ex | 16 +- 24 files changed, 943 insertions(+), 549 deletions(-) create mode 100644 lib/kafka_batcher/accumulator/config.ex create mode 100644 lib/kafka_batcher/collector/config.ex create mode 100644 lib/kafka_batcher/pipeline_unit.ex create mode 100644 lib/kafka_batcher/pipeline_unit/validator.ex create mode 100644 lib/kafka_batcher/producers/config.ex create mode 100644 lib/kafka_batcher/producers/config/brod_config.ex diff --git a/.dialyzer_ignore.exs b/.dialyzer_ignore.exs index ae33082..59ab4be 100644 --- a/.dialyzer_ignore.exs +++ b/.dialyzer_ignore.exs @@ -1,4 +1,4 @@ [ # If we compile with another @storage_impl lib/kafka_batcher/temp_storage.ex:33 become reachable - 
{"lib/kafka_batcher/temp_storage.ex", :guard_fail, 30} + {"lib/kafka_batcher/temp_storage.ex", :guard_fail, 32} ] diff --git a/lib/kafka_batcher.ex b/lib/kafka_batcher.ex index 7d492d0..5d17b72 100644 --- a/lib/kafka_batcher.ex +++ b/lib/kafka_batcher.ex @@ -67,6 +67,8 @@ defmodule KafkaBatcher do """ defstruct key: "", value: "", headers: [] - @type t :: %MessageObject{key: binary(), value: map() | binary(), headers: list()} + @type key :: binary() + @type value :: map() | binary() + @type t :: %MessageObject{key: key(), value: value(), headers: list()} end end diff --git a/lib/kafka_batcher/accumulator.ex b/lib/kafka_batcher/accumulator.ex index ec3d5b8..bdc2f77 100644 --- a/lib/kafka_batcher/accumulator.ex +++ b/lib/kafka_batcher/accumulator.ex @@ -5,7 +5,15 @@ defmodule KafkaBatcher.Accumulator do See details how it works in KafkaBatcher.Accumulator.State module """ - alias KafkaBatcher.{Accumulator.State, MessageObject, TempStorage} + alias KafkaBatcher.{ + Accumulator, + Accumulator.State, + MessageObject, + PipelineUnit, + Producers, + TempStorage + } + alias KafkaBatcher.Behaviours.Collector, as: CollectorBehaviour @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) @@ -14,25 +22,34 @@ defmodule KafkaBatcher.Accumulator do use GenServer require Logger - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: reg_name(args)) + @spec start_link(PipelineUnit.t()) :: GenServer.on_start() + def start_link(%PipelineUnit{} = pipeline_unit) do + GenServer.start_link( + __MODULE__, + pipeline_unit, + name: reg_name(pipeline_unit) + ) end @doc "Returns a specification to start this module under a supervisor" - def child_spec(args) do - {accumulator_mod, args} = Keyword.pop(args, :accumulator_mod, __MODULE__) - + @spec child_spec(PipelineUnit.t()) :: Supervisor.child_spec() + def child_spec(%PipelineUnit{} = pipeline_unit) do %{ - id: reg_name(args), - start: {accumulator_mod, :start_link, [args]} 
+ id: reg_name(pipeline_unit), + start: { + PipelineUnit.get_accumulator_mod(pipeline_unit), + :start_link, + [pipeline_unit] + } } end @doc """ Finds appropriate Accumulator process by topic & partition and dispatches `event` to it """ - def add_event(%MessageObject{} = event, topic_name, partition \\ nil) do - GenServer.call(reg_name(topic_name: topic_name, partition: partition), {:add_event, event}) + @spec add_event(MessageObject.t(), PipelineUnit.t()) :: :ok | {:error, term()} + def add_event(%MessageObject{} = event, %PipelineUnit{} = pipeline_unit) do + GenServer.call(reg_name(pipeline_unit), {:add_event, event}) catch _, _reason -> Logger.warning("KafkaBatcher: Couldn't get through to accumulator") @@ -43,15 +60,17 @@ defmodule KafkaBatcher.Accumulator do ## Callbacks ## @impl GenServer - def init(args) do + def init(%PipelineUnit{} = pipeline_unit) do Process.flag(:trap_exit, true) - state = build_state(args) + + topic_name = PipelineUnit.get_topic_name(pipeline_unit) + partition = PipelineUnit.get_partition(pipeline_unit) Logger.debug(""" - KafkaBatcher: Accumulator process started: topic #{state.topic_name} partition #{state.partition} pid #{inspect(self())} + KafkaBatcher: Accumulator process started: topic #{topic_name} partition #{partition} pid #{inspect(self())} """) - {:ok, state} + {:ok, %State{pipeline_unit: pipeline_unit}} end @impl GenServer @@ -78,7 +97,7 @@ defmodule KafkaBatcher.Accumulator do {:noreply, new_state} {:error, _reason, new_state} -> - state.collector.set_lock() + PipelineUnit.get_collector(state.pipeline_unit).set_lock() {:noreply, new_state} end end @@ -92,7 +111,7 @@ defmodule KafkaBatcher.Accumulator do def handle_info(term, state) do Logger.warning(""" KafkaBatcher: Unknown message #{inspect(term)} to #{__MODULE__}.handle_info/2. 
- Current state: #{inspect(drop_sensitive(state))} + Current state: #{inspect(state)} """) {:noreply, state} @@ -104,12 +123,14 @@ defmodule KafkaBatcher.Accumulator do end @impl GenServer - def format_status(_reason, [pdict, state]) do - [pdict, drop_sensitive(state)] - end - - defp drop_sensitive(%State{config: config} = state) do - %State{state | config: Keyword.drop(config, [:sasl])} + def format_status(_reason, [pdict, %State{} = state]) do + [ + pdict, + %State{ + state + | pipeline_unit: PipelineUnit.drop_sensitive(state.pipeline_unit) + } + ] end defp cleanup(%{pending_messages: [], messages_to_produce: []}) do @@ -122,7 +143,11 @@ defmodule KafkaBatcher.Accumulator do end defp set_cleanup_timer_if_not_exists(%State{cleanup_timer_ref: nil} = state) do - ref = :erlang.start_timer(state.max_wait_time, self(), :cleanup) + %PipelineUnit{ + accumulator_config: %Accumulator.Config{max_wait_time: max_wait_time} + } = state.pipeline_unit + + ref = :erlang.start_timer(max_wait_time, self(), :cleanup) %State{state | cleanup_timer_ref: ref} end @@ -161,37 +186,26 @@ defmodule KafkaBatcher.Accumulator do @spec produce_list(messages :: [CollectorBehaviour.event()], state :: State.t()) :: :ok | {:error, any()} defp produce_list(messages, state) when is_list(messages) do - @producer.produce_list(messages, state.topic_name, state.partition, state.config) + @producer.produce_list(state.config, messages, state.topic_name, state.partition) catch _, reason -> {:error, reason} end - defp build_state(args) do - config = Keyword.fetch!(args, :config) - - %State{ - topic_name: Keyword.fetch!(args, :topic_name), - partition: Keyword.get(args, :partition), - config: config, - batch_flusher: Keyword.fetch!(config, :batch_flusher), - batch_size: Keyword.fetch!(config, :batch_size), - max_wait_time: Keyword.fetch!(config, :max_wait_time), - min_delay: Keyword.fetch!(config, :min_delay), - max_batch_bytesize: Keyword.fetch!(config, :max_batch_bytesize), - collector: Keyword.fetch!(args, 
:collector) - } - end + defp reg_name(%PipelineUnit{} = pipeline_unit) do + %PipelineUnit{ + producer_config: %Producers.Config{client_name: client_name} + } = pipeline_unit - defp reg_name(args) do - topic_name = Keyword.fetch!(args, :topic_name) + topic_name = PipelineUnit.get_topic_name(pipeline_unit) + partition = PipelineUnit.get_partition(pipeline_unit) - case Keyword.get(args, :partition) do + case partition do nil -> - :"#{__MODULE__}.#{topic_name}" + :"#{__MODULE__}.#{client_name}.#{topic_name}" partition -> - :"#{__MODULE__}.#{topic_name}.#{partition}" + :"#{__MODULE__}.#{client_name}.#{topic_name}.#{partition}" end end end diff --git a/lib/kafka_batcher/accumulator/config.ex b/lib/kafka_batcher/accumulator/config.ex new file mode 100644 index 0000000..2b66333 --- /dev/null +++ b/lib/kafka_batcher/accumulator/config.ex @@ -0,0 +1,62 @@ +defmodule KafkaBatcher.Accumulator.Config do + @moduledoc false + + @type t :: %__MODULE__{ + collector: module(), + topic_name: String.t(), + partition: pos_integer() | nil, + batch_flusher: module(), + max_wait_time: pos_integer(), + batch_size: pos_integer(), + min_delay: non_neg_integer(), + max_batch_bytesize: pos_integer(), + max_accumulator_restarts: pos_integer(), + accumulator_mod: module() + } + + @enforce_keys [:collector, :topic_name] + defstruct @enforce_keys ++ + [ + :partition, + batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher, + max_wait_time: 1_000, + batch_size: 10, + min_delay: 0, + max_batch_bytesize: 1_000_000, + max_accumulator_restarts: 100, + accumulator_mod: KafkaBatcher.Accumulator + ] + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + collector: config.collector, + topic_name: config.topic_name, + partition: config.partition, + batch_flusher: config.batch_flusher, + max_wait_time: config.max_wait_time, + batch_size: config.batch_size, + min_delay: config.min_delay, + max_batch_bytesize: config.max_batch_bytesize, + max_accumulator_restarts: 
config.max_accumulator_restarts, + accumulator_mod: config.accumulator_mod + ] + end + + @spec build!(opts :: Keyword.t()) :: t() + def build!(opts) do + opts + |> Keyword.take([ + :collector, + :topic_name, + :batch_flusher, + :max_wait_time, + :batch_size, + :min_delay, + :max_batch_bytesize, + :max_accumulator_restarts, + :accumulator_mod + ]) + |> then(&struct!(__MODULE__, &1)) + end +end diff --git a/lib/kafka_batcher/accumulator/state.ex b/lib/kafka_batcher/accumulator/state.ex index f914e53..2b7b432 100644 --- a/lib/kafka_batcher/accumulator/state.ex +++ b/lib/kafka_batcher/accumulator/state.ex @@ -11,46 +11,37 @@ defmodule KafkaBatcher.Accumulator.State do * timer expired (in case when a few events arrived timer helps to control that the max waiting time is not exceeded) """ - alias KafkaBatcher.{Accumulator.State, MessageObject} + alias KafkaBatcher.{ + Accumulator, + Accumulator.State, + MessageObject, + PipelineUnit + } + @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) @type t :: %State{ - topic_name: binary(), - partition: non_neg_integer() | nil, - config: Keyword.t(), + pipeline_unit: KafkaBatcher.PipelineUnit.t(), pending_messages: list(), last_produced_at: non_neg_integer(), - batch_flusher: atom(), - batch_size: non_neg_integer(), - max_wait_time: non_neg_integer(), - min_delay: non_neg_integer(), - max_batch_bytesize: non_neg_integer(), batch_bytesize: non_neg_integer(), pending_messages_count: non_neg_integer(), - producer_config: Keyword.t(), messages_to_produce: list(), cleanup_timer_ref: reference() | nil, - status: atom(), - collector: atom() | nil + status: atom() } - defstruct topic_name: nil, - partition: nil, - config: [], - pending_messages: [], - last_produced_at: 0, - batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher, - batch_size: 0, - max_wait_time: 0, - min_delay: 0, - max_batch_bytesize: 0, - batch_bytesize: 0, - pending_messages_count: 0, - producer_config: [], 
- messages_to_produce: [], - cleanup_timer_ref: nil, - status: :continue, - collector: nil + @enforce_keys [:pipeline_unit] + defstruct @enforce_keys ++ + [ + pending_messages: [], + last_produced_at: 0, + batch_bytesize: 0, + pending_messages_count: 0, + messages_to_produce: [], + cleanup_timer_ref: nil, + status: :continue + ] @spec add_new_message(State.t(), MessageObject.t(), non_neg_integer()) :: State.t() def add_new_message(%State{} = state, %MessageObject{key: key, value: value} = event, now) do @@ -88,15 +79,22 @@ defmodule KafkaBatcher.Accumulator.State do end defp consider_max_bytesize(%State{status: :continue, batch_bytesize: batch_bytesize} = state, new_message) do + %PipelineUnit{ + accumulator_config: %Accumulator.Config{max_batch_bytesize: max_batch_bytesize} + } = state.pipeline_unit + + topic_name = PipelineUnit.get_topic_name(state.pipeline_unit) + partition = PipelineUnit.get_partition(state.pipeline_unit) + message_size = :erlang.external_size(new_message) - case batch_bytesize + message_size >= state.max_batch_bytesize do - true when message_size >= state.max_batch_bytesize -> + case batch_bytesize + message_size >= max_batch_bytesize do + true when message_size >= max_batch_bytesize -> @error_notifier.report( type: "KafkaBatcherProducerError", message: """ - event#produce topic=#{state.topic_name} partition=#{state.partition}. - Message size #{inspect(message_size)} exceeds limit #{inspect(state.max_batch_bytesize)} + event#produce topic=#{topic_name} partition=#{partition}. 
+ Message size #{inspect(message_size)} exceeds limit #{inspect(max_batch_bytesize)} """ ) @@ -111,7 +109,11 @@ defmodule KafkaBatcher.Accumulator.State do end defp consider_max_size_and_wait_time(%State{status: :continue} = state, now) do - if state.pending_messages_count >= state.batch_size and now - state.last_produced_at >= state.min_delay do + %PipelineUnit{ + accumulator_config: %Accumulator.Config{batch_size: batch_size, min_delay: min_delay} + } = state.pipeline_unit + + if state.pending_messages_count >= batch_size and now - state.last_produced_at >= min_delay do mark_as_ready(state) else state @@ -121,7 +123,11 @@ defmodule KafkaBatcher.Accumulator.State do defp consider_max_size_and_wait_time(%State{status: :ready} = state, _), do: state defp consider_istant_flush(%State{status: :continue} = state, key, value) do - if state.batch_flusher.flush?(key, value) do + %PipelineUnit{ + accumulator_config: %Accumulator.Config{batch_flusher: batch_flusher} + } = state.pipeline_unit + + if batch_flusher.flush?(key, value) do mark_as_ready(state) else state diff --git a/lib/kafka_batcher/accumulators_pool_supervisor.ex b/lib/kafka_batcher/accumulators_pool_supervisor.ex index 7c98a17..b8022b6 100644 --- a/lib/kafka_batcher/accumulators_pool_supervisor.ex +++ b/lib/kafka_batcher/accumulators_pool_supervisor.ex @@ -5,40 +5,59 @@ defmodule KafkaBatcher.AccumulatorsPoolSupervisor do use DynamicSupervisor - alias KafkaBatcher.Accumulator + alias KafkaBatcher.{Accumulator, PipelineUnit, Producers} @dialyzer {:no_return, {:init, 1}} - def start_link(config) do - DynamicSupervisor.start_link(__MODULE__, config, name: reg_name(config)) + def start_link(%PipelineUnit{} = pipeline_unit) do + DynamicSupervisor.start_link( + __MODULE__, + pipeline_unit, + name: reg_name(pipeline_unit) + ) end @doc "Returns a specification to start this module under a supervisor" - def child_spec(config) do + def child_spec(%PipelineUnit{} = pipeline_unit) do %{ - id: reg_name(config), - start: 
{__MODULE__, :start_link, [config]}, + id: reg_name(pipeline_unit), + start: {__MODULE__, :start_link, [pipeline_unit]}, type: :supervisor } end - def init(config) do + def init(%PipelineUnit{} = pipeline_unit) do + %PipelineUnit{ + accumulator_config: %Accumulator.Config{ + max_accumulator_restarts: max_accumulator_restarts + } + } = pipeline_unit + # max_restarts value depends on partitions count in case when partitioned accumulation is used. # For example: 100 max_restarts -> 10 process restarts per second for 1 topic with 10 partitions DynamicSupervisor.init( strategy: :one_for_one, restart: :permanent, - max_restarts: Keyword.get(config, :max_restart, 100), + max_restarts: max_accumulator_restarts, max_seconds: 1, extra_arguments: [] ) end - def start_accumulator(args) do - DynamicSupervisor.start_child(reg_name(args), Accumulator.child_spec(args)) + def start_accumulator(%PipelineUnit{} = pipeline_unit) do + DynamicSupervisor.start_child( + reg_name(pipeline_unit), + Accumulator.child_spec(pipeline_unit) + ) end - def reg_name(args) do - :"#{__MODULE__}.#{Keyword.fetch!(args, :topic_name)}" + def reg_name(%PipelineUnit{} = pipeline_unit) do + %PipelineUnit{ + producer_config: %Producers.Config{client_name: client_name} + } = pipeline_unit + + topic_name = PipelineUnit.get_topic_name(pipeline_unit) + + :"#{__MODULE__}.#{client_name}.#{topic_name}" end end diff --git a/lib/kafka_batcher/behaviours/producer.ex b/lib/kafka_batcher/behaviours/producer.ex index 38d723e..75256d8 100644 --- a/lib/kafka_batcher/behaviours/producer.ex +++ b/lib/kafka_batcher/behaviours/producer.ex @@ -3,19 +3,22 @@ defmodule KafkaBatcher.Behaviours.Producer do KafkaBatcher.Behaviours.Producer adds an abstraction level over producer implementations in various Kafka libs. 
Defines the callbacks that a Kafka producer should implement """ + alias KafkaBatcher.Producers + @type event :: KafkaBatcher.MessageObject.t() @type events :: list(event()) @callback do_produce( + Producers.Config.t(), events :: events(), topic :: binary(), - partition :: non_neg_integer() | nil, - config :: Keyword.t() + partition :: non_neg_integer() | nil ) :: :ok | {:error, binary() | atom()} - @callback get_partitions_count(binary()) :: {:ok, integer()} | {:error, binary() | atom()} + @callback get_partitions_count(Producers.Config.t(), binary()) :: + {:ok, integer()} | {:error, binary() | atom()} - @callback start_client() :: {:ok, pid()} | {:error, any()} + @callback start_client(Producers.Config.t()) :: {:ok, pid()} | {:error, any()} - @callback start_producer(binary(), Keyword.t()) :: :ok | {:error, any()} + @callback start_producer(Producers.Config.t(), binary()) :: :ok | {:error, any()} end diff --git a/lib/kafka_batcher/collector.ex b/lib/kafka_batcher/collector.ex index 9b9ed4e..d3c65a7 100644 --- a/lib/kafka_batcher/collector.ex +++ b/lib/kafka_batcher/collector.ex @@ -43,24 +43,31 @@ defmodule KafkaBatcher.Collector do quote location: :keep, bind_quoted: [opts: opts] do use GenServer require Logger - alias KafkaBatcher.{AccumulatorsPoolSupervisor, Collector.State, TempStorage} + + alias KafkaBatcher.{ + AccumulatorsPoolSupervisor, + Collector, + Collector.State, + TempStorage + } @behaviour KafkaBatcher.Behaviours.Collector import KafkaBatcher.Collector.Implementation @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) - @compile_config KafkaBatcher.Config.build_topic_config(opts) + @compile_opts opts # Public API - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: __MODULE__) + @spec start_link(KafkaBatcher.PipelineUnit.t()) :: GenServer.on_start() + def start_link(%KafkaBatcher.PipelineUnit{} = unit) do + GenServer.start_link(__MODULE__, unit, name: __MODULE__) end @doc 
"Returns a specification to start this module under a supervisor" - def child_spec(config) do + def child_spec(%KafkaBatcher.PipelineUnit{} = unit) do %{ id: __MODULE__, - start: {__MODULE__, :start_link, [config]}, + start: {__MODULE__, :start_link, [unit]}, type: :worker } end @@ -90,20 +97,21 @@ defmodule KafkaBatcher.Collector do GenServer.call(__MODULE__, :get_config) end - def get_compile_config do - @compile_config + def get_compile_opts do + @compile_opts end # Callbacks @impl GenServer - def init(config) do + def init(%KafkaBatcher.PipelineUnit{} = pipeline_unit) do Process.flag(:trap_exit, true) - state = build_state(config) + topic_name = KafkaBatcher.PipelineUnit.get_topic_name(pipeline_unit) - Logger.debug("KafkaBatcher: Batch collector started: topic #{state.topic_name} pid #{inspect(self())}") + Logger.debug("KafkaBatcher: Batch collector started: topic #{topic_name} pid #{inspect(self())}") send(self(), :init_accumulators) - {:ok, state} + + {:ok, %State{pipeline_unit: pipeline_unit}} end @impl GenServer @@ -131,8 +139,8 @@ defmodule KafkaBatcher.Collector do end end - def handle_call(:get_config, _from, state) do - {:reply, state.config, state} + def handle_call(:get_config, _from, %State{} = state) do + {:reply, state.pipeline_unit, state} end def handle_call(unknown, _from, state) do @@ -144,7 +152,7 @@ defmodule KafkaBatcher.Collector do @impl GenServer def handle_info(:init_accumulators, state) do - new_state = store_partitions_count(state) + new_state = store_partition_count(state) case start_accumulators(new_state) do :ok -> @@ -175,25 +183,18 @@ defmodule KafkaBatcher.Collector do end @impl GenServer - def format_status(_reason, [pdict, state]) do - [pdict, drop_sensitive(state)] - end - - defp drop_sensitive(%State{config: config} = state) do - %State{state | config: Keyword.drop(config, [:sasl])} + def format_status(_reason, [pdict, %State{} = state]) do + [ + pdict, + %State{ + state + | pipeline_unit: 
PipelineUnit.drop_sensitive(state.pipeline_unit) + } + ] end # Private functions - defp build_state(config) do - %State{ - topic_name: Keyword.fetch!(config, :topic_name), - config: config, - collect_by_partition: Keyword.fetch!(config, :collect_by_partition), - collector: __MODULE__ - } - end - defp restart_timer(%State{timer_ref: ref}) when :erlang.is_reference(ref) do _ = :erlang.cancel_timer(ref) do_restart() diff --git a/lib/kafka_batcher/collector/config.ex b/lib/kafka_batcher/collector/config.ex new file mode 100644 index 0000000..c15905b --- /dev/null +++ b/lib/kafka_batcher/collector/config.ex @@ -0,0 +1,44 @@ +defmodule KafkaBatcher.Collector.Config do + @moduledoc false + + alias KafkaBatcher.MessageObject + + @typep topic :: String.t() + @typep partition_count :: pos_integer() + + @type partition_fn :: + (topic(), partition_count(), MessageObject.key(), MessageObject.value() -> + pos_integer()) + + @type t :: %__MODULE__{ + collector: module(), + topic_name: String.t(), + partition_fn: partition_fn() | nil, + collect_by_partition: boolean() + } + + @enforce_keys [:collector, :topic_name] + defstruct @enforce_keys ++ [:partition_fn, collect_by_partition: false] + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + collector: config.collector, + topic_name: config.topic_name, + partition_fn: config.partition_fn, + collect_by_partition: config.collect_by_partition + ] + end + + @spec build!(opts :: Keyword.t()) :: t() + def build!(opts) do + opts + |> Keyword.take([ + :collector, + :topic_name, + :partition_fn, + :collect_by_partition + ]) + |> then(&struct!(__MODULE__, &1)) + end +end diff --git a/lib/kafka_batcher/collector/implementation.ex b/lib/kafka_batcher/collector/implementation.ex index 639edf9..f703758 100644 --- a/lib/kafka_batcher/collector/implementation.ex +++ b/lib/kafka_batcher/collector/implementation.ex @@ -4,42 +4,61 @@ defmodule KafkaBatcher.Collector.Implementation do """ require Logger - alias 
KafkaBatcher.{AccumulatorsPoolSupervisor, Collector.State, MessageObject} + + alias KafkaBatcher.{ + AccumulatorsPoolSupervisor, + Collector, + Collector.State, + MessageObject, + PipelineUnit + } + @producer Application.compile_env(:kafka_batcher, :producer_module, KafkaBatcher.Producers.Kaffe) - def choose_partition(_message, _topic_name, _config, nil), do: {:error, :kafka_unavailable} + def choose_partition(_message, _pipeline_unit, nil), do: {:error, :kafka_unavailable} + + def choose_partition( + %MessageObject{key: key, value: value}, + %KafkaBatcher.PipelineUnit{} = pipeline_unit, + partitions_count + ) do + %Collector.Config{ + partition_fn: partition_fn, + topic_name: topic_name + } = pipeline_unit.collector_config - def choose_partition(%MessageObject{key: key, value: value}, topic_name, config, partitions_count) do - calc_partition_fn = Keyword.fetch!(config, :partition_fn) + partition = partition_fn.(topic_name, partitions_count, key, value) - partition = calc_partition_fn.(topic_name, partitions_count, key, value) {:ok, partition} end - def start_accumulators(%State{collect_by_partition: true, partitions_count: nil}) do - {:error, :kafka_unavailable} - end + def start_accumulators(%State{} = state) do + collect_by_partition? = + PipelineUnit.collect_by_partition?(state.pipeline_unit) - def start_accumulators(%State{collect_by_partition: true, partitions_count: count} = state) do - start_accumulators_by_partitions(count, state) - end + cond do + collect_by_partition? and is_nil(state.partitions_count) -> + {:error, :kafka_unavailable} - def start_accumulators(%State{topic_name: topic_name, config: config, collect_by_partition: false} = state) do - start_accumulator(topic_name: topic_name, config: config, collector: state.collector) - end + collect_by_partition? 
-> + start_accumulators_by_partitions( + state.pipeline_unit, + state.partitions_count + ) - defp start_accumulators_by_partitions(count, %State{} = state) do - opts = [ - topic_name: state.topic_name, - config: state.config, - collector: state.collector - ] + not collect_by_partition? -> + start_accumulator(state.pipeline_unit) + end + end + defp start_accumulators_by_partitions(pipeline_unit, count) do Enum.reduce_while( 0..(count - 1), :ok, fn partition, _ -> - case start_accumulator(Keyword.put(opts, :partition, partition)) do + pipeline_unit = PipelineUnit.set_partition(pipeline_unit, partition) + + case start_accumulator(pipeline_unit) do :ok -> {:cont, :ok} @@ -50,8 +69,8 @@ defmodule KafkaBatcher.Collector.Implementation do ) end - defp start_accumulator(args) do - case AccumulatorsPoolSupervisor.start_accumulator(args) do + defp start_accumulator(pipeline_unit) do + case AccumulatorsPoolSupervisor.start_accumulator(pipeline_unit) do {:ok, _} -> :ok @@ -60,7 +79,7 @@ defmodule KafkaBatcher.Collector.Implementation do {:error, reason} -> Logger.warning(""" - KafkaBatcher: Accumulator has failed to start with args: #{inspect(args)}. + KafkaBatcher: Accumulator has failed to start with args: #{inspect(pipeline_unit)}. 
Reason: #{inspect(reason)}} """) @@ -69,8 +88,15 @@ defmodule KafkaBatcher.Collector.Implementation do end @spec store_partition_count(State.t()) :: State.t() - def store_partitions_count(%State{partitions_count: nil} = state) do - case @producer.get_partitions_count(state.topic_name) do + def store_partition_count(%State{partitions_count: nil} = state) do + %State{ + pipeline_unit: %PipelineUnit{ + collector_config: %Collector.Config{topic_name: topic_name}, + producer_config: producer_config + } + } = state + + case @producer.get_partitions_count(producer_config, topic_name) do {:ok, partitions_count} -> %State{state | partitions_count: partitions_count} diff --git a/lib/kafka_batcher/collector/state.ex b/lib/kafka_batcher/collector/state.ex index 09fa107..7b77191 100644 --- a/lib/kafka_batcher/collector/state.ex +++ b/lib/kafka_batcher/collector/state.ex @@ -3,16 +3,20 @@ defmodule KafkaBatcher.Collector.State do Describes the state of KafkaBatcher.Collector and functions working with it """ - alias KafkaBatcher.{Accumulator, Collector, MessageObject, TempStorage} + alias KafkaBatcher.{ + Accumulator, + Collector, + MessageObject, + PipelineUnit, + TempStorage + } + alias KafkaBatcher.Collector.{State, Utils} require Logger @type t :: %State{ - topic_name: String.t() | nil, - config: Keyword.t(), - collect_by_partition: boolean(), - collector: atom() | nil, + pipeline_unit: PipelineUnit.t(), locked?: boolean(), last_check_timestamp: non_neg_integer() | nil, ready?: boolean(), @@ -20,17 +24,17 @@ defmodule KafkaBatcher.Collector.State do partitions_count: pos_integer() | nil } - defstruct topic_name: nil, - config: [], - collect_by_partition: true, - collector: nil, - # these fields are used to handle case when Kafka went down suddenly - locked?: false, - last_check_timestamp: nil, - # these fields are used to handle case when Kafka is not available at the start - ready?: false, - timer_ref: nil, - partitions_count: nil + @enforce_keys [:pipeline_unit] + 
defstruct @enforce_keys ++ + [ + # these fields are used to handle case when Kafka went down suddenly + locked?: false, + last_check_timestamp: nil, + # these fields are used to handle case when Kafka is not available at the start + ready?: false, + timer_ref: nil, + partitions_count: nil + ] @spec add_events(t(), [Utils.event()]) :: {:ok, t()} | {:error, term(), t()} def add_events(%State{} = state, events) do @@ -49,7 +53,7 @@ defmodule KafkaBatcher.Collector.State do |> Enum.reduce(:ok, fn %MessageObject{} = event, result -> case choose_partition(state, event) do {:ok, partition} when result == :ok -> - try_to_add_event(event, state.topic_name, partition) + try_to_add_event(state, event, partition) {:ok, partition} -> keep_failed_event(result, event, elem(result, 1), partition) @@ -60,8 +64,10 @@ defmodule KafkaBatcher.Collector.State do end) end - defp try_to_add_event(event, topic_name, partition) do - case Accumulator.add_event(event, topic_name, partition) do + defp try_to_add_event(%State{} = state, event, partition) do + pipeline_unit = PipelineUnit.set_partition(state.pipeline_unit, partition) + + case Accumulator.add_event(event, pipeline_unit) do :ok -> :ok {:error, reason} -> keep_failed_event(:ok, event, reason, partition) end @@ -84,17 +90,16 @@ defmodule KafkaBatcher.Collector.State do } end - defp choose_partition(%State{collect_by_partition: true} = state, event) do - Collector.Implementation.choose_partition( - event, - state.topic_name, - state.config, - state.partitions_count - ) - end - - defp choose_partition(%State{collect_by_partition: false}, _event) do - {:ok, nil} + defp choose_partition(%State{} = state, event) do + if PipelineUnit.collect_by_partition?(state.pipeline_unit) do + Collector.Implementation.choose_partition( + event, + state.pipeline_unit, + state.partitions_count + ) + else + {:ok, nil} + end end defp save_failed_events(:ok, _state), do: :ok @@ -103,12 +108,14 @@ defmodule KafkaBatcher.Collector.State do {:error, _reason, 
failed_event_batches} = result, %State{} = state ) do + %State{pipeline_unit: %PipelineUnit{} = pipeline_unit} = state + for {partition, failed_events} <- failed_event_batches do TempStorage.save_batch(%TempStorage.Batch{ messages: Enum.reverse(failed_events), - topic: state.topic_name, + topic: PipelineUnit.get_topic_name(pipeline_unit), partition: partition, - producer_config: state.config + producer_config: pipeline_unit.opts }) end diff --git a/lib/kafka_batcher/config.ex b/lib/kafka_batcher/config.ex index f7aee01..dd96eb2 100644 --- a/lib/kafka_batcher/config.ex +++ b/lib/kafka_batcher/config.ex @@ -5,14 +5,6 @@ defmodule KafkaBatcher.Config do Examples of configs can be found in the files config/test.exs and test/support/collectors/collector_handlers.ex """ - defmodule SASLConfigError do - defexception [:message] - - def exception(_term) do - %SASLConfigError{message: "Sasl config error"} - end - end - defmodule CollectorMissingError do defexception [:message] @@ -29,259 +21,98 @@ defmodule KafkaBatcher.Config do end end - @type sasl_mechanism() :: :plain | :scram_sha_256 | :scram_sha_512 - @type sasl_type() :: {sasl_mechanism(), binary(), binary()} | :undefined - - @spec collectors_spec() :: [:supervisor.child_spec()] - def collectors_spec do - collector_configs = get_configs_by_collector!() - - children_specs = - Enum.reduce( - collector_configs, - [], - fn {collector, config}, all_children -> - collector_spec = collector.child_spec(config) - accum_sup_spec = KafkaBatcher.AccumulatorsPoolSupervisor.child_spec(config) - - [collector_spec, accum_sup_spec | all_children] - end - ) - - conn_manager_spec = KafkaBatcher.ConnectionManager.child_spec() - Enum.reverse([conn_manager_spec | children_specs]) - end - - @spec general_producer_config() :: Keyword.t() - def general_producer_config do - Application.get_env(:kafka_batcher, :kafka, []) - |> Keyword.take(allowed_producer_keys()) - |> set_endpoints() - |> set_sasl() - |> set_ssl() - |> then(fn config -> 
Keyword.merge(default_producer_config(), config) end) - end - - @doc """ - Return all configured topics with its config. - """ - @spec get_configs_by_topic_name() :: list({binary(), Keyword.t()}) - def get_configs_by_topic_name do - get_configs_by_collector!() - |> Enum.map(fn {_, config} -> - {Keyword.fetch!(config, :topic_name), config} - end) - |> Enum.into(%{}) - end - - @spec get_configs_by_collector!() :: list({atom(), Keyword.t()}) - def get_configs_by_collector! do - Enum.map(fetch_runtime_configs(), fn {collector, runtime_config} -> - config = - general_producer_config() - |> Keyword.merge(get_compile_config!(collector)) - |> Keyword.merge(runtime_config) - - case validate_config!({collector, config}) do - :ok -> - {collector, config} - - {:error, reasons} -> - raise(KafkaBatcher.Config.BadConfigError, "Collector config failed: #{inspect(reasons)}") + alias KafkaBatcher.{ + Accumulator, + Collector, + PipelineUnit.Validator, + Producers + } + + @type t :: %__MODULE__{ + producer_config: Producers.Config.t(), + pipeline_units: [KafkaBatcher.PipelineUnit.t()], + kafka_topic_aliases: %{optional(binary()) => binary()}, + kafka_metric_opts: Keyword.t() + } + + @enforce_keys [ + :producer_config, + :pipeline_units, + :kafka_topic_aliases, + :kafka_metric_opts + ] + defstruct @enforce_keys + + @spec get_client_name(t()) :: atom() + def get_client_name(%__MODULE__{} = config) do + %__MODULE__{ + producer_config: %Producers.Config{client_name: client_name} + } = config + + client_name + end + + @spec build_config!(opts :: Keyword.t()) :: t() + def build_config!(opts) do + producer_config = + opts + |> Keyword.fetch!(:kafka) + |> Producers.Config.build!() + + pipeline_units = + for collector <- Keyword.fetch!(opts, :collectors) do + producer_config + |> build_pipeline_unit!(collector, opts) + |> validate_pipeline_unit!() end - end) - end - @spec get_collector_config(topic_name :: binary()) :: Keyword.t() - def get_collector_config(topic_name) do - case 
get_configs_by_topic_name()[topic_name] do - nil -> general_producer_config() - config -> config + %__MODULE__{ + producer_config: producer_config, + pipeline_units: pipeline_units, + kafka_topic_aliases: Keyword.get(opts, :kafka_topic_aliases, %{}), + kafka_metric_opts: Keyword.get(opts, :kafka_metric_opts, []) + } + end + + defp build_pipeline_unit!(producer_config, collector, opts) do + opts = + opts + |> Keyword.merge(get_compile_opts!(collector)) + |> Keyword.merge(Keyword.get(opts, collector, [])) + |> Keyword.put(:collector, collector) + + collector_config = Collector.Config.build!(opts) + accumulator_config = Accumulator.Config.build!(opts) + + %KafkaBatcher.PipelineUnit{ + collector_config: collector_config, + accumulator_config: accumulator_config, + producer_config: producer_config, + opts: + collector_config + |> Collector.Config.to_kwlist() + |> Keyword.merge(Accumulator.Config.to_kwlist(accumulator_config)) + |> Keyword.merge(kafka: Producers.Config.to_kwlist(producer_config)) + } + end + + defp validate_pipeline_unit!(pipeline_unit) do + case Validator.validate(pipeline_unit) do + :ok -> + pipeline_unit + + {:error, reason} -> + raise(BadConfigError, "Collector config failed: #{inspect(reason)}") end end - @spec build_topic_config(opts :: Keyword.t()) :: Keyword.t() - def build_topic_config(opts) do - default_config = default_producer_config() - - allowed_producer_keys() - |> Enum.map(fn key -> {key, Keyword.get(opts, key) || Keyword.get(default_config, key)} end) - |> Enum.filter(fn {_, value} -> value != nil end) - end - - @spec get_endpoints :: list({binary(), non_neg_integer()}) - def get_endpoints do - Application.get_env(:kafka_batcher, :kafka, []) - |> get_endpoints() - end - - @spec get_endpoints(config :: Keyword.t()) :: list({binary(), non_neg_integer()}) - def get_endpoints(config) do - Keyword.fetch!(config, :endpoints) - |> parse_endpoints() - end - - defp parse_endpoints(endpoints) do - endpoints |> String.split(",") |> 
Enum.map(&parse_endpoint/1) - end - - defp parse_endpoint(url) do - [host, port] = String.split(url, ":") - {host, :erlang.binary_to_integer(port)} - end - - defp validate_config!({collector, config}) do - required_keys() - |> Enum.reduce( - [], - fn - key, acc when is_atom(key) -> - case Keyword.has_key?(config, key) do - true -> - acc - - false -> - ["collector #{inspect(collector)}. Not found required key #{inspect(key)}" | acc] - end - - %{cond: condition, keys: keys}, acc -> - case check_conditions?(condition, config) do - true -> - check_keys(keys, config, collector, acc) - - false -> - acc - end - end - ) - |> case do - [] -> :ok - reasons -> {:error, reasons} - end - end - - defp check_keys(keys, config, collector, acc) do - case keys -- Keyword.keys(config) do - [] -> - acc - - fields -> - ["collector #{inspect(collector)}. Not found required keys #{inspect(fields)}" | acc] - end - end - - defp check_conditions?(cond, config) do - Enum.all?( - cond, - fn {key, value} -> - Keyword.get(config, key) == value - end - ) - end - - defp required_keys do - [ - :topic_name, - %{cond: [collect_by_partition: true], keys: [:partition_fn]}, - %{cond: [collect_by_partition: false], keys: [:partition_strategy]} - ] - end - - defp set_endpoints(config) do - Keyword.put(config, :endpoints, get_endpoints(config)) - end - - defp set_sasl(config) do - new_sasl = - case validate_sasl_config!(Keyword.get(config, :sasl)) do - {:ok, sasl_config_tuple} -> sasl_config_tuple - _ -> :undefined - end - - Keyword.put(config, :sasl, new_sasl) - end - - defp allowed_producer_keys do - keys = - default_producer_config() - |> Keyword.keys() - - [[:endpoints, :partition_fn, :topic_name, :ssl, :sasl] | keys] - |> List.flatten() - end - - @spec validate_sasl_config!(map() | nil) :: {:ok, sasl_type()} | {:error, :is_not_set} | no_return() - defp validate_sasl_config!(%{mechanism: mechanism, login: login, password: password} = config) do - valid_mechanism = mechanism in [:plain, :scram_sha_256, 
:scram_sha_512] - - if valid_mechanism && password != nil && login != nil do - {:ok, {mechanism, login, password}} - else - raise(KafkaBatcher.Config.SASLConfigError, "SASL config failed: #{inspect(config)}") - end - end - - defp validate_sasl_config!(sasl_config) when sasl_config == nil or sasl_config == %{} do - {:error, :is_not_set} - end - - defp validate_sasl_config!(bad_sasl_config) do - raise(KafkaBatcher.Config.SASLConfigError, "SASL config failed: #{inspect(bad_sasl_config)}") - end - - defp set_ssl(config) do - Keyword.put(config, :ssl, get_ssl(config)) - end - - defp get_ssl(config) do - Keyword.get(config, :ssl, false) - end - - defp get_compile_config!(module) do + defp get_compile_opts!(module) do with {:module, ^module} <- Code.ensure_compiled(module), - {:ok, [_ | _] = config} <- {:ok, module.get_compile_config()} do - config + {:ok, [_ | _] = opts} <- {:ok, module.get_compile_opts()} do + opts else _ -> - raise(KafkaBatcher.Config.CollectorMissingError, "Collector: #{inspect(module)} missing") + raise(CollectorMissingError, "Collector: #{inspect(module)} missing") end end - - defp fetch_runtime_configs do - Application.get_env(:kafka_batcher, :collectors) - |> Enum.map(&fetch_runtime_config/1) - end - - defp fetch_runtime_config(collector_name) do - case Application.fetch_env(:kafka_batcher, collector_name) do - {:ok, config} -> - {collector_name, config} - - _ -> - {collector_name, []} - end - end - - defp default_producer_config do - [ - ## brod producer parameters - ## https://github.com/kafka4beam/brod/blob/master/src/brod_producer.erl - allow_topic_auto_creation: false, - partition_strategy: :random, - required_acks: -1, - ## KafkaBatcher parameters - ## specified start pool processes for collection events by partitions - collect_by_partition: false, - ## send metric values to prom_ex application - telemetry: true, - # This module implements logic for force pushing current batch to Kafka, - # without waiting for other conditions (on size 
and/or interval). - batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher, - # These parameters are required for the collector - max_wait_time: 1_000, - batch_size: 10, - min_delay: 0, - max_batch_bytesize: 1_000_000 - ] - end end diff --git a/lib/kafka_batcher/connection_manager.ex b/lib/kafka_batcher/connection_manager.ex index ed0cf83..7a654cd 100644 --- a/lib/kafka_batcher/connection_manager.ex +++ b/lib/kafka_batcher/connection_manager.ex @@ -9,11 +9,18 @@ defmodule KafkaBatcher.ConnectionManager do defmodule State do @moduledoc "State of ConnectionManager process" - defstruct client_started: false, client_pid: nil + @type t :: %State{ + config: KafkaBatcher.Config.t(), + client_started: boolean(), + client_pid: nil | pid() + } - @type t :: %State{client_started: boolean(), client_pid: nil | pid()} + @enforce_keys [:config] + defstruct @enforce_keys ++ [client_started: false, client_pid: nil] end + alias KafkaBatcher.Producers + @producer Application.compile_env(:kafka_batcher, :producer_module, KafkaBatcher.Producers.Kaffe) @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) @reconnect_timeout Application.compile_env(:kafka_batcher, :reconnect_timeout, 5_000) @@ -21,25 +28,29 @@ defmodule KafkaBatcher.ConnectionManager do use GenServer # Public API - @spec start_link() :: :ignore | {:error, any()} | {:ok, pid()} - def start_link do - GenServer.start_link(__MODULE__, [], name: __MODULE__) + @spec start_link(KafkaBatcher.Config.t()) :: GenServer.on_start() + def start_link(%KafkaBatcher.Config{} = config) do + GenServer.start_link( + __MODULE__, + [config], + name: reg_name(config.producer_config) + ) end @doc "Returns a specification to start this module under a supervisor" - @spec child_spec(nil) :: map() - def child_spec(_ \\ nil) do + @spec child_spec(KafkaBatcher.Config.t()) :: Supervisor.child_spec() + def child_spec(%KafkaBatcher.Config{} = config) do %{ - id: __MODULE__, - start: {__MODULE__, 
:start_link, []}, + id: reg_name(config.producer_config), + start: {__MODULE__, :start_link, [config]}, type: :worker } end @doc "Checks that Kafka client is already started" - @spec client_started?() :: boolean() - def client_started? do - GenServer.call(__MODULE__, :client_started?) + @spec client_started?(Producers.Config.t()) :: boolean() + def client_started?(%Producers.Config{} = producer_config) do + GenServer.call(reg_name(producer_config), :client_started?) end ## @@ -47,9 +58,9 @@ defmodule KafkaBatcher.ConnectionManager do ## @impl GenServer - def init(_opts) do + def init(config) do Process.flag(:trap_exit, true) - {:ok, %State{}, {:continue, :start_client}} + {:ok, %State{config: config}, {:continue, :start_client}} end @impl GenServer @@ -110,7 +121,7 @@ defmodule KafkaBatcher.ConnectionManager do ## defp connect(state) do - case prepare_connection() do + case prepare_connection(state) do {:ok, pid} -> %State{state | client_started: true, client_pid: pid} @@ -120,29 +131,33 @@ defmodule KafkaBatcher.ConnectionManager do end end - defp start_producers do - KafkaBatcher.Config.get_configs_by_topic_name() - |> Enum.reduce_while(:ok, fn {topic_name, config}, _ -> - case @producer.start_producer(topic_name, config) do - :ok -> - {:cont, :ok} + defp start_producers(%State{config: %KafkaBatcher.Config{} = config}) do + config.pipeline_units + |> Enum.map(&KafkaBatcher.PipelineUnit.get_topic_name/1) + |> Enum.reduce_while( + :ok, + fn topic_name, _ -> + case @producer.start_producer(config.producer_config, topic_name) do + :ok -> + {:cont, :ok} - {:error, reason} -> - @error_notifier.report( - type: "KafkaBatcherProducerStartFailed", - message: "Topic: #{topic_name}. Reason #{inspect(reason)}" - ) + {:error, reason} -> + @error_notifier.report( + type: "KafkaBatcherProducerStartFailed", + message: "Topic: #{topic_name}. 
Reason #{inspect(reason)}" + ) - {:halt, :error} + {:halt, :error} + end end - end) + ) end - defp prepare_connection do - case start_client() do - {:ok, pid} -> - case start_producers() do - :ok -> {:ok, pid} + defp prepare_connection(state) do + case start_client(state) do + {:ok, _pid} = ok -> + case start_producers(state) do + :ok -> ok :error -> :retry end @@ -156,10 +171,14 @@ defmodule KafkaBatcher.ConnectionManager do end end - defp start_client do - case @producer.start_client() do - {:ok, pid} -> - {:ok, pid} + defp start_client(%State{} = state) do + %KafkaBatcher.Config{ + producer_config: producer_config + } = state.config + + case @producer.start_client(producer_config) do + {:ok, _pid} = ok -> + ok {:error, {:already_started, pid}} -> Logger.debug("KafkaBatcher: Kafka client already started: #{inspect(pid)}") @@ -169,4 +188,8 @@ defmodule KafkaBatcher.ConnectionManager do error end end + + defp reg_name(%Producers.Config{} = producer_config) do + :"#{__MODULE__}.#{producer_config.client_name}" + end end diff --git a/lib/kafka_batcher/pipeline_unit.ex b/lib/kafka_batcher/pipeline_unit.ex new file mode 100644 index 0000000..763d034 --- /dev/null +++ b/lib/kafka_batcher/pipeline_unit.ex @@ -0,0 +1,98 @@ +defmodule KafkaBatcher.PipelineUnit do + @moduledoc false + + alias KafkaBatcher.{ + Accumulator, + Collector, + Producers + } + + @type t :: %__MODULE__{ + collector_config: Collector.Config.t(), + accumulator_config: Accumulator.Config.t(), + producer_config: Producers.Config.t(), + # opts are kept for backwards compatibility + opts: Keyword.t() + } + + @derive { + Inspect, + only: [:collector_config, :accumulator_config, :producer_config] + } + + @enforce_keys [ + :collector_config, + :accumulator_config, + :producer_config, + :opts + ] + defstruct @enforce_keys + + @spec drop_sensitive(t()) :: t() + def drop_sensitive(%__MODULE__{} = pipeline_unit) do + %__MODULE__{ + collector_config: pipeline_unit.collector_config, + accumulator_config: 
pipeline_unit.accumulator_config, + producer_config: Producers.Config.drop_sensitive(pipeline_unit.producer_config), + opts: [] + } + end + + @spec get_accumulator_mod(t()) :: module() + def get_accumulator_mod(%__MODULE__{} = pipeline_unit) do + %__MODULE__{ + accumulator_config: %Accumulator.Config{accumulator_mod: accumulator_mod} + } = pipeline_unit + + accumulator_mod + end + + @spec get_partition(t()) :: pos_integer() | nil + def get_partition(%__MODULE__{} = pipeline_unit) do + %__MODULE__{ + accumulator_config: %Accumulator.Config{partition: partition} + } = pipeline_unit + + partition + end + + @spec set_partition(t(), pos_integer()) :: t() + def set_partition(%__MODULE__{} = pipeline_unit, partition) do + %__MODULE__{ + pipeline_unit + | accumulator_config: %Accumulator.Config{ + pipeline_unit.accumulator_config + | partition: partition + } + } + end + + @spec get_topic_name(t()) :: String.t() + def get_topic_name(%__MODULE__{} = pipeline_unit) do + %__MODULE__{ + collector_config: %Collector.Config{topic_name: topic_name} + } = pipeline_unit + + topic_name + end + + @spec get_collector(t()) :: module() + def get_collector(%__MODULE__{} = pipeline_unit) do + %__MODULE__{ + collector_config: %Collector.Config{collector: collector} + } = pipeline_unit + + collector + end + + @spec collect_by_partition?(t()) :: boolean() + def collect_by_partition?(%__MODULE__{} = pipeline_unit) do + %__MODULE__{ + collector_config: %Collector.Config{ + collect_by_partition: collect_by_partition + } + } = pipeline_unit + + collect_by_partition + end +end diff --git a/lib/kafka_batcher/pipeline_unit/validator.ex b/lib/kafka_batcher/pipeline_unit/validator.ex new file mode 100644 index 0000000..74b605b --- /dev/null +++ b/lib/kafka_batcher/pipeline_unit/validator.ex @@ -0,0 +1,30 @@ +defmodule KafkaBatcher.PipelineUnit.Validator do + @moduledoc false + + alias KafkaBatcher.{Collector, PipelineUnit, Producers} + + @spec validate(PipelineUnit.t()) :: :ok | {:error, String.t()} + 
def validate(%PipelineUnit{} = pipeline_unit) do + %PipelineUnit{ + collector_config: %Collector.Config{ + collector: collector, + partition_fn: partition_fn, + collect_by_partition: collect_by_partition + }, + producer_config: %Producers.Config{ + partition_strategy: partition_strategy + } + } = pipeline_unit + + cond do + collect_by_partition and is_nil(partition_fn) -> + {:error, "collector #{inspect(collector)}. Not found required key :partition_fn"} + + not collect_by_partition and is_nil(partition_strategy) -> + {:error, "collector #{inspect(collector)}. Not found required key :partition_strategy"} + + :otherwise -> + :ok + end + end +end diff --git a/lib/kafka_batcher/producers/base.ex b/lib/kafka_batcher/producers/base.ex index eef1165..bd3f058 100644 --- a/lib/kafka_batcher/producers/base.ex +++ b/lib/kafka_batcher/producers/base.ex @@ -5,14 +5,17 @@ defmodule KafkaBatcher.Producers.Base do defmacro __using__(opts) do quote location: :keep, bind_quoted: [opts: opts] do + alias KafkaBatcher.Producers + @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) require Logger - def produce_list(messages, topic, nil, config) when is_list(messages) and is_binary(topic) and is_list(config) do - with {:ok, partitions_count} <- get_partitions_count(topic), - grouped_messages <- group_messages(messages, topic, partitions_count, partition_strategy_from(config)), - :ok <- produce_list_to_topic(grouped_messages, topic, config) do + def produce_list(%Producers.Config{} = config, messages, topic, nil) when is_list(messages) and is_binary(topic) do + with {:ok, partitions_count} <- get_partitions_count(config, topic), + partition_strategy = config.partition_strategy || :random, + grouped_messages <- group_messages(messages, topic, partitions_count, partition_strategy), + :ok <- produce_list_to_topic(config, grouped_messages, topic) do :ok else error -> @@ -29,16 +32,16 @@ defmodule KafkaBatcher.Producers.Base do {:error, 
:failed_push_to_kafka} end - def produce_list(messages, topic, partition, config) - when is_list(messages) and is_binary(topic) and is_list(config) and is_integer(partition) do - produce_list_to_topic(%{partition => messages}, topic, config) + def produce_list(%Producers.Config{} = config, messages, topic, partition) + when is_list(messages) and is_binary(topic) and is_integer(partition) do + produce_list_to_topic(config, %{partition => messages}, topic) rescue err -> @error_notifier.report(err, stacktrace: __STACKTRACE__) {:error, :failed_push_to_kafka} end - def produce_list(messages, topic, partition, config) do + def produce_list(%Producers.Config{} = config, messages, topic, partition) do @error_notifier.report( type: "KafkaBatcherProducerError", message: """ @@ -50,15 +53,15 @@ defmodule KafkaBatcher.Producers.Base do {:error, :internal_error} end - defp produce_list_to_topic(message_list, topic, config) do + defp produce_list_to_topic(%Producers.Config{} = config, message_list, topic) do message_list |> Enum.reduce_while(:ok, fn {partition, messages}, :ok -> Logger.debug("KafkaBatcher: event#produce_list_to_topic topic=#{topic} partition=#{partition}") start_time = System.monotonic_time() - case __MODULE__.do_produce(messages, topic, partition, config) do + case __MODULE__.do_produce(config, messages, topic, partition) do :ok -> - push_metrics(start_time, topic, partition, messages, telemetry_on?(config)) + push_metrics(start_time, topic, partition, messages, config.telemetry) {:cont, :ok} {:error, _reason} = error -> @@ -82,10 +85,6 @@ defmodule KafkaBatcher.Producers.Base do end) end - defp telemetry_on?(opts) do - Keyword.get(opts, :telemetry, true) - end - defp push_metrics(_start_time, _topic, _partition, _messages, false) do :ok end @@ -106,17 +105,6 @@ defmodule KafkaBatcher.Producers.Base do } ) end - - defp partition_strategy_from(opts) do - case Keyword.fetch(opts, :partition_strategy) do - {:ok, partition_strategy} -> - partition_strategy - - 
:error -> - KafkaBatcher.Config.general_producer_config() - |> Keyword.get(:partition_strategy, :random) - end - end end end end diff --git a/lib/kafka_batcher/producers/config.ex b/lib/kafka_batcher/producers/config.ex new file mode 100644 index 0000000..f88c45f --- /dev/null +++ b/lib/kafka_batcher/producers/config.ex @@ -0,0 +1,71 @@ +defmodule KafkaBatcher.Producers.Config do + @moduledoc false + + alias KafkaBatcher.{MessageObject, Producers.Config.BrodConfig} + + @typep topic :: String.t() + @typep partition_count :: pos_integer() + + @type partition_fn :: + (topic(), partition_count(), MessageObject.key(), MessageObject.value() -> + pos_integer()) + + @type partition_strategy :: :random | :md5 | partition_fn() + + @type t :: %__MODULE__{ + endpoints: [{String.t(), non_neg_integer()}], + client_name: atom(), + partition_strategy: partition_strategy() | nil, + required_acks: integer(), + telemetry: boolean(), + brod_config: BrodConfig.t() + } + + @enforce_keys [ + :endpoints, + :client_name, + :partition_strategy, + :required_acks, + :telemetry, + :brod_config + ] + + defstruct @enforce_keys + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + endpoints: config.endpoints, + client_name: config.client_name, + partition_strategy: config.partition_strategy, + required_acks: config.required_acks, + telemetry: config.telemetry + ] ++ BrodConfig.to_kwlist(config.brod_config) + end + + @spec drop_sensitive(t()) :: t() + def drop_sensitive(%__MODULE__{} = config) do + %__MODULE__{ + config + | brod_config: BrodConfig.drop_sensitive(config.brod_config) + } + end + + @spec build!(opts :: Keyword.t()) :: t() + def build!(opts) do + endpoints = + for url <- opts |> Keyword.fetch!(:endpoints) |> String.split(",") do + [host, port] = String.split(url, ":") + {host, :erlang.binary_to_integer(port)} + end + + %__MODULE__{ + endpoints: endpoints, + client_name: Keyword.get(opts, :client_name, :kafka_producer_client), + partition_strategy: 
Keyword.get(opts, :partition_strategy), + required_acks: Keyword.get(opts, :required_acks, -1), + telemetry: Keyword.get(opts, :telemetry, true), + brod_config: BrodConfig.build!(opts) + } + end +end diff --git a/lib/kafka_batcher/producers/config/brod_config.ex b/lib/kafka_batcher/producers/config/brod_config.ex new file mode 100644 index 0000000..b6a3242 --- /dev/null +++ b/lib/kafka_batcher/producers/config/brod_config.ex @@ -0,0 +1,91 @@ +defmodule KafkaBatcher.Producers.Config.BrodConfig do + defmodule SASLConfigError do + defexception [:message] + + def exception(_term) do + %SASLConfigError{message: "Sasl config error"} + end + end + + @moduledoc false + + @type sasl_mechanism() :: :plain | :scram_sha_256 | :scram_sha_512 + @type sasl() :: {sasl_mechanism(), binary(), binary()} | :undefined + + @type t :: %__MODULE__{ + allow_topic_auto_creation: boolean(), + required_acks: integer(), + ssl: boolean(), + sasl: sasl() + } + + @derive {Inspect, only: [:allow_topic_auto_creation, :required_acks, :ssl]} + + @enforce_keys [:allow_topic_auto_creation, :required_acks, :ssl, :sasl] + defstruct @enforce_keys + + @spec drop_sensitive(t()) :: t() + def drop_sensitive(%__MODULE__{} = config) do + case config.sasl do + :undefined -> config + {mechanism, _, _} -> %__MODULE__{config | sasl: {mechanism, "", ""}} + end + end + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + allow_topic_auto_creation: config.allow_topic_auto_creation, + required_acks: config.required_acks, + ssl: config.ssl, + sasl: config.sasl + ] + end + + @spec build!(Keyword.t()) :: t() + def build!(opts) do + sasl_config = + case validate_sasl_config(Keyword.get(opts, :sasl)) do + {:ok, sasl_config} -> + sasl_config + + {:error, {:invalid, config}} -> + raise(SASLConfigError, "SASL config failed: #{inspect(config)}") + end + + # https://github.com/kafka4beam/brod/blob/master/src/brod_producer.erl + %__MODULE__{ + allow_topic_auto_creation: Keyword.get(opts, 
:allow_topic_auto_creation, false), + required_acks: Keyword.get(opts, :required_acks, -1), + ssl: Keyword.get(opts, :ssl, false), + sasl: sasl_config + } + end + + @spec validate_sasl_config(map() | nil) :: + {:ok, sasl()} | {:error, {:invalid, map()}} + defp validate_sasl_config( + %{ + mechanism: mechanism, + login: login, + password: password + } = config + ) do + mechanism_valid? = mechanism in [:plain, :scram_sha_256, :scram_sha_512] + + if mechanism_valid? and password != nil and login != nil do + {:ok, {mechanism, login, password}} + else + {:error, {:invalid, config}} + end + end + + defp validate_sasl_config(sasl_config) + when sasl_config == nil or sasl_config == %{} do + {:ok, :undefined} + end + + defp validate_sasl_config(bad_sasl_config) do + {:error, {:invalid, bad_sasl_config}} + end +end diff --git a/lib/kafka_batcher/producers/kaffe.ex b/lib/kafka_batcher/producers/kaffe.ex index 0ec51ea..989b9fa 100644 --- a/lib/kafka_batcher/producers/kaffe.ex +++ b/lib/kafka_batcher/producers/kaffe.ex @@ -2,9 +2,9 @@ defmodule KafkaBatcher.Producers.Kaffe do @moduledoc """ An implementation of the KafkaBatcher.Behaviours.Producer for Kaffe """ + alias KafkaBatcher.{Producers, Producers.Config.BrodConfig} @brod_client Application.compile_env(:kafka_batcher, :brod_client, :brod) - @client_name :kafka_producer_client @behaviour KafkaBatcher.Behaviours.Producer use KafkaBatcher.Producers.Base @@ -14,26 +14,37 @@ defmodule KafkaBatcher.Producers.Kaffe do ## ------------------------------------------------------------------------- @impl true - def start_client do - config = KafkaBatcher.Config.general_producer_config() - endpoints = Keyword.fetch!(config, :endpoints) - - @brod_client.start_link_client(endpoints, @client_name, config) + def start_client(%Producers.Config{} = config) do + @brod_client.start_link_client( + config.endpoints, + config.client_name, + BrodConfig.to_kwlist(config.brod_config) + ) end @impl true - def start_producer(topic_name, config) do - 
@brod_client.start_producer(@client_name, topic_name, config) + def start_producer(%Producers.Config{} = config, topic_name) do + @brod_client.start_producer( + config.client_name, + topic_name, + BrodConfig.to_kwlist(config.brod_config) + ) end @impl true - def get_partitions_count(topic) do - @brod_client.get_partitions_count(@client_name, topic) + def get_partitions_count(%Producers.Config{} = config, topic) do + @brod_client.get_partitions_count(config.client_name, topic) end @impl true - def do_produce(messages, topic, partition, _config) do - @brod_client.produce_sync(@client_name, topic, partition, "ignored", transform_messages(messages)) + def do_produce(%Producers.Config{} = config, messages, topic, partition) do + @brod_client.produce_sync( + config.client_name, + topic, + partition, + "ignored", + transform_messages(messages) + ) end ## ------------------------------------------------------------------------- diff --git a/lib/kafka_batcher/producers/kafka_ex.ex b/lib/kafka_batcher/producers/kafka_ex.ex index c3748d1..ea90828 100644 --- a/lib/kafka_batcher/producers/kafka_ex.ex +++ b/lib/kafka_batcher/producers/kafka_ex.ex @@ -3,10 +3,10 @@ if Code.ensure_loaded?(KafkaEx) do @moduledoc """ An implementation of the KafkaBatcher.Behaviours.Producer for KafkaEx """ + alias KafkaBatcher.Producers @kafka_ex_client Application.compile_env(:kafka_batcher, :kafka_ex_client, KafkaEx) @metadata_response Application.compile_env(:kafka_batcher, :kafka_ex_metadata, KafkaEx.Protocol.Metadata.Response) - @client_name :kafka_producer_client @behaviour KafkaBatcher.Behaviours.Producer use KafkaBatcher.Producers.Base @@ -17,10 +17,8 @@ if Code.ensure_loaded?(KafkaEx) do ## KafkaEx start worker @impl true - def start_client do - uris = KafkaBatcher.Config.get_endpoints() - - @kafka_ex_client.create_worker(@client_name, uris: uris) + def start_client(%Producers.Config{} = config) do + @kafka_ex_client.create_worker(config.client_name, uris: config.endpoints) end @impl true 
@@ -29,9 +27,9 @@ if Code.ensure_loaded?(KafkaEx) do end @impl true - def get_partitions_count(topic) do + def get_partitions_count(%Producers.Config{} = config, topic) do count = - @kafka_ex_client.metadata(topic: topic, worker_name: @client_name) + @kafka_ex_client.metadata(topic: topic, worker_name: config.client_name) |> @metadata_response.partitions_for_topic(topic) |> length() @@ -39,15 +37,15 @@ if Code.ensure_loaded?(KafkaEx) do end @impl true - def do_produce(messages, topic, partition, config) do + def do_produce(%Producers.Config{} = config, messages, topic, partition) do case @kafka_ex_client.produce( %KafkaEx.Protocol.Produce.Request{ topic: topic, partition: partition, - required_acks: Keyword.get(config, :required_acks), + required_acks: config.required_acks, messages: transform_messages(messages) }, - worker_name: @client_name + worker_name: config.client_name ) do {:ok, _offset} -> :ok diff --git a/lib/kafka_batcher/prom_ex/plugins/kafka.ex b/lib/kafka_batcher/prom_ex/plugins/kafka.ex index 8f7d61b..80905f5 100644 --- a/lib/kafka_batcher/prom_ex/plugins/kafka.ex +++ b/lib/kafka_batcher/prom_ex/plugins/kafka.ex @@ -59,19 +59,37 @@ if Code.ensure_loaded?(PromEx) do ] @impl true - def event_metrics(_opts) do - metric_prefix = [:prom_ex, :kafka] + def event_metrics(opts) do + %KafkaBatcher.Config{} = + config = + Keyword.get_lazy(opts, :kafka_batcher_config, fn -> + :kafka_batcher + |> Application.get_all_env() + |> KafkaBatcher.Config.build_config!() + end) + + metric_prefix = + [:prom_ex, :kafka, KafkaBatcher.Config.get_client_name(config)] labels = %{} - buckets = Application.get_env(:kafka_batcher, :kafka_metric_opts, []) [ - producer_event_metrics(metric_prefix, labels, buckets), - consumer_event_metrics(metric_prefix, labels, buckets) + producer_event_metrics( + metric_prefix, + labels, + config.kafka_metric_opts, + config.kafka_topic_aliases + ), + consumer_event_metrics( + metric_prefix, + labels, + config.kafka_metric_opts, + 
config.kafka_topic_aliases + ) ] end - def producer_event_metrics(metric_prefix, labels, buckets) do + def producer_event_metrics(metric_prefix, labels, buckets, aliases) do producer_metrics_tags = Map.keys(labels) ++ [:topic, :partition, :topic_alias] buckets = Keyword.get(buckets, :producer_buckets, @default_producer_buckets) @@ -83,12 +101,13 @@ if Code.ensure_loaded?(PromEx) do labels, :producer, @producer_event_metrics, - buckets + buckets, + aliases ) ) end - def consumer_event_metrics(metric_prefix, labels, buckets) do + def consumer_event_metrics(metric_prefix, labels, buckets, aliases) do consumer_metrics_tags = Map.keys(labels) ++ [:topic, :partition, :topic_alias] buckets = Keyword.get(buckets, :consumer_buckets, @default_consumer_buckets) @@ -100,14 +119,13 @@ if Code.ensure_loaded?(PromEx) do labels, :consumer, @consumer_event_metrics, - buckets + buckets, + aliases ) ) end - defp build_kafka_metrics(metric_prefix, metrics_tags, labels, name, event_name, buckets) do - aliases = Application.get_env(:kafka_batcher, :kafka_topic_aliases, %{}) - + defp build_kafka_metrics(metric_prefix, metrics_tags, labels, name, event_name, buckets, aliases) do [ distribution( metric_prefix ++ [name, :duration, :seconds], diff --git a/lib/kafka_batcher/supervisor.ex b/lib/kafka_batcher/supervisor.ex index 2a7c2fc..e49abc1 100644 --- a/lib/kafka_batcher/supervisor.ex +++ b/lib/kafka_batcher/supervisor.ex @@ -4,16 +4,65 @@ defmodule KafkaBatcher.Supervisor do Starts Collector & AccumulatorsPoolSupervisor for each configured collector """ + alias KafkaBatcher.{AccumulatorsPoolSupervisor, Collector} + use Supervisor - def start_link(args) do - Supervisor.start_link(__MODULE__, args, name: __MODULE__) + @spec child_spec( + KafkaBatcher.Config.t() + | Keyword.t() + | nil + ) :: Supervisor.child_spec() + def child_spec(%KafkaBatcher.Config{} = config) do + %{id: reg_name(config), start: {__MODULE__, :start_link, [config]}} + end + + def child_spec(opts) when is_nil(opts) or 
is_list(opts) do + opts = + if is_nil(opts) or opts == [] do + # Backwards compatibility + Application.get_all_env(:kafka_batcher) + else + opts + end + + opts |> KafkaBatcher.Config.build_config!() |> child_spec() + end + + @spec start_link(KafkaBatcher.Config.t()) :: Supervisor.on_start() + def start_link(%KafkaBatcher.Config{} = config) do + Supervisor.start_link(__MODULE__, config, name: reg_name(config)) end - def init(_args) do - children = KafkaBatcher.Config.collectors_spec() + @impl true + def init(%KafkaBatcher.Config{} = config) do + children = + [ + KafkaBatcher.ConnectionManager.child_spec(config) + | build_pipeline_unit_specs(config.pipeline_units) + ] + |> Enum.reverse() opts = [strategy: :one_for_one] Supervisor.init(children, opts) end + + defp build_pipeline_unit_specs(units) do + for %KafkaBatcher.PipelineUnit{} = unit <- units, reduce: [] do + specs -> + %KafkaBatcher.PipelineUnit{ + collector_config: %Collector.Config{collector: collector} + } = unit + + [ + collector.child_spec(unit), + AccumulatorsPoolSupervisor.child_spec(unit) + | specs + ] + end + end + + defp reg_name(%KafkaBatcher.Config{} = config) do + :"#{__MODULE__}.#{KafkaBatcher.Config.get_client_name(config)}" + end end diff --git a/lib/kafka_batcher/temp_storage.ex b/lib/kafka_batcher/temp_storage.ex index 3521237..80fb88e 100644 --- a/lib/kafka_batcher/temp_storage.ex +++ b/lib/kafka_batcher/temp_storage.ex @@ -26,7 +26,9 @@ defmodule KafkaBatcher.TempStorage do end end - defp recheck_and_update(%CollectorState{topic_name: topic, locked?: true} = state, now) do + defp recheck_and_update(%CollectorState{locked?: true} = state, now) do + topic = KafkaBatcher.PipelineUnit.get_topic_name(state.pipeline_unit) + if @storage_impl.empty?(topic) do %CollectorState{state | locked?: false, last_check_timestamp: nil} else diff --git a/test/support/producers/test_producer.ex b/test/support/producers/test_producer.ex index 9851608..efc114e 100644 --- a/test/support/producers/test_producer.ex 
+++ b/test/support/producers/test_producer.ex @@ -27,24 +27,24 @@ defmodule KafkaBatcher.Producers.TestProducer do } @impl true - def start_client do - process_callback(%{action: :start_client}, {:ok, self()}) + def start_client(config) do + process_callback(%{action: :start_client, parameters: config}, {:ok, self()}) end @impl true - def start_producer(topic_name, config) do - process_callback(%{action: :start_producer, parameters: {topic_name, config}}, :ok) + def start_producer(config, topic_name) do + process_callback(%{action: :start_producer, parameters: {config, topic_name}}, :ok) end @impl true - def get_partitions_count(topic_name) do + def get_partitions_count(config, topic_name) do response = {:ok, @partition_counts[topic_name]} - process_callback(%{action: :get_partitions_count, parameters: topic_name}, response) + process_callback(%{action: :get_partitions_count, parameters: {config, topic_name}}, response) end @impl true - def do_produce(messages, topic, partition, config) do - process_callback(%{action: :do_produce, parameters: {messages, topic, partition, config}}, :ok) + def do_produce(config, messages, topic, partition) do + process_callback(%{action: :do_produce, parameters: {config, messages, topic, partition}}, :ok) end def topic_name(idx) when idx >= 1 and idx <= 8, do: "topic#{idx}" From d82c2938d77583a99d8c808bb7fe2514b7526a01 Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Tue, 30 Dec 2025 11:41:19 +0300 Subject: [PATCH 2/5] Rename PipelineUnit to DataStreamSpec --- lib/kafka_batcher/accumulator.ex | 52 +++++++++---------- lib/kafka_batcher/accumulator/state.ex | 24 ++++----- .../accumulators_pool_supervisor.ex | 34 ++++++------ lib/kafka_batcher/collector.ex | 21 ++++---- lib/kafka_batcher/collector/implementation.ex | 30 +++++------ lib/kafka_batcher/collector/state.ex | 20 +++---- lib/kafka_batcher/config.ex | 24 ++++----- lib/kafka_batcher/connection_manager.ex | 4 +- .../{pipeline_unit.ex => data_stream_spec.ex} | 36 ++++++------- 
.../validator.ex | 12 ++--- lib/kafka_batcher/supervisor.ex | 14 ++--- lib/kafka_batcher/temp_storage.ex | 2 +- 12 files changed, 137 insertions(+), 136 deletions(-) rename lib/kafka_batcher/{pipeline_unit.ex => data_stream_spec.ex} (65%) rename lib/kafka_batcher/{pipeline_unit => data_stream_spec}/validator.ex (70%) diff --git a/lib/kafka_batcher/accumulator.ex b/lib/kafka_batcher/accumulator.ex index bdc2f77..9f9e206 100644 --- a/lib/kafka_batcher/accumulator.ex +++ b/lib/kafka_batcher/accumulator.ex @@ -8,8 +8,8 @@ defmodule KafkaBatcher.Accumulator do alias KafkaBatcher.{ Accumulator, Accumulator.State, + DataStreamSpec, MessageObject, - PipelineUnit, Producers, TempStorage } @@ -22,24 +22,24 @@ defmodule KafkaBatcher.Accumulator do use GenServer require Logger - @spec start_link(PipelineUnit.t()) :: GenServer.on_start() - def start_link(%PipelineUnit{} = pipeline_unit) do + @spec start_link(DataStreamSpec.t()) :: GenServer.on_start() + def start_link(%DataStreamSpec{} = data_stream_spec) do GenServer.start_link( __MODULE__, - pipeline_unit, - name: reg_name(pipeline_unit) + data_stream_spec, + name: reg_name(data_stream_spec) ) end @doc "Returns a specification to start this module under a supervisor" - @spec child_spec(PipelineUnit.t()) :: Supervisor.child_spec() - def child_spec(%PipelineUnit{} = pipeline_unit) do + @spec child_spec(DataStreamSpec.t()) :: Supervisor.child_spec() + def child_spec(%DataStreamSpec{} = data_stream_spec) do %{ - id: reg_name(pipeline_unit), + id: reg_name(data_stream_spec), start: { - PipelineUnit.get_accumulator_mod(pipeline_unit), + DataStreamSpec.get_accumulator_mod(data_stream_spec), :start_link, - [pipeline_unit] + [data_stream_spec] } } end @@ -47,9 +47,9 @@ defmodule KafkaBatcher.Accumulator do @doc """ Finds appropriate Accumulator process by topic & partition and dispatches `event` to it """ - @spec add_event(MessageObject.t(), PipelineUnit.t()) :: :ok | {:error, term()} - def add_event(%MessageObject{} = event, 
%PipelineUnit{} = pipeline_unit) do - GenServer.call(reg_name(pipeline_unit), {:add_event, event}) + @spec add_event(MessageObject.t(), DataStreamSpec.t()) :: :ok | {:error, term()} + def add_event(%MessageObject{} = event, %DataStreamSpec{} = data_stream_spec) do + GenServer.call(reg_name(data_stream_spec), {:add_event, event}) catch _, _reason -> Logger.warning("KafkaBatcher: Couldn't get through to accumulator") @@ -60,17 +60,17 @@ defmodule KafkaBatcher.Accumulator do ## Callbacks ## @impl GenServer - def init(%PipelineUnit{} = pipeline_unit) do + def init(%DataStreamSpec{} = data_stream_spec) do Process.flag(:trap_exit, true) - topic_name = PipelineUnit.get_topic_name(pipeline_unit) - partition = PipelineUnit.get_partition(pipeline_unit) + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) + partition = DataStreamSpec.get_partition(data_stream_spec) Logger.debug(""" KafkaBatcher: Accumulator process started: topic #{topic_name} partition #{partition} pid #{inspect(self())} """) - {:ok, %State{pipeline_unit: pipeline_unit}} + {:ok, %State{data_stream_spec: data_stream_spec}} end @impl GenServer @@ -97,7 +97,7 @@ defmodule KafkaBatcher.Accumulator do {:noreply, new_state} {:error, _reason, new_state} -> - PipelineUnit.get_collector(state.pipeline_unit).set_lock() + DataStreamSpec.get_collector(state.data_stream_spec).set_lock() {:noreply, new_state} end end @@ -128,7 +128,7 @@ defmodule KafkaBatcher.Accumulator do pdict, %State{ state - | pipeline_unit: PipelineUnit.drop_sensitive(state.pipeline_unit) + | data_stream_spec: DataStreamSpec.drop_sensitive(state.data_stream_spec) } ] end @@ -143,9 +143,9 @@ defmodule KafkaBatcher.Accumulator do end defp set_cleanup_timer_if_not_exists(%State{cleanup_timer_ref: nil} = state) do - %PipelineUnit{ + %DataStreamSpec{ accumulator_config: %Accumulator.Config{max_wait_time: max_wait_time} - } = state.pipeline_unit + } = state.data_stream_spec ref = :erlang.start_timer(max_wait_time, self(), :cleanup) %State{state 
| cleanup_timer_ref: ref} @@ -192,13 +192,13 @@ defmodule KafkaBatcher.Accumulator do {:error, reason} end - defp reg_name(%PipelineUnit{} = pipeline_unit) do - %PipelineUnit{ + defp reg_name(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ producer_config: %Producers.Config{client_name: client_name} - } = pipeline_unit + } = data_stream_spec - topic_name = PipelineUnit.get_topic_name(pipeline_unit) - partition = PipelineUnit.get_partition(pipeline_unit) + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) + partition = DataStreamSpec.get_partition(data_stream_spec) case partition do nil -> diff --git a/lib/kafka_batcher/accumulator/state.ex b/lib/kafka_batcher/accumulator/state.ex index 2b7b432..90e4990 100644 --- a/lib/kafka_batcher/accumulator/state.ex +++ b/lib/kafka_batcher/accumulator/state.ex @@ -14,14 +14,14 @@ defmodule KafkaBatcher.Accumulator.State do alias KafkaBatcher.{ Accumulator, Accumulator.State, - MessageObject, - PipelineUnit + DataStreamSpec, + MessageObject } @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) @type t :: %State{ - pipeline_unit: KafkaBatcher.PipelineUnit.t(), + data_stream_spec: KafkaBatcher.DataStreamSpec.t(), pending_messages: list(), last_produced_at: non_neg_integer(), batch_bytesize: non_neg_integer(), @@ -31,7 +31,7 @@ defmodule KafkaBatcher.Accumulator.State do status: atom() } - @enforce_keys [:pipeline_unit] + @enforce_keys [:data_stream_spec] defstruct @enforce_keys ++ [ pending_messages: [], @@ -79,12 +79,12 @@ defmodule KafkaBatcher.Accumulator.State do end defp consider_max_bytesize(%State{status: :continue, batch_bytesize: batch_bytesize} = state, new_message) do - %PipelineUnit{ + %DataStreamSpec{ accumulator_config: %Accumulator.Config{max_batch_bytesize: max_batch_bytesize} - } = state.pipeline_unit + } = state.data_stream_spec - topic_name = PipelineUnit.get_topic_name(state.pipeline_unit) - partition = 
PipelineUnit.get_partition(state.pipeline_unit) + topic_name = DataStreamSpec.get_topic_name(state.data_stream_spec) + partition = DataStreamSpec.get_partition(state.data_stream_spec) message_size = :erlang.external_size(new_message) @@ -109,9 +109,9 @@ defmodule KafkaBatcher.Accumulator.State do end defp consider_max_size_and_wait_time(%State{status: :continue} = state, now) do - %PipelineUnit{ + %DataStreamSpec{ accumulator_config: %Accumulator.Config{batch_size: batch_size, min_delay: min_delay} - } = state.pipeline_unit + } = state.data_stream_spec if state.pending_messages_count >= batch_size and now - state.last_produced_at >= min_delay do mark_as_ready(state) @@ -123,9 +123,9 @@ defmodule KafkaBatcher.Accumulator.State do defp consider_max_size_and_wait_time(%State{status: :ready} = state, _), do: state defp consider_istant_flush(%State{status: :continue} = state, key, value) do - %PipelineUnit{ + %DataStreamSpec{ accumulator_config: %Accumulator.Config{batch_flusher: batch_flusher} - } = state.pipeline_unit + } = state.data_stream_spec if batch_flusher.flush?(key, value) do mark_as_ready(state) diff --git a/lib/kafka_batcher/accumulators_pool_supervisor.ex b/lib/kafka_batcher/accumulators_pool_supervisor.ex index b8022b6..75e54f0 100644 --- a/lib/kafka_batcher/accumulators_pool_supervisor.ex +++ b/lib/kafka_batcher/accumulators_pool_supervisor.ex @@ -5,33 +5,33 @@ defmodule KafkaBatcher.AccumulatorsPoolSupervisor do use DynamicSupervisor - alias KafkaBatcher.{Accumulator, PipelineUnit, Producers} + alias KafkaBatcher.{Accumulator, DataStreamSpec, Producers} @dialyzer {:no_return, {:init, 1}} - def start_link(%PipelineUnit{} = pipeline_unit) do + def start_link(%DataStreamSpec{} = data_stream_spec) do DynamicSupervisor.start_link( __MODULE__, - pipeline_unit, - name: reg_name(pipeline_unit) + data_stream_spec, + name: reg_name(data_stream_spec) ) end @doc "Returns a specification to start this module under a supervisor" - def child_spec(%PipelineUnit{} = 
pipeline_unit) do + def child_spec(%DataStreamSpec{} = data_stream_spec) do %{ - id: reg_name(pipeline_unit), - start: {__MODULE__, :start_link, [pipeline_unit]}, + id: reg_name(data_stream_spec), + start: {__MODULE__, :start_link, [data_stream_spec]}, type: :supervisor } end - def init(%PipelineUnit{} = pipeline_unit) do - %PipelineUnit{ + def init(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ accumulator_config: %Accumulator.Config{ max_accumulator_restarts: max_accumulator_restarts } - } = pipeline_unit + } = data_stream_spec # max_restarts value depends on partitions count in case when partitioned accumulation is used. # For example: 100 max_restarts -> 10 process restarts per second for 1 topic with 10 partitions @@ -44,19 +44,19 @@ defmodule KafkaBatcher.AccumulatorsPoolSupervisor do ) end - def start_accumulator(%PipelineUnit{} = pipeline_unit) do + def start_accumulator(%DataStreamSpec{} = data_stream_spec) do DynamicSupervisor.start_child( - reg_name(pipeline_unit), - Accumulator.child_spec(pipeline_unit) + reg_name(data_stream_spec), + Accumulator.child_spec(data_stream_spec) ) end - def reg_name(%PipelineUnit{} = pipeline_unit) do - %PipelineUnit{ + def reg_name(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ producer_config: %Producers.Config{client_name: client_name} - } = pipeline_unit + } = data_stream_spec - topic_name = PipelineUnit.get_topic_name(pipeline_unit) + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) :"#{__MODULE__}.#{client_name}.#{topic_name}" end diff --git a/lib/kafka_batcher/collector.ex b/lib/kafka_batcher/collector.ex index d3c65a7..da367e6 100644 --- a/lib/kafka_batcher/collector.ex +++ b/lib/kafka_batcher/collector.ex @@ -48,6 +48,7 @@ defmodule KafkaBatcher.Collector do AccumulatorsPoolSupervisor, Collector, Collector.State, + DataStreamSpec, TempStorage } @@ -58,16 +59,16 @@ defmodule KafkaBatcher.Collector do @compile_opts opts # Public API - @spec 
start_link(KafkaBatcher.PipelineUnit.t()) :: GenServer.on_start() - def start_link(%KafkaBatcher.PipelineUnit{} = unit) do - GenServer.start_link(__MODULE__, unit, name: __MODULE__) + @spec start_link(DataStreamSpec.t()) :: GenServer.on_start() + def start_link(%DataStreamSpec{} = spec) do + GenServer.start_link(__MODULE__, spec, name: __MODULE__) end @doc "Returns a specification to start this module under a supervisor" - def child_spec(%KafkaBatcher.PipelineUnit{} = unit) do + def child_spec(%DataStreamSpec{} = spec) do %{ id: __MODULE__, - start: {__MODULE__, :start_link, [unit]}, + start: {__MODULE__, :start_link, [spec]}, type: :worker } end @@ -103,15 +104,15 @@ defmodule KafkaBatcher.Collector do # Callbacks @impl GenServer - def init(%KafkaBatcher.PipelineUnit{} = pipeline_unit) do + def init(%DataStreamSpec{} = data_stream_spec) do Process.flag(:trap_exit, true) - topic_name = KafkaBatcher.PipelineUnit.get_topic_name(pipeline_unit) + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) Logger.debug("KafkaBatcher: Batch collector started: topic #{topic_name} pid #{inspect(self())}") send(self(), :init_accumulators) - {:ok, %State{pipeline_unit: pipeline_unit}} + {:ok, %State{data_stream_spec: data_stream_spec}} end @impl GenServer @@ -140,7 +141,7 @@ defmodule KafkaBatcher.Collector do end def handle_call(:get_config, _from, %State{} = state) do - {:reply, state.pipeline_unit, state} + {:reply, state.data_stream_spec, state} end def handle_call(unknown, _from, state) do @@ -188,7 +189,7 @@ defmodule KafkaBatcher.Collector do pdict, %State{ state - | pipeline_unit: PipelineUnit.drop_sensitive(state.pipeline_unit) + | data_stream_spec: DataStreamSpec.drop_sensitive(state.data_stream_spec) } ] end diff --git a/lib/kafka_batcher/collector/implementation.ex b/lib/kafka_batcher/collector/implementation.ex index f703758..ff2c4fa 100644 --- a/lib/kafka_batcher/collector/implementation.ex +++ b/lib/kafka_batcher/collector/implementation.ex @@ -9,23 +9,23 @@ 
defmodule KafkaBatcher.Collector.Implementation do AccumulatorsPoolSupervisor, Collector, Collector.State, - MessageObject, - PipelineUnit + DataStreamSpec, + MessageObject } @producer Application.compile_env(:kafka_batcher, :producer_module, KafkaBatcher.Producers.Kaffe) - def choose_partition(_message, _pipeline_unit, nil), do: {:error, :kafka_unavailable} + def choose_partition(_message, _data_stream_spec, nil), do: {:error, :kafka_unavailable} def choose_partition( %MessageObject{key: key, value: value}, - %KafkaBatcher.PipelineUnit{} = pipeline_unit, + %KafkaBatcher.DataStreamSpec{} = data_stream_spec, partitions_count ) do %Collector.Config{ partition_fn: partition_fn, topic_name: topic_name - } = pipeline_unit.collector_config + } = data_stream_spec.collector_config partition = partition_fn.(topic_name, partitions_count, key, value) @@ -34,7 +34,7 @@ defmodule KafkaBatcher.Collector.Implementation do def start_accumulators(%State{} = state) do collect_by_partition? = - PipelineUnit.collect_by_partition?(state.pipeline_unit) + DataStreamSpec.collect_by_partition?(state.data_stream_spec) cond do collect_by_partition? and is_nil(state.partitions_count) -> @@ -42,23 +42,23 @@ defmodule KafkaBatcher.Collector.Implementation do collect_by_partition? -> start_accumulators_by_partitions( - state.pipeline_unit, + state.data_stream_spec, state.partitions_count ) not collect_by_partition? 
-> - start_accumulator(state.pipeline_unit) + start_accumulator(state.data_stream_spec) end end - defp start_accumulators_by_partitions(pipeline_unit, count) do + defp start_accumulators_by_partitions(data_stream_spec, count) do Enum.reduce_while( 0..(count - 1), :ok, fn partition, _ -> - pipeline_unit = PipelineUnit.set_partition(pipeline_unit, partition) + data_stream_spec = DataStreamSpec.set_partition(data_stream_spec, partition) - case start_accumulator(pipeline_unit) do + case start_accumulator(data_stream_spec) do :ok -> {:cont, :ok} @@ -69,8 +69,8 @@ defmodule KafkaBatcher.Collector.Implementation do ) end - defp start_accumulator(pipeline_unit) do - case AccumulatorsPoolSupervisor.start_accumulator(pipeline_unit) do + defp start_accumulator(data_stream_spec) do + case AccumulatorsPoolSupervisor.start_accumulator(data_stream_spec) do {:ok, _} -> :ok @@ -79,7 +79,7 @@ defmodule KafkaBatcher.Collector.Implementation do {:error, reason} -> Logger.warning(""" - KafkaBatcher: Accumulator has failed to start with args: #{inspect(pipeline_unit)}. + KafkaBatcher: Accumulator has failed to start with args: #{inspect(data_stream_spec)}. 
Reason: #{inspect(reason)}} """) @@ -90,7 +90,7 @@ defmodule KafkaBatcher.Collector.Implementation do @spec store_partition_count(State.t()) :: State.t() def store_partition_count(%State{partitions_count: nil} = state) do %State{ - pipeline_unit: %PipelineUnit{ + data_stream_spec: %DataStreamSpec{ collector_config: %Collector.Config{topic_name: topic_name}, producer_config: producer_config } diff --git a/lib/kafka_batcher/collector/state.ex b/lib/kafka_batcher/collector/state.ex index 7b77191..97824d9 100644 --- a/lib/kafka_batcher/collector/state.ex +++ b/lib/kafka_batcher/collector/state.ex @@ -6,8 +6,8 @@ defmodule KafkaBatcher.Collector.State do alias KafkaBatcher.{ Accumulator, Collector, + DataStreamSpec, MessageObject, - PipelineUnit, TempStorage } @@ -16,7 +16,7 @@ defmodule KafkaBatcher.Collector.State do require Logger @type t :: %State{ - pipeline_unit: PipelineUnit.t(), + data_stream_spec: DataStreamSpec.t(), locked?: boolean(), last_check_timestamp: non_neg_integer() | nil, ready?: boolean(), @@ -24,7 +24,7 @@ defmodule KafkaBatcher.Collector.State do partitions_count: pos_integer() | nil } - @enforce_keys [:pipeline_unit] + @enforce_keys [:data_stream_spec] defstruct @enforce_keys ++ [ # these fields are used to handle case when Kafka went down suddenly @@ -65,9 +65,9 @@ defmodule KafkaBatcher.Collector.State do end defp try_to_add_event(%State{} = state, event, partition) do - pipeline_unit = PipelineUnit.set_partition(state.pipeline_unit, partition) + data_stream_spec = DataStreamSpec.set_partition(state.data_stream_spec, partition) - case Accumulator.add_event(event, pipeline_unit) do + case Accumulator.add_event(event, data_stream_spec) do :ok -> :ok {:error, reason} -> keep_failed_event(:ok, event, reason, partition) end @@ -91,10 +91,10 @@ defmodule KafkaBatcher.Collector.State do end defp choose_partition(%State{} = state, event) do - if PipelineUnit.collect_by_partition?(state.pipeline_unit) do + if 
DataStreamSpec.collect_by_partition?(state.data_stream_spec) do Collector.Implementation.choose_partition( event, - state.pipeline_unit, + state.data_stream_spec, state.partitions_count ) else @@ -108,14 +108,14 @@ defmodule KafkaBatcher.Collector.State do {:error, _reason, failed_event_batches} = result, %State{} = state ) do - %State{pipeline_unit: %PipelineUnit{} = pipeline_unit} = state + %State{data_stream_spec: %DataStreamSpec{} = data_stream_spec} = state for {partition, failed_events} <- failed_event_batches do TempStorage.save_batch(%TempStorage.Batch{ messages: Enum.reverse(failed_events), - topic: PipelineUnit.get_topic_name(pipeline_unit), + topic: DataStreamSpec.get_topic_name(data_stream_spec), partition: partition, - producer_config: pipeline_unit.opts + producer_config: data_stream_spec.opts }) end diff --git a/lib/kafka_batcher/config.ex b/lib/kafka_batcher/config.ex index dd96eb2..0f2c67a 100644 --- a/lib/kafka_batcher/config.ex +++ b/lib/kafka_batcher/config.ex @@ -24,20 +24,20 @@ defmodule KafkaBatcher.Config do alias KafkaBatcher.{ Accumulator, Collector, - PipelineUnit.Validator, + DataStreamSpec.Validator, Producers } @type t :: %__MODULE__{ producer_config: Producers.Config.t(), - pipeline_units: [KafkaBatcher.PipelineUnit.t()], + data_stream_specs: [KafkaBatcher.DataStreamSpec.t()], kafka_topic_aliases: %{optional(binary()) => binary()}, kafka_metric_opts: Keyword.t() } @enforce_keys [ :producer_config, - :pipeline_units, + :data_stream_specs, :kafka_topic_aliases, :kafka_metric_opts ] @@ -59,22 +59,22 @@ defmodule KafkaBatcher.Config do |> Keyword.fetch!(:kafka) |> Producers.Config.build!() - pipeline_units = + data_stream_specs = for collector <- Keyword.fetch!(opts, :collectors) do producer_config - |> build_pipeline_unit!(collector, opts) - |> validate_pipeline_unit!() + |> build_data_stream_spec!(collector, opts) + |> validate_data_stream_spec!() end %__MODULE__{ producer_config: producer_config, - pipeline_units: pipeline_units, + 
data_stream_specs: data_stream_specs, kafka_topic_aliases: Keyword.get(opts, :kafka_topic_aliases, %{}), kafka_metric_opts: Keyword.get(opts, :kafka_metric_opts, []) } end - defp build_pipeline_unit!(producer_config, collector, opts) do + defp build_data_stream_spec!(producer_config, collector, opts) do opts = opts |> Keyword.merge(get_compile_opts!(collector)) @@ -84,7 +84,7 @@ defmodule KafkaBatcher.Config do collector_config = Collector.Config.build!(opts) accumulator_config = Accumulator.Config.build!(opts) - %KafkaBatcher.PipelineUnit{ + %KafkaBatcher.DataStreamSpec{ collector_config: collector_config, accumulator_config: accumulator_config, producer_config: producer_config, @@ -96,10 +96,10 @@ defmodule KafkaBatcher.Config do } end - defp validate_pipeline_unit!(pipeline_unit) do - case Validator.validate(pipeline_unit) do + defp validate_data_stream_spec!(data_stream_spec) do + case Validator.validate(data_stream_spec) do :ok -> - pipeline_unit + data_stream_spec {:error, reason} -> raise(BadConfigError, "Collector config failed: #{inspect(reason)}") diff --git a/lib/kafka_batcher/connection_manager.ex b/lib/kafka_batcher/connection_manager.ex index 7a654cd..9da7be5 100644 --- a/lib/kafka_batcher/connection_manager.ex +++ b/lib/kafka_batcher/connection_manager.ex @@ -132,8 +132,8 @@ defmodule KafkaBatcher.ConnectionManager do end defp start_producers(%State{config: %KafkaBatcher.Config{} = config}) do - config.pipeline_units - |> Enum.map(&KafkaBatcher.PipelineUnit.get_topic_name/1) + config.data_stream_specs + |> Enum.map(&KafkaBatcher.DataStreamSpec.get_topic_name/1) |> Enum.reduce_while( :ok, fn topic_name, _ -> diff --git a/lib/kafka_batcher/pipeline_unit.ex b/lib/kafka_batcher/data_stream_spec.ex similarity index 65% rename from lib/kafka_batcher/pipeline_unit.ex rename to lib/kafka_batcher/data_stream_spec.ex index 763d034..84c19b2 100644 --- a/lib/kafka_batcher/pipeline_unit.ex +++ b/lib/kafka_batcher/data_stream_spec.ex @@ -1,4 +1,4 @@ -defmodule 
KafkaBatcher.PipelineUnit do +defmodule KafkaBatcher.DataStreamSpec do @moduledoc false alias KafkaBatcher.{ @@ -29,69 +29,69 @@ defmodule KafkaBatcher.PipelineUnit do defstruct @enforce_keys @spec drop_sensitive(t()) :: t() - def drop_sensitive(%__MODULE__{} = pipeline_unit) do + def drop_sensitive(%__MODULE__{} = data_stream_spec) do %__MODULE__{ - collector_config: pipeline_unit.collector_config, - accumulator_config: pipeline_unit.accumulator_config, - producer_config: Producers.Config.drop_sensitive(pipeline_unit.producer_config), + collector_config: data_stream_spec.collector_config, + accumulator_config: data_stream_spec.accumulator_config, + producer_config: Producers.Config.drop_sensitive(data_stream_spec.producer_config), opts: [] } end @spec get_accumulator_mod(t()) :: module() - def get_accumulator_mod(%__MODULE__{} = pipeline_unit) do + def get_accumulator_mod(%__MODULE__{} = data_stream_spec) do %__MODULE__{ accumulator_config: %Accumulator.Config{accumulator_mod: accumulator_mod} - } = pipeline_unit + } = data_stream_spec accumulator_mod end @spec get_partition(t()) :: pos_integer() | nil - def get_partition(%__MODULE__{} = pipeline_unit) do + def get_partition(%__MODULE__{} = data_stream_spec) do %__MODULE__{ accumulator_config: %Accumulator.Config{partition: partition} - } = pipeline_unit + } = data_stream_spec partition end @spec set_partition(t(), pos_integer()) :: t() - def set_partition(%__MODULE__{} = pipeline_unit, partition) do + def set_partition(%__MODULE__{} = data_stream_spec, partition) do %__MODULE__{ - pipeline_unit + data_stream_spec | accumulator_config: %Accumulator.Config{ - pipeline_unit.accumulator_config + data_stream_spec.accumulator_config | partition: partition } } end @spec get_topic_name(t()) :: String.t() - def get_topic_name(%__MODULE__{} = pipeline_unit) do + def get_topic_name(%__MODULE__{} = data_stream_spec) do %__MODULE__{ collector_config: %Collector.Config{topic_name: topic_name} - } = pipeline_unit + } = 
data_stream_spec topic_name end @spec get_collector(t()) :: module() - def get_collector(%__MODULE__{} = pipeline_unit) do + def get_collector(%__MODULE__{} = data_stream_spec) do %__MODULE__{ collector_config: %Collector.Config{collector: collector} - } = pipeline_unit + } = data_stream_spec collector end @spec collect_by_partition?(t()) :: boolean() - def collect_by_partition?(%__MODULE__{} = pipeline_unit) do + def collect_by_partition?(%__MODULE__{} = data_stream_spec) do %__MODULE__{ collector_config: %Collector.Config{ collect_by_partition: collect_by_partition } - } = pipeline_unit + } = data_stream_spec collect_by_partition end diff --git a/lib/kafka_batcher/pipeline_unit/validator.ex b/lib/kafka_batcher/data_stream_spec/validator.ex similarity index 70% rename from lib/kafka_batcher/pipeline_unit/validator.ex rename to lib/kafka_batcher/data_stream_spec/validator.ex index 74b605b..6cff577 100644 --- a/lib/kafka_batcher/pipeline_unit/validator.ex +++ b/lib/kafka_batcher/data_stream_spec/validator.ex @@ -1,11 +1,11 @@ -defmodule KafkaBatcher.PipelineUnit.Validator do +defmodule KafkaBatcher.DataStreamSpec.Validator do @moduledoc false - alias KafkaBatcher.{Collector, PipelineUnit, Producers} + alias KafkaBatcher.{Collector, DataStreamSpec, Producers} - @spec validate(PipelineUnit.t()) :: :ok | {:error, String.t()} - def validate(%PipelineUnit{} = pipeline_unit) do - %PipelineUnit{ + @spec validate(DataStreamSpec.t()) :: :ok | {:error, String.t()} + def validate(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ collector_config: %Collector.Config{ collector: collector, partition_fn: partition_fn, @@ -14,7 +14,7 @@ defmodule KafkaBatcher.PipelineUnit.Validator do producer_config: %Producers.Config{ partition_strategy: partition_strategy } - } = pipeline_unit + } = data_stream_spec cond do collect_by_partition and is_nil(partition_fn) -> diff --git a/lib/kafka_batcher/supervisor.ex b/lib/kafka_batcher/supervisor.ex index e49abc1..fa9efbc 100644 --- 
a/lib/kafka_batcher/supervisor.ex +++ b/lib/kafka_batcher/supervisor.ex @@ -39,7 +39,7 @@ defmodule KafkaBatcher.Supervisor do children = [ KafkaBatcher.ConnectionManager.child_spec(config) - | build_pipeline_unit_specs(config.pipeline_units) + | build_data_stream_specs(config.data_stream_specs) ] |> Enum.reverse() @@ -47,16 +47,16 @@ defmodule KafkaBatcher.Supervisor do Supervisor.init(children, opts) end - defp build_pipeline_unit_specs(units) do - for %KafkaBatcher.PipelineUnit{} = unit <- units, reduce: [] do + defp build_data_stream_specs(specs) do + for %KafkaBatcher.DataStreamSpec{} = spec <- specs, reduce: [] do specs -> - %KafkaBatcher.PipelineUnit{ + %KafkaBatcher.DataStreamSpec{ collector_config: %Collector.Config{collector: collector} - } = unit + } = spec [ - collector.child_spec(unit), - AccumulatorsPoolSupervisor.child_spec(unit) + collector.child_spec(spec), + AccumulatorsPoolSupervisor.child_spec(spec) | specs ] end diff --git a/lib/kafka_batcher/temp_storage.ex b/lib/kafka_batcher/temp_storage.ex index 80fb88e..b0f2c96 100644 --- a/lib/kafka_batcher/temp_storage.ex +++ b/lib/kafka_batcher/temp_storage.ex @@ -27,7 +27,7 @@ defmodule KafkaBatcher.TempStorage do end defp recheck_and_update(%CollectorState{locked?: true} = state, now) do - topic = KafkaBatcher.PipelineUnit.get_topic_name(state.pipeline_unit) + topic = KafkaBatcher.DataStreamSpec.get_topic_name(state.data_stream_spec) if @storage_impl.empty?(topic) do %CollectorState{state | locked?: false, last_check_timestamp: nil} From d9ae0d731bc5e7d8bcd487dfbc5eb858ba28bf8f Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Tue, 30 Dec 2025 13:13:27 +0300 Subject: [PATCH 3/5] Improve configuration error handling --- lib/kafka_batcher/accumulator/config.ex | 8 +++++ lib/kafka_batcher/collector/config.ex | 8 +++++ lib/kafka_batcher/collector/implementation.ex | 2 +- lib/kafka_batcher/config.ex | 4 +-- lib/kafka_batcher/producers/config.ex | 35 ++++++++++++++----- .../producers/config/brod_config.ex | 
15 ++++----
 6 files changed, 53 insertions(+), 19 deletions(-)

diff --git a/lib/kafka_batcher/accumulator/config.ex b/lib/kafka_batcher/accumulator/config.ex
index 2b66333..be78447 100644
--- a/lib/kafka_batcher/accumulator/config.ex
+++ b/lib/kafka_batcher/accumulator/config.ex
@@ -1,5 +1,6 @@
 defmodule KafkaBatcher.Accumulator.Config do
   @moduledoc false
+  alias KafkaBatcher.Config.BadConfigError
 
   @type t :: %__MODULE__{
           collector: module(),
@@ -58,5 +59,12 @@ defmodule KafkaBatcher.Accumulator.Config do
       :accumulator_mod
     ])
     |> then(&struct!(__MODULE__, &1))
+  rescue
+    ArgumentError ->
+      reraise(
+        BadConfigError,
+        "Accumulator config failed: missing required opts: #{inspect(@enforce_keys)}",
+        __STACKTRACE__
+      )
   end
 end
diff --git a/lib/kafka_batcher/collector/config.ex b/lib/kafka_batcher/collector/config.ex
index c15905b..d142b48 100644
--- a/lib/kafka_batcher/collector/config.ex
+++ b/lib/kafka_batcher/collector/config.ex
@@ -1,6 +1,7 @@
 defmodule KafkaBatcher.Collector.Config do
   @moduledoc false
 
+  alias KafkaBatcher.Config.BadConfigError
   alias KafkaBatcher.MessageObject
 
   @typep topic :: String.t()
@@ -40,5 +41,12 @@ defmodule KafkaBatcher.Collector.Config do
       :collect_by_partition
     ])
     |> then(&struct!(__MODULE__, &1))
+  rescue
+    ArgumentError ->
+      reraise(
+        BadConfigError,
+        "Collector config failed: missing required opts: #{inspect(@enforce_keys)}",
+        __STACKTRACE__
+      )
   end
 end
diff --git a/lib/kafka_batcher/collector/implementation.ex b/lib/kafka_batcher/collector/implementation.ex
index ff2c4fa..46bd8aa 100644
--- a/lib/kafka_batcher/collector/implementation.ex
+++ b/lib/kafka_batcher/collector/implementation.ex
@@ -19,7 +19,7 @@ defmodule KafkaBatcher.Collector.Implementation do
 
   def choose_partition(
         %MessageObject{key: key, value: value},
-        %KafkaBatcher.DataStreamSpec{} = data_stream_spec,
+        %DataStreamSpec{} = data_stream_spec,
         partitions_count
       ) do
     %Collector.Config{
diff --git a/lib/kafka_batcher/config.ex b/lib/kafka_batcher/config.ex
index 
0f2c67a..8b66eff 100644 --- a/lib/kafka_batcher/config.ex +++ b/lib/kafka_batcher/config.ex @@ -55,9 +55,7 @@ defmodule KafkaBatcher.Config do @spec build_config!(opts :: Keyword.t()) :: t() def build_config!(opts) do producer_config = - opts - |> Keyword.fetch!(:kafka) - |> Producers.Config.build!() + opts |> Keyword.get(:kafka, []) |> Producers.Config.build!() data_stream_specs = for collector <- Keyword.fetch!(opts, :collectors) do diff --git a/lib/kafka_batcher/producers/config.ex b/lib/kafka_batcher/producers/config.ex index f88c45f..f45783e 100644 --- a/lib/kafka_batcher/producers/config.ex +++ b/lib/kafka_batcher/producers/config.ex @@ -1,7 +1,9 @@ defmodule KafkaBatcher.Producers.Config do @moduledoc false - alias KafkaBatcher.{MessageObject, Producers.Config.BrodConfig} + alias KafkaBatcher.Config.BadConfigError + alias KafkaBatcher.MessageObject + alias KafkaBatcher.Producers.Config.BrodConfig @typep topic :: String.t() @typep partition_count :: pos_integer() @@ -53,14 +55,8 @@ defmodule KafkaBatcher.Producers.Config do @spec build!(opts :: Keyword.t()) :: t() def build!(opts) do - endpoints = - for url <- opts |> Keyword.fetch!(:endpoints) |> String.split(",") do - [host, port] = String.split(url, ":") - {host, :erlang.binary_to_integer(port)} - end - %__MODULE__{ - endpoints: endpoints, + endpoints: build_endpoints!(opts), client_name: Keyword.get(opts, :client_name, :kafka_producer_client), partition_strategy: Keyword.get(opts, :partition_strategy), required_acks: Keyword.get(opts, :required_acks, -1), @@ -68,4 +64,27 @@ defmodule KafkaBatcher.Producers.Config do brod_config: BrodConfig.build!(opts) } end + + defp build_endpoints!(opts) do + case Keyword.fetch(opts, :endpoints) do + {:ok, endpoints} when is_binary(endpoints) -> + for url <- String.split(endpoints, ","), do: parse_endpoint!(url) + + {:ok, endpoints} -> + raise(BadConfigError, "Producer config failed: non-string endpoints given #{inspect(endpoints)}") + + :error -> + 
raise(BadConfigError, "Producer config failed: no endpoints given") + end + end + + defp parse_endpoint!(endpoint) do + case URI.parse("//" <> endpoint) do + %URI{host: host, port: port} when not is_nil(host) and not is_nil(port) -> + {host, port} + + _ -> + raise(BadConfigError, "Producer config failed: invalid endpoint url format #{inspect(endpoint)}") + end + end end diff --git a/lib/kafka_batcher/producers/config/brod_config.ex b/lib/kafka_batcher/producers/config/brod_config.ex index b6a3242..74433c6 100644 --- a/lib/kafka_batcher/producers/config/brod_config.ex +++ b/lib/kafka_batcher/producers/config/brod_config.ex @@ -45,7 +45,7 @@ defmodule KafkaBatcher.Producers.Config.BrodConfig do @spec build!(Keyword.t()) :: t() def build!(opts) do sasl_config = - case validate_sasl_config(Keyword.get(opts, :sasl)) do + case build_sasl_config(Keyword.get(opts, :sasl)) do {:ok, sasl_config} -> sasl_config @@ -62,30 +62,31 @@ defmodule KafkaBatcher.Producers.Config.BrodConfig do } end - @spec validate_sasl_config(map() | nil) :: + @spec build_sasl_config(map() | nil) :: {:ok, sasl()} | {:error, {:invalid, map()}} - defp validate_sasl_config( + defp build_sasl_config( %{ mechanism: mechanism, login: login, password: password } = config - ) do + ) + when is_binary(login) and is_binary(password) do mechanism_valid? = mechanism in [:plain, :scram_sha_256, :scram_sha_512] - if mechanism_valid? and password != nil and login != nil do + if mechanism_valid? 
and String.valid?(login) and String.valid?(password) do {:ok, {mechanism, login, password}} else {:error, {:invalid, config}} end end - defp validate_sasl_config(sasl_config) + defp build_sasl_config(sasl_config) when sasl_config == nil or sasl_config == %{} do {:ok, :undefined} end - defp validate_sasl_config(bad_sasl_config) do + defp build_sasl_config(bad_sasl_config) do {:error, {:invalid, bad_sasl_config}} end end From bfdee3a95a2da3de27dec3a374538f04a7d4d8bd Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Mon, 12 Jan 2026 12:48:32 +0300 Subject: [PATCH 4/5] Move default client name to a variable and add client_name doc --- README.md | 51 ++++++++++++++----------- lib/kafka_batcher/connection_manager.ex | 12 ++++-- lib/kafka_batcher/producers/config.ex | 7 +++- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index a1a89dd..31bb911 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ A library to increase the throughput of producing messages (coming one at a time def start(_type, _args) do children = [ # Describe the child spec - KafkaBatcher.Supervisor + {KafkaBatcher.Supervisor, Application.fetch_env!(:kafka_batcher, :default_client)} ] opts = [strategy: :one_for_one, name: MyApp.Supervisor, max_restarts: 3, max_seconds: 5] @@ -36,32 +36,36 @@ A library to increase the throughput of producing messages (coming one at a time Config example: ```elixir - config :kafka_batcher, KafkaBatcher.Collector1, topic_name: "topic1" - config :kafka_batcher, KafkaBatcher.Collector2, topic_name: "topic2" - config :kafka_batcher, KafkaBatcher.Collector3, topic_name: "topic3" - config :kafka_batcher, KafkaBatcher.Collector4, topic_name: "topic4" - config :kafka_batcher, KafkaBatcher.Collector5, topic_name: "topic5" - - config :kafka_batcher, collectors: - [ - KafkaBatcher.Collector1, - KafkaBatcher.Collector2, - KafkaBatcher.Collector3, - KafkaBatcher.Collector4, - KafkaBatcher.Collector5 - ] + config :kafka_batcher, :default_client, [ 
+ {KafkaBatcher.Collector1, [topic_name: "topic1"]}, + {KafkaBatcher.Collector2, [topic_name: "topic2"]}, + {KafkaBatcher.Collector3, [topic_name: "topic3"]}, + {KafkaBatcher.Collector4, [topic_name: "topic4"]}, + {KafkaBatcher.Collector5, [topic_name: "topic5"]} + ] - config :kafka_batcher, :kafka, - endpoints: "localhost:9092", - # in case you use SASL - # sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"}, - # ssl: true, - telemetry: true, - allow_topic_auto_creation: false, + config :kafka_batcher, :default_client, + collectors: [ + KafkaBatcher.Collector1, + KafkaBatcher.Collector2, + KafkaBatcher.Collector3, + KafkaBatcher.Collector4, + KafkaBatcher.Collector5 + ], + kafka: [ + client_name: :default_client, + endpoints: "localhost:9092", + # in case you use SASL + # sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"}, + # ssl: true, + telemetry: true, + allow_topic_auto_creation: false + ], kafka_topic_aliases: %{ "real_topic_name1" => "incoming-events", "real_topic_name2" => "special-topic" - } + }, + kafka_metric_opts: [] # In case you use KafkaEx, you need to disable default worker to avoid crashes config :kafka_ex, :disable_default_worker, true @@ -88,6 +92,7 @@ Available parameters: * `:ssl` - optional parameter. Ssl should be type boolean(). By default `:ssl` is `false`. * `:min_delay` - optional parameter. Set minimal delay before send events. This parameter allows to increase max throughput in case when you get more messages (in term of count per second) than you expected when set `batch_size` parameter. * `:max_batch_bytesize` - optional parameter. Allows to set a limit on the maximum batch size. By default it is 1_000_000 bytes. +* `:client_name` - client name that is used to start a producer. Included in metrics namespace. Defaults to `:kafka_producer_client`. **Important:** The size of one message should not exceed `max_batch_bytesize` setting. 
If you need to work with large messages you must increase `max_batch_bytesize` value and value of Kafka topic setting `max.message.bytes` as well. diff --git a/lib/kafka_batcher/connection_manager.ex b/lib/kafka_batcher/connection_manager.ex index 9da7be5..c410927 100644 --- a/lib/kafka_batcher/connection_manager.ex +++ b/lib/kafka_batcher/connection_manager.ex @@ -48,9 +48,9 @@ defmodule KafkaBatcher.ConnectionManager do end @doc "Checks that Kafka client is already started" - @spec client_started?(Producers.Config.t()) :: boolean() - def client_started?(%Producers.Config{} = producer_config) do - GenServer.call(reg_name(producer_config), :client_started?) + @spec client_started?(client_name :: atom()) :: boolean() + def client_started?(client_name \\ Producers.Config.default_client_name()) do + GenServer.call(reg_name(client_name), :client_started?) end ## @@ -190,6 +190,10 @@ defmodule KafkaBatcher.ConnectionManager do end defp reg_name(%Producers.Config{} = producer_config) do - :"#{__MODULE__}.#{producer_config.client_name}" + reg_name(producer_config.client_name) + end + + defp reg_name(client_name) when is_atom(client_name) do + :"#{__MODULE__}.#{client_name}" end end diff --git a/lib/kafka_batcher/producers/config.ex b/lib/kafka_batcher/producers/config.ex index f45783e..82284c7 100644 --- a/lib/kafka_batcher/producers/config.ex +++ b/lib/kafka_batcher/producers/config.ex @@ -23,6 +23,8 @@ defmodule KafkaBatcher.Producers.Config do brod_config: BrodConfig.t() } + @default_client_name :kafka_producer_client + @enforce_keys [ :endpoints, :client_name, @@ -53,11 +55,14 @@ defmodule KafkaBatcher.Producers.Config do } end + @spec default_client_name :: :kafka_producer_client + def default_client_name, do: @default_client_name + @spec build!(opts :: Keyword.t()) :: t() def build!(opts) do %__MODULE__{ endpoints: build_endpoints!(opts), - client_name: Keyword.get(opts, :client_name, :kafka_producer_client), + client_name: Keyword.get(opts, :client_name, 
@default_client_name), partition_strategy: Keyword.get(opts, :partition_strategy), required_acks: Keyword.get(opts, :required_acks, -1), telemetry: Keyword.get(opts, :telemetry, true), From 514fd8e0952db15f3395aabe51a8caab84705f9e Mon Sep 17 00:00:00 2001 From: Gleb Ivanov Date: Mon, 12 Jan 2026 13:00:47 +0300 Subject: [PATCH 5/5] Rename collector->collector_mod --- lib/kafka_batcher/accumulator.ex | 3 ++- lib/kafka_batcher/accumulator/config.ex | 8 ++++---- lib/kafka_batcher/collector/config.ex | 8 ++++---- lib/kafka_batcher/config.ex | 12 ++++++------ lib/kafka_batcher/data_stream_spec.ex | 8 ++++---- lib/kafka_batcher/data_stream_spec/validator.ex | 6 +++--- lib/kafka_batcher/supervisor.ex | 4 ++-- 7 files changed, 25 insertions(+), 24 deletions(-) diff --git a/lib/kafka_batcher/accumulator.ex b/lib/kafka_batcher/accumulator.ex index 9f9e206..abf51e9 100644 --- a/lib/kafka_batcher/accumulator.ex +++ b/lib/kafka_batcher/accumulator.ex @@ -97,7 +97,8 @@ defmodule KafkaBatcher.Accumulator do {:noreply, new_state} {:error, _reason, new_state} -> - DataStreamSpec.get_collector(state.data_stream_spec).set_lock() + collector_mod = DataStreamSpec.get_collector_mod(state.data_stream_spec) + collector_mod.set_lock() {:noreply, new_state} end end diff --git a/lib/kafka_batcher/accumulator/config.ex b/lib/kafka_batcher/accumulator/config.ex index be78447..79d7a39 100644 --- a/lib/kafka_batcher/accumulator/config.ex +++ b/lib/kafka_batcher/accumulator/config.ex @@ -3,7 +3,7 @@ defmodule KafkaBatcher.Accumulator.Config do alias KafkaBatcher.Config.BadConfigError @type t :: %__MODULE__{ - collector: module(), + collector_mod: module(), topic_name: String.t(), partition: pos_integer() | nil, batch_flusher: module(), @@ -15,7 +15,7 @@ defmodule KafkaBatcher.Accumulator.Config do accumulator_mod: module() } - @enforce_keys [:collector, :topic_name] + @enforce_keys [:collector_mod, :topic_name] defstruct @enforce_keys ++ [ :partition, @@ -31,7 +31,7 @@ defmodule 
KafkaBatcher.Accumulator.Config do @spec to_kwlist(t()) :: Keyword.t() def to_kwlist(%__MODULE__{} = config) do [ - collector: config.collector, + collector_mod: config.collector_mod, topic_name: config.topic_name, partition: config.partition, batch_flusher: config.batch_flusher, @@ -48,7 +48,7 @@ defmodule KafkaBatcher.Accumulator.Config do def build!(opts) do opts |> Keyword.take([ - :collector, + :collector_mod, :topic_name, :batch_flusher, :max_wait_time, diff --git a/lib/kafka_batcher/collector/config.ex b/lib/kafka_batcher/collector/config.ex index d142b48..425688f 100644 --- a/lib/kafka_batcher/collector/config.ex +++ b/lib/kafka_batcher/collector/config.ex @@ -12,19 +12,19 @@ defmodule KafkaBatcher.Collector.Config do pos_integer()) @type t :: %__MODULE__{ - collector: module(), + collector_mod: module(), topic_name: String.t(), partition_fn: partition_fn() | nil, collect_by_partition: boolean() } - @enforce_keys [:collector, :topic_name] + @enforce_keys [:collector_mod, :topic_name] defstruct @enforce_keys ++ [:partition_fn, collect_by_partition: false] @spec to_kwlist(t()) :: Keyword.t() def to_kwlist(%__MODULE__{} = config) do [ - collector: config.collector, + collector_mod: config.collector_mod, topic_name: config.topic_name, partition_fn: config.partition_fn, collect_by_partition: config.collect_by_partition @@ -35,7 +35,7 @@ defmodule KafkaBatcher.Collector.Config do def build!(opts) do opts |> Keyword.take([ - :collector, + :collector_mod, :topic_name, :partition_fn, :collect_by_partition diff --git a/lib/kafka_batcher/config.ex b/lib/kafka_batcher/config.ex index 8b66eff..02dadfc 100644 --- a/lib/kafka_batcher/config.ex +++ b/lib/kafka_batcher/config.ex @@ -58,9 +58,9 @@ defmodule KafkaBatcher.Config do opts |> Keyword.get(:kafka, []) |> Producers.Config.build!() data_stream_specs = - for collector <- Keyword.fetch!(opts, :collectors) do + for collector_mod <- Keyword.fetch!(opts, :collectors) do producer_config - |> 
build_data_stream_spec!(collector, opts) + |> build_data_stream_spec!(collector_mod, opts) |> validate_data_stream_spec!() end @@ -72,12 +72,12 @@ defmodule KafkaBatcher.Config do } end - defp build_data_stream_spec!(producer_config, collector, opts) do + defp build_data_stream_spec!(producer_config, collector_mod, opts) do opts = opts - |> Keyword.merge(get_compile_opts!(collector)) - |> Keyword.merge(Keyword.get(opts, collector, [])) - |> Keyword.put(:collector, collector) + |> Keyword.merge(get_compile_opts!(collector_mod)) + |> Keyword.merge(Keyword.get(opts, collector_mod, [])) + |> Keyword.put(:collector_mod, collector_mod) collector_config = Collector.Config.build!(opts) accumulator_config = Accumulator.Config.build!(opts) diff --git a/lib/kafka_batcher/data_stream_spec.ex b/lib/kafka_batcher/data_stream_spec.ex index 84c19b2..70af9fb 100644 --- a/lib/kafka_batcher/data_stream_spec.ex +++ b/lib/kafka_batcher/data_stream_spec.ex @@ -76,13 +76,13 @@ defmodule KafkaBatcher.DataStreamSpec do topic_name end - @spec get_collector(t()) :: module() - def get_collector(%__MODULE__{} = data_stream_spec) do + @spec get_collector_mod(t()) :: module() + def get_collector_mod(%__MODULE__{} = data_stream_spec) do %__MODULE__{ - collector_config: %Collector.Config{collector: collector} + collector_config: %Collector.Config{collector_mod: collector_mod} } = data_stream_spec - collector + collector_mod end @spec collect_by_partition?(t()) :: boolean() diff --git a/lib/kafka_batcher/data_stream_spec/validator.ex b/lib/kafka_batcher/data_stream_spec/validator.ex index 6cff577..85a2cea 100644 --- a/lib/kafka_batcher/data_stream_spec/validator.ex +++ b/lib/kafka_batcher/data_stream_spec/validator.ex @@ -7,7 +7,7 @@ defmodule KafkaBatcher.DataStreamSpec.Validator do def validate(%DataStreamSpec{} = data_stream_spec) do %DataStreamSpec{ collector_config: %Collector.Config{ - collector: collector, + collector_mod: collector_mod, partition_fn: partition_fn, collect_by_partition: 
collect_by_partition }, @@ -18,10 +18,10 @@ defmodule KafkaBatcher.DataStreamSpec.Validator do cond do collect_by_partition and is_nil(partition_fn) -> - {:error, "collector #{inspect(collector)}. Not found required key :partition_fn"} + {:error, "collector #{inspect(collector_mod)}. Not found required key :partition_fn"} not collect_by_partition and is_nil(partition_strategy) -> - {:error, "collector #{inspect(collector)}. Not found required key :partition_strategy"} + {:error, "collector #{inspect(collector_mod)}. Not found required key :partition_strategy"} :otherwise -> :ok diff --git a/lib/kafka_batcher/supervisor.ex b/lib/kafka_batcher/supervisor.ex index fa9efbc..c318587 100644 --- a/lib/kafka_batcher/supervisor.ex +++ b/lib/kafka_batcher/supervisor.ex @@ -51,11 +51,11 @@ defmodule KafkaBatcher.Supervisor do for %KafkaBatcher.DataStreamSpec{} = spec <- specs, reduce: [] do specs -> %KafkaBatcher.DataStreamSpec{ - collector_config: %Collector.Config{collector: collector} + collector_config: %Collector.Config{collector_mod: collector_mod} } = spec [ - collector.child_spec(spec), + collector_mod.child_spec(spec), AccumulatorsPoolSupervisor.child_spec(spec) | specs ]