diff --git a/.dialyzer_ignore.exs b/.dialyzer_ignore.exs index ae33082..59ab4be 100644 --- a/.dialyzer_ignore.exs +++ b/.dialyzer_ignore.exs @@ -1,4 +1,4 @@ [ # If we compile with another @storage_impl lib/kafka_batcher/temp_storage.ex:33 become reachable - {"lib/kafka_batcher/temp_storage.ex", :guard_fail, 30} + {"lib/kafka_batcher/temp_storage.ex", :guard_fail, 32} ] diff --git a/README.md b/README.md index a1a89dd..31bb911 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ A library to increase the throughput of producing messages (coming one at a time def start(_type, _args) do children = [ # Describe the child spec - KafkaBatcher.Supervisor + {KafkaBatcher.Supervisor, Application.fetch_env!(:kafka_batcher, :default_client)} ] opts = [strategy: :one_for_one, name: MyApp.Supervisor, max_restarts: 3, max_seconds: 5] @@ -36,32 +36,36 @@ A library to increase the throughput of producing messages (coming one at a time Config example: ```elixir - config :kafka_batcher, KafkaBatcher.Collector1, topic_name: "topic1" - config :kafka_batcher, KafkaBatcher.Collector2, topic_name: "topic2" - config :kafka_batcher, KafkaBatcher.Collector3, topic_name: "topic3" - config :kafka_batcher, KafkaBatcher.Collector4, topic_name: "topic4" - config :kafka_batcher, KafkaBatcher.Collector5, topic_name: "topic5" - - config :kafka_batcher, collectors: - [ - KafkaBatcher.Collector1, - KafkaBatcher.Collector2, - KafkaBatcher.Collector3, - KafkaBatcher.Collector4, - KafkaBatcher.Collector5 - ] + config :kafka_batcher, :default_client, [ + {KafkaBatcher.Collector1, [topic_name: "topic1"]}, + {KafkaBatcher.Collector2, [topic_name: "topic2"]}, + {KafkaBatcher.Collector3, [topic_name: "topic3"]}, + {KafkaBatcher.Collector4, [topic_name: "topic4"]}, + {KafkaBatcher.Collector5, [topic_name: "topic5"]} + ] - config :kafka_batcher, :kafka, - endpoints: "localhost:9092", - # in case you use SASL - # sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"}, - # ssl: true, - telemetry: true, - allow_topic_auto_creation: false, + config :kafka_batcher, :default_client, + collectors: [ + KafkaBatcher.Collector1, + KafkaBatcher.Collector2, + KafkaBatcher.Collector3, + KafkaBatcher.Collector4, + KafkaBatcher.Collector5 + ], + kafka: [ + client_name: :default_client, + endpoints: "localhost:9092", + # in case you use SASL + # sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"}, + # ssl: true, + telemetry: true, + allow_topic_auto_creation: false + ], kafka_topic_aliases: %{ "real_topic_name1" => "incoming-events", "real_topic_name2" => "special-topic" - } + }, + kafka_metric_opts: [] # In case you use KafkaEx, you need to disable default worker to avoid crashes config :kafka_ex, :disable_default_worker, true @@ -88,6 +92,7 @@ Available parameters: * `:ssl` - optional parameter. Ssl should be type boolean(). By default `:ssl` is `false`. * `:min_delay` - optional parameter. Set minimal delay before send events. This parameter allows to increase max throughput in case when you get more messages (in term of count per second) than you expected when set `batch_size` parameter. * `:max_batch_bytesize` - optional parameter. Allows to set a limit on the maximum batch size. By default it is 1_000_000 bytes. +* `:client_name` - client name that is used to start a producer. Included in metrics namespace. Defaults to `:kafka_producer_client`. **Important:** The size of one message should not exceed `max_batch_bytesize` setting. If you need to work with large messages you must increase `max_batch_bytesize` value and value of Kafka topic setting `max.message.bytes` as well. diff --git a/lib/kafka_batcher.ex b/lib/kafka_batcher.ex index 7d492d0..5d17b72 100644 --- a/lib/kafka_batcher.ex +++ b/lib/kafka_batcher.ex @@ -67,6 +67,8 @@ defmodule KafkaBatcher do """ defstruct key: "", value: "", headers: [] - @type t :: %MessageObject{key: binary(), value: map() | binary(), headers: list()} + @type key :: binary() + @type value :: map() | binary() + @type t :: %MessageObject{key: key(), value: value(), headers: list()} end end diff --git a/lib/kafka_batcher/accumulator.ex b/lib/kafka_batcher/accumulator.ex index ec3d5b8..abf51e9 100644 --- a/lib/kafka_batcher/accumulator.ex +++ b/lib/kafka_batcher/accumulator.ex @@ -5,7 +5,15 @@ defmodule KafkaBatcher.Accumulator do See details how it works in KafkaBatcher.Accumulator.State module """ - alias KafkaBatcher.{Accumulator.State, MessageObject, TempStorage} + alias KafkaBatcher.{ + Accumulator, + Accumulator.State, + DataStreamSpec, + MessageObject, + Producers, + TempStorage + } + alias KafkaBatcher.Behaviours.Collector, as: CollectorBehaviour @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) @@ -14,25 +22,34 @@ defmodule KafkaBatcher.Accumulator do use GenServer require Logger - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: reg_name(args)) + @spec start_link(DataStreamSpec.t()) :: GenServer.on_start() + def start_link(%DataStreamSpec{} = data_stream_spec) do + GenServer.start_link( + __MODULE__, + data_stream_spec, + name: reg_name(data_stream_spec) + ) end @doc "Returns a specification to start this module under a supervisor" - def child_spec(args) do - {accumulator_mod, args} = Keyword.pop(args, :accumulator_mod, __MODULE__) - + @spec child_spec(DataStreamSpec.t()) :: Supervisor.child_spec() + def child_spec(%DataStreamSpec{} = data_stream_spec) do %{ - id: reg_name(args), - start: {accumulator_mod, :start_link, [args]} + id: reg_name(data_stream_spec), + start: { + DataStreamSpec.get_accumulator_mod(data_stream_spec), + :start_link, + [data_stream_spec] + } } end @doc """ Finds appropriate Accumulator process by topic & partition and dispatches `event` to it """ - def add_event(%MessageObject{} = event, topic_name, partition \\ nil) do - GenServer.call(reg_name(topic_name: topic_name, partition: partition), {:add_event, event}) + @spec add_event(MessageObject.t(), DataStreamSpec.t()) :: :ok | {:error, term()} + def add_event(%MessageObject{} = event, %DataStreamSpec{} = data_stream_spec) do + GenServer.call(reg_name(data_stream_spec), {:add_event, event}) catch _, _reason -> Logger.warning("KafkaBatcher: Couldn't get through to accumulator") @@ -43,15 +60,17 @@ defmodule KafkaBatcher.Accumulator do ## Callbacks ## @impl GenServer - def init(args) do + def init(%DataStreamSpec{} = data_stream_spec) do Process.flag(:trap_exit, true) - state = build_state(args) + + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) + partition = DataStreamSpec.get_partition(data_stream_spec) Logger.debug(""" - KafkaBatcher: Accumulator process started: topic #{state.topic_name} partition #{state.partition} pid #{inspect(self())} + KafkaBatcher: Accumulator process started: topic #{topic_name} partition #{partition} pid #{inspect(self())} """) - {:ok, state} + {:ok, %State{data_stream_spec: data_stream_spec}} end @impl GenServer @@ -78,7 +97,8 @@ defmodule KafkaBatcher.Accumulator do {:noreply, new_state} {:error, _reason, new_state} -> - state.collector.set_lock() + collector_mod = DataStreamSpec.get_collector_mod(state.data_stream_spec) + collector_mod.set_lock() {:noreply, new_state} end end @@ -92,7 +112,7 @@ defmodule KafkaBatcher.Accumulator do def handle_info(term, state) do Logger.warning(""" KafkaBatcher: Unknown message #{inspect(term)} to #{__MODULE__}.handle_info/2. - Current state: #{inspect(drop_sensitive(state))} + Current state: #{inspect(state)} """) {:noreply, state} @@ -104,12 +124,14 @@ defmodule KafkaBatcher.Accumulator do end @impl GenServer - def format_status(_reason, [pdict, state]) do - [pdict, drop_sensitive(state)] - end - - defp drop_sensitive(%State{config: config} = state) do - %State{state | config: Keyword.drop(config, [:sasl])} + def format_status(_reason, [pdict, %State{} = state]) do + [ + pdict, + %State{ + state + | data_stream_spec: DataStreamSpec.drop_sensitive(state.data_stream_spec) + } + ] end defp cleanup(%{pending_messages: [], messages_to_produce: []}) do @@ -122,7 +144,11 @@ defmodule KafkaBatcher.Accumulator do end defp set_cleanup_timer_if_not_exists(%State{cleanup_timer_ref: nil} = state) do - ref = :erlang.start_timer(state.max_wait_time, self(), :cleanup) + %DataStreamSpec{ + accumulator_config: %Accumulator.Config{max_wait_time: max_wait_time} + } = state.data_stream_spec + + ref = :erlang.start_timer(max_wait_time, self(), :cleanup) %State{state | cleanup_timer_ref: ref} end @@ -161,37 +187,26 @@ defmodule KafkaBatcher.Accumulator do @spec produce_list(messages :: [CollectorBehaviour.event()], state :: State.t()) :: :ok | {:error, any()} defp produce_list(messages, state) when is_list(messages) do - @producer.produce_list(messages, state.topic_name, state.partition, state.config) + @producer.produce_list(state.config, messages, state.topic_name, state.partition) catch _, reason -> {:error, reason} end - defp build_state(args) do - config = Keyword.fetch!(args, :config) - - %State{ - topic_name: Keyword.fetch!(args, :topic_name), - partition: Keyword.get(args, :partition), - config: config, - batch_flusher: Keyword.fetch!(config, :batch_flusher), - batch_size: Keyword.fetch!(config, :batch_size), - max_wait_time: Keyword.fetch!(config, :max_wait_time), - min_delay: Keyword.fetch!(config, :min_delay), - max_batch_bytesize: Keyword.fetch!(config, :max_batch_bytesize), - collector: Keyword.fetch!(args, :collector) - } - end + defp reg_name(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ + producer_config: %Producers.Config{client_name: client_name} + } = data_stream_spec - defp reg_name(args) do - topic_name = Keyword.fetch!(args, :topic_name) + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) + partition = DataStreamSpec.get_partition(data_stream_spec) - case Keyword.get(args, :partition) do + case partition do nil -> - :"#{__MODULE__}.#{topic_name}" + :"#{__MODULE__}.#{client_name}.#{topic_name}" partition -> - :"#{__MODULE__}.#{topic_name}.#{partition}" + :"#{__MODULE__}.#{client_name}.#{topic_name}.#{partition}" end end end diff --git a/lib/kafka_batcher/accumulator/config.ex b/lib/kafka_batcher/accumulator/config.ex new file mode 100644 index 0000000..79d7a39 --- /dev/null +++ b/lib/kafka_batcher/accumulator/config.ex @@ -0,0 +1,70 @@ +defmodule KafkaBatcher.Accumulator.Config do + @moduledoc false + alias KafkaBatcher.Config.BadConfigError + + @type t :: %__MODULE__{ + collector_mod: module(), + topic_name: String.t(), + partition: pos_integer() | nil, + batch_flusher: module(), + max_wait_time: pos_integer(), + batch_size: pos_integer(), + min_delay: non_neg_integer(), + max_batch_bytesize: pos_integer(), + max_accumulator_restarts: pos_integer(), + accumulator_mod: module() + } + + @enforce_keys [:collector_mod, :topic_name] + defstruct @enforce_keys ++ + [ + :partition, + batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher, + max_wait_time: 1_000, + batch_size: 10, + min_delay: 0, + max_batch_bytesize: 1_000_000, + max_accumulator_restarts: 100, + accumulator_mod: KafkaBatcher.Accumulator + ] + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + collector_mod: config.collector_mod, + topic_name: config.topic_name, + partition: config.partition, + batch_flusher: config.batch_flusher, + max_wait_time: config.max_wait_time, + batch_size: config.batch_size, + min_delay: config.min_delay, + max_batch_bytesize: config.max_batch_bytesize, + max_accumulator_restarts: config.max_accumulator_restarts, + accumulator_mod: config.accumulator_mod + ] + end + + @spec build!(opts :: Keyword.t()) :: t() + def build!(opts) do + opts + |> Keyword.take([ + :collector_mod, + :topic_name, + :batch_flusher, + :max_wait_time, + :batch_size, + :min_delay, + :max_batch_bytesize, + :max_accumulator_restarts, + :accumulator_mod + ]) + |> then(&struct!(__MODULE__, &1)) + rescue + ArgumentError -> + reraise( + BadConfigError, + "Accumulator config failed: missing required opts: #{inspect(@enforce_keys)}", + __STACKTRACE__ + ) + end +end diff --git a/lib/kafka_batcher/accumulator/state.ex b/lib/kafka_batcher/accumulator/state.ex index f914e53..90e4990 100644 --- a/lib/kafka_batcher/accumulator/state.ex +++ b/lib/kafka_batcher/accumulator/state.ex @@ -11,46 +11,37 @@ defmodule KafkaBatcher.Accumulator.State do * timer expired (in case when a few events arrived timer helps to control that the max waiting time is not exceeded) """ - alias KafkaBatcher.{Accumulator.State, MessageObject} + alias KafkaBatcher.{ + Accumulator, + Accumulator.State, + DataStreamSpec, + MessageObject + } + @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) @type t :: %State{ - topic_name: binary(), - partition: non_neg_integer() | nil, - config: Keyword.t(), + data_stream_spec: KafkaBatcher.DataStreamSpec.t(), pending_messages: list(), last_produced_at: non_neg_integer(), - batch_flusher: atom(), - batch_size: non_neg_integer(), - max_wait_time: non_neg_integer(), - min_delay: non_neg_integer(), - max_batch_bytesize: non_neg_integer(), batch_bytesize: non_neg_integer(), pending_messages_count: non_neg_integer(), - producer_config: Keyword.t(), messages_to_produce: list(), cleanup_timer_ref: reference() | nil, - status: atom(), - collector: atom() | nil + status: atom() } - defstruct topic_name: nil, - partition: nil, - config: [], - pending_messages: [], - last_produced_at: 0, - batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher, - batch_size: 0, - max_wait_time: 0, - min_delay: 0, - max_batch_bytesize: 0, - batch_bytesize: 0, - pending_messages_count: 0, - producer_config: [], - messages_to_produce: [], - cleanup_timer_ref: nil, - status: :continue, - collector: nil + @enforce_keys [:data_stream_spec] + defstruct @enforce_keys ++ + [ + pending_messages: [], + last_produced_at: 0, + batch_bytesize: 0, + pending_messages_count: 0, + messages_to_produce: [], + cleanup_timer_ref: nil, + status: :continue + ] @spec add_new_message(State.t(), MessageObject.t(), non_neg_integer()) :: State.t() def add_new_message(%State{} = state, %MessageObject{key: key, value: value} = event, now) do @@ -88,15 +79,22 @@ defmodule KafkaBatcher.Accumulator.State do end defp consider_max_bytesize(%State{status: :continue, batch_bytesize: batch_bytesize} = state, new_message) do + %DataStreamSpec{ + accumulator_config: %Accumulator.Config{max_batch_bytesize: max_batch_bytesize} + } = state.data_stream_spec + + topic_name = DataStreamSpec.get_topic_name(state.data_stream_spec) + partition = DataStreamSpec.get_partition(state.data_stream_spec) + message_size = :erlang.external_size(new_message) - case batch_bytesize + message_size >= state.max_batch_bytesize do - true when message_size >= state.max_batch_bytesize -> + case batch_bytesize + message_size >= max_batch_bytesize do + true when message_size >= max_batch_bytesize -> @error_notifier.report( type: "KafkaBatcherProducerError", message: """ - event#produce topic=#{state.topic_name} partition=#{state.partition}. - Message size #{inspect(message_size)} exceeds limit #{inspect(state.max_batch_bytesize)} + event#produce topic=#{topic_name} partition=#{partition}. + Message size #{inspect(message_size)} exceeds limit #{inspect(max_batch_bytesize)} """ ) @@ -111,7 +109,11 @@ defmodule KafkaBatcher.Accumulator.State do end defp consider_max_size_and_wait_time(%State{status: :continue} = state, now) do - if state.pending_messages_count >= state.batch_size and now - state.last_produced_at >= state.min_delay do + %DataStreamSpec{ + accumulator_config: %Accumulator.Config{batch_size: batch_size, min_delay: min_delay} + } = state.data_stream_spec + + if state.pending_messages_count >= batch_size and now - state.last_produced_at >= min_delay do mark_as_ready(state) else state @@ -121,7 +123,11 @@ defmodule KafkaBatcher.Accumulator.State do defp consider_max_size_and_wait_time(%State{status: :ready} = state, _), do: state defp consider_istant_flush(%State{status: :continue} = state, key, value) do - if state.batch_flusher.flush?(key, value) do + %DataStreamSpec{ + accumulator_config: %Accumulator.Config{batch_flusher: batch_flusher} + } = state.data_stream_spec + + if batch_flusher.flush?(key, value) do mark_as_ready(state) else state diff --git a/lib/kafka_batcher/accumulators_pool_supervisor.ex b/lib/kafka_batcher/accumulators_pool_supervisor.ex index 7c98a17..75e54f0 100644 --- a/lib/kafka_batcher/accumulators_pool_supervisor.ex +++ b/lib/kafka_batcher/accumulators_pool_supervisor.ex @@ -5,40 +5,59 @@ defmodule KafkaBatcher.AccumulatorsPoolSupervisor do use DynamicSupervisor - alias KafkaBatcher.Accumulator + alias KafkaBatcher.{Accumulator, DataStreamSpec, Producers} @dialyzer {:no_return, {:init, 1}} - def start_link(config) do - DynamicSupervisor.start_link(__MODULE__, config, name: reg_name(config)) + def start_link(%DataStreamSpec{} = data_stream_spec) do + DynamicSupervisor.start_link( + __MODULE__, + data_stream_spec, + name: reg_name(data_stream_spec) + ) end @doc "Returns a specification to start this module under a supervisor" - def child_spec(config) do + def child_spec(%DataStreamSpec{} = data_stream_spec) do %{ - id: reg_name(config), - start: {__MODULE__, :start_link, [config]}, + id: reg_name(data_stream_spec), + start: {__MODULE__, :start_link, [data_stream_spec]}, type: :supervisor } end - def init(config) do + def init(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ + accumulator_config: %Accumulator.Config{ + max_accumulator_restarts: max_accumulator_restarts + } + } = data_stream_spec + # max_restarts value depends on partitions count in case when partitioned accumulation is used. # For example: 100 max_restarts -> 10 process restarts per second for 1 topic with 10 partitions DynamicSupervisor.init( strategy: :one_for_one, restart: :permanent, - max_restarts: Keyword.get(config, :max_restart, 100), + max_restarts: max_accumulator_restarts, max_seconds: 1, extra_arguments: [] ) end - def start_accumulator(args) do - DynamicSupervisor.start_child(reg_name(args), Accumulator.child_spec(args)) + def start_accumulator(%DataStreamSpec{} = data_stream_spec) do + DynamicSupervisor.start_child( + reg_name(data_stream_spec), + Accumulator.child_spec(data_stream_spec) + ) end - def reg_name(args) do - :"#{__MODULE__}.#{Keyword.fetch!(args, :topic_name)}" + def reg_name(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ + producer_config: %Producers.Config{client_name: client_name} + } = data_stream_spec + + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) + + :"#{__MODULE__}.#{client_name}.#{topic_name}" end end diff --git a/lib/kafka_batcher/behaviours/producer.ex b/lib/kafka_batcher/behaviours/producer.ex index 38d723e..75256d8 100644 --- a/lib/kafka_batcher/behaviours/producer.ex +++ b/lib/kafka_batcher/behaviours/producer.ex @@ -3,19 +3,22 @@ defmodule KafkaBatcher.Behaviours.Producer do KafkaBatcher.Behaviours.Producer adds an abstraction level over producer implementations in various Kafka libs. Defines the callbacks that a Kafka producer should implement """ + alias KafkaBatcher.Producers + @type event :: KafkaBatcher.MessageObject.t() @type events :: list(event()) @callback do_produce( + Producers.Config.t(), events :: events(), topic :: binary(), - partition :: non_neg_integer() | nil, - config :: Keyword.t() + partition :: non_neg_integer() | nil ) :: :ok | {:error, binary() | atom()} - @callback get_partitions_count(binary()) :: {:ok, integer()} | {:error, binary() | atom()} + @callback get_partitions_count(Producers.Config.t(), binary()) :: + {:ok, integer()} | {:error, binary() | atom()} - @callback start_client() :: {:ok, pid()} | {:error, any()} + @callback start_client(Producers.Config.t()) :: {:ok, pid()} | {:error, any()} - @callback start_producer(binary(), Keyword.t()) :: :ok | {:error, any()} + @callback start_producer(Producers.Config.t(), binary()) :: :ok | {:error, any()} end diff --git a/lib/kafka_batcher/collector.ex b/lib/kafka_batcher/collector.ex index 9b9ed4e..da367e6 100644 --- a/lib/kafka_batcher/collector.ex +++ b/lib/kafka_batcher/collector.ex @@ -43,24 +43,32 @@ defmodule KafkaBatcher.Collector do quote location: :keep, bind_quoted: [opts: opts] do use GenServer require Logger - alias KafkaBatcher.{AccumulatorsPoolSupervisor, Collector.State, TempStorage} + + alias KafkaBatcher.{ + AccumulatorsPoolSupervisor, + Collector, + Collector.State, + DataStreamSpec, + TempStorage + } @behaviour KafkaBatcher.Behaviours.Collector import KafkaBatcher.Collector.Implementation @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) - @compile_config KafkaBatcher.Config.build_topic_config(opts) + @compile_opts opts # Public API - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: __MODULE__) + @spec start_link(DataStreamSpec.t()) :: GenServer.on_start() + def start_link(%DataStreamSpec{} = spec) do + GenServer.start_link(__MODULE__, spec, name: __MODULE__) end @doc "Returns a specification to start this module under a supervisor" - def child_spec(config) do + def child_spec(%DataStreamSpec{} = spec) do %{ id: __MODULE__, - start: {__MODULE__, :start_link, [config]}, + start: {__MODULE__, :start_link, [spec]}, type: :worker } end @@ -90,20 +98,21 @@ defmodule KafkaBatcher.Collector do GenServer.call(__MODULE__, :get_config) end - def get_compile_config do - @compile_config + def get_compile_opts do + @compile_opts end # Callbacks @impl GenServer - def init(config) do + def init(%DataStreamSpec{} = data_stream_spec) do Process.flag(:trap_exit, true) - state = build_state(config) + topic_name = DataStreamSpec.get_topic_name(data_stream_spec) - Logger.debug("KafkaBatcher: Batch collector started: topic #{state.topic_name} pid #{inspect(self())}") + Logger.debug("KafkaBatcher: Batch collector started: topic #{topic_name} pid #{inspect(self())}") send(self(), :init_accumulators) - {:ok, state} + + {:ok, %State{data_stream_spec: data_stream_spec}} end @impl GenServer @@ -131,8 +140,8 @@ defmodule KafkaBatcher.Collector do end end - def handle_call(:get_config, _from, state) do - {:reply, state.config, state} + def handle_call(:get_config, _from, %State{} = state) do + {:reply, state.data_stream_spec, state} end def handle_call(unknown, _from, state) do @@ -144,7 +153,7 @@ defmodule KafkaBatcher.Collector do @impl GenServer def handle_info(:init_accumulators, state) do - new_state = store_partitions_count(state) + new_state = store_partition_count(state) case start_accumulators(new_state) do :ok -> @@ -175,25 +184,18 @@ defmodule KafkaBatcher.Collector do end @impl GenServer - def format_status(_reason, [pdict, state]) do - [pdict, drop_sensitive(state)] - end - - defp drop_sensitive(%State{config: config} = state) do - %State{state | config: Keyword.drop(config, [:sasl])} + def format_status(_reason, [pdict, %State{} = state]) do + [ + pdict, + %State{ + state + | data_stream_spec: DataStreamSpec.drop_sensitive(state.data_stream_spec) + } + ] end # Private functions - defp build_state(config) do - %State{ - topic_name: Keyword.fetch!(config, :topic_name), - config: config, - collect_by_partition: Keyword.fetch!(config, :collect_by_partition), - collector: __MODULE__ - } - end - defp restart_timer(%State{timer_ref: ref}) when :erlang.is_reference(ref) do _ = :erlang.cancel_timer(ref) do_restart() diff --git a/lib/kafka_batcher/collector/config.ex b/lib/kafka_batcher/collector/config.ex new file mode 100644 index 0000000..425688f --- /dev/null +++ b/lib/kafka_batcher/collector/config.ex @@ -0,0 +1,52 @@ +defmodule KafkaBatcher.Collector.Config do + @moduledoc false + + alias KafkaBatcher.Config.BadConfigError + alias KafkaBatcher.MessageObject + + @typep topic :: String.t() + @typep partition_count :: pos_integer() + + @type partition_fn :: + (topic(), partition_count(), MessageObject.key(), MessageObject.value() -> + pos_integer()) + + @type t :: %__MODULE__{ + collector_mod: module(), + topic_name: String.t(), + partition_fn: partition_fn() | nil, + collect_by_partition: boolean() + } + + @enforce_keys [:collector_mod, :topic_name] + defstruct @enforce_keys ++ [:partition_fn, collect_by_partition: false] + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + collector_mod: config.collector_mod, + topic_name: config.topic_name, + partition_fn: config.partition_fn, + collect_by_partition: config.collect_by_partition + ] + end + + @spec build!(opts :: Keyword.t()) :: t() + def build!(opts) do + opts + |> Keyword.take([ + :collector_mod, + :topic_name, + :partition_fn, + :collect_by_partition + ]) + |> then(&struct!(__MODULE__, &1)) + rescue + ArgumentError -> + reraise( + BadConfigError, + "Accumulator config failed: missing required opts: #{inspect(@enforce_keys)}", + __STACKTRACE__ + ) + end +end diff --git a/lib/kafka_batcher/collector/implementation.ex b/lib/kafka_batcher/collector/implementation.ex index 639edf9..46bd8aa 100644 --- a/lib/kafka_batcher/collector/implementation.ex +++ b/lib/kafka_batcher/collector/implementation.ex @@ -4,42 +4,61 @@ defmodule KafkaBatcher.Collector.Implementation do """ require Logger - alias KafkaBatcher.{AccumulatorsPoolSupervisor, Collector.State, MessageObject} + + alias KafkaBatcher.{ + AccumulatorsPoolSupervisor, + Collector, + Collector.State, + DataStreamSpec, + MessageObject + } + @producer Application.compile_env(:kafka_batcher, :producer_module, KafkaBatcher.Producers.Kaffe) - def choose_partition(_message, _topic_name, _config, nil), do: {:error, :kafka_unavailable} + def choose_partition(_message, _data_stream_spec, nil), do: {:error, :kafka_unavailable} + + def choose_partition( + %MessageObject{key: key, value: value}, + %DataStreamSpec{} = data_stream_spec, + partitions_count + ) do + %Collector.Config{ + partition_fn: partition_fn, + topic_name: topic_name + } = data_stream_spec.collector_config - def choose_partition(%MessageObject{key: key, value: value}, topic_name, config, partitions_count) do - calc_partition_fn = Keyword.fetch!(config, :partition_fn) + partition = partition_fn.(topic_name, partitions_count, key, value) - partition = calc_partition_fn.(topic_name, partitions_count, key, value) {:ok, partition} end - def start_accumulators(%State{collect_by_partition: true, partitions_count: nil}) do - {:error, :kafka_unavailable} - end + def start_accumulators(%State{} = state) do + collect_by_partition? = + DataStreamSpec.collect_by_partition?(state.data_stream_spec) - def start_accumulators(%State{collect_by_partition: true, partitions_count: count} = state) do - start_accumulators_by_partitions(count, state) - end + cond do + collect_by_partition? and is_nil(state.partitions_count) -> + {:error, :kafka_unavailable} - def start_accumulators(%State{topic_name: topic_name, config: config, collect_by_partition: false} = state) do - start_accumulator(topic_name: topic_name, config: config, collector: state.collector) - end + collect_by_partition? -> + start_accumulators_by_partitions( + state.data_stream_spec, + state.partitions_count + ) - defp start_accumulators_by_partitions(count, %State{} = state) do - opts = [ - topic_name: state.topic_name, - config: state.config, - collector: state.collector - ] + not collect_by_partition? -> + start_accumulator(state.data_stream_spec) + end + end + defp start_accumulators_by_partitions(data_stream_spec, count) do Enum.reduce_while( 0..(count - 1), :ok, fn partition, _ -> - case start_accumulator(Keyword.put(opts, :partition, partition)) do + data_stream_spec = DataStreamSpec.set_partition(data_stream_spec, partition) + + case start_accumulator(data_stream_spec) do :ok -> {:cont, :ok} @@ -50,8 +69,8 @@ defmodule KafkaBatcher.Collector.Implementation do ) end - defp start_accumulator(args) do - case AccumulatorsPoolSupervisor.start_accumulator(args) do + defp start_accumulator(data_stream_spec) do + case AccumulatorsPoolSupervisor.start_accumulator(data_stream_spec) do {:ok, _} -> :ok @@ -60,7 +79,7 @@ defmodule KafkaBatcher.Collector.Implementation do {:error, reason} -> Logger.warning(""" - KafkaBatcher: Accumulator has failed to start with args: #{inspect(args)}. + KafkaBatcher: Accumulator has failed to start with args: #{inspect(data_stream_spec)}. Reason: #{inspect(reason)}} """) @@ -69,8 +88,15 @@ defmodule KafkaBatcher.Collector.Implementation do end @spec store_partition_count(State.t()) :: State.t() - def store_partitions_count(%State{partitions_count: nil} = state) do - case @producer.get_partitions_count(state.topic_name) do + def store_partition_count(%State{partitions_count: nil} = state) do + %State{ + data_stream_spec: %DataStreamSpec{ + collector_config: %Collector.Config{topic_name: topic_name}, + producer_config: producer_config + } + } = state + + case @producer.get_partitions_count(producer_config, topic_name) do {:ok, partitions_count} -> %State{state | partitions_count: partitions_count} diff --git a/lib/kafka_batcher/collector/state.ex b/lib/kafka_batcher/collector/state.ex index 09fa107..97824d9 100644 --- a/lib/kafka_batcher/collector/state.ex +++ b/lib/kafka_batcher/collector/state.ex @@ -3,16 +3,20 @@ defmodule KafkaBatcher.Collector.State do Describes the state of KafkaBatcher.Collector and functions working with it """ - alias KafkaBatcher.{Accumulator, Collector, MessageObject, TempStorage} + alias KafkaBatcher.{ + Accumulator, + Collector, + DataStreamSpec, + MessageObject, + TempStorage + } + alias KafkaBatcher.Collector.{State, Utils} require Logger @type t :: %State{ - topic_name: String.t() | nil, - config: Keyword.t(), - collect_by_partition: boolean(), - collector: atom() | nil, + data_stream_spec: DataStreamSpec.t(), locked?: boolean(), last_check_timestamp: non_neg_integer() | nil, ready?: boolean(), @@ -20,17 +24,17 @@ defmodule KafkaBatcher.Collector.State do partitions_count: pos_integer() | nil } - defstruct topic_name: nil, - config: [], - collect_by_partition: true, - collector: nil, - # these fields are used to handle case when Kafka went down suddenly - locked?: false, - last_check_timestamp: nil, - # these fields are used to handle case when Kafka is not available at the start - ready?: false, - timer_ref: nil, - partitions_count: nil + @enforce_keys [:data_stream_spec] + defstruct @enforce_keys ++ + [ + # these fields are used to handle case when Kafka went down suddenly + locked?: false, + last_check_timestamp: nil, + # these fields are used to handle case when Kafka is not available at the start + ready?: false, + timer_ref: nil, + partitions_count: nil + ] @spec add_events(t(), [Utils.event()]) :: {:ok, t()} | {:error, term(), t()} def add_events(%State{} = state, events) do @@ -49,7 +53,7 @@ defmodule KafkaBatcher.Collector.State do |> Enum.reduce(:ok, fn %MessageObject{} = event, result -> case choose_partition(state, event) do {:ok, partition} when result == :ok -> - try_to_add_event(event, state.topic_name, partition) + try_to_add_event(state, event, partition) {:ok, partition} -> keep_failed_event(result, event, elem(result, 1), partition) @@ -60,8 +64,10 @@ defmodule KafkaBatcher.Collector.State do end) end - defp try_to_add_event(event, topic_name, partition) do - case Accumulator.add_event(event, topic_name, partition) do + defp try_to_add_event(%State{} = state, event, partition) do + data_stream_spec = DataStreamSpec.set_partition(state.data_stream_spec, partition) + + case Accumulator.add_event(event, data_stream_spec) do :ok -> :ok {:error, reason} -> keep_failed_event(:ok, event, reason, partition) end @@ -84,17 +90,16 @@ defmodule KafkaBatcher.Collector.State do } end - defp choose_partition(%State{collect_by_partition: true} = state, event) do - Collector.Implementation.choose_partition( - event, - state.topic_name, - state.config, - state.partitions_count - ) - end - - defp choose_partition(%State{collect_by_partition: false}, _event) do - {:ok, nil} + defp choose_partition(%State{} = state, event) do + if DataStreamSpec.collect_by_partition?(state.data_stream_spec) do + Collector.Implementation.choose_partition( + event, + state.data_stream_spec, + state.partitions_count + ) + else + {:ok, nil} + end end defp save_failed_events(:ok, _state), do: :ok @@ -103,12 +108,14 @@ defmodule KafkaBatcher.Collector.State do {:error, _reason, failed_event_batches} = result, %State{} = state ) do + %State{data_stream_spec: %DataStreamSpec{} = data_stream_spec} = state + for {partition, failed_events} <- failed_event_batches do TempStorage.save_batch(%TempStorage.Batch{ messages: Enum.reverse(failed_events), - topic: state.topic_name, + topic: DataStreamSpec.get_topic_name(data_stream_spec), partition: partition, - producer_config: state.config + producer_config: data_stream_spec.opts }) end diff --git a/lib/kafka_batcher/config.ex b/lib/kafka_batcher/config.ex index f7aee01..02dadfc 100644 --- a/lib/kafka_batcher/config.ex +++ b/lib/kafka_batcher/config.ex @@ -5,14 +5,6 @@ defmodule KafkaBatcher.Config do Examples of configs can be found in the files config/test.exs and test/support/collectors/collector_handlers.ex """ - defmodule SASLConfigError do - defexception [:message] - - def exception(_term) do - %SASLConfigError{message: "Sasl config error"} - end - end - defmodule CollectorMissingError do defexception [:message] @@ -29,259 +21,96 @@ defmodule KafkaBatcher.Config do end end - @type sasl_mechanism() :: :plain | :scram_sha_256 | :scram_sha_512 - @type sasl_type() :: {sasl_mechanism(), binary(), binary()} | :undefined - - @spec collectors_spec() :: [:supervisor.child_spec()] - def collectors_spec do - collector_configs = get_configs_by_collector!() - - children_specs = - Enum.reduce( - collector_configs, - [], - fn {collector, config}, all_children -> - collector_spec = collector.child_spec(config) - accum_sup_spec = KafkaBatcher.AccumulatorsPoolSupervisor.child_spec(config) - - [collector_spec, accum_sup_spec | all_children] - end - ) - - conn_manager_spec = KafkaBatcher.ConnectionManager.child_spec() - Enum.reverse([conn_manager_spec | children_specs]) - end - - @spec general_producer_config() :: Keyword.t() - def general_producer_config do - Application.get_env(:kafka_batcher, :kafka, []) - |> Keyword.take(allowed_producer_keys()) - |> set_endpoints() - |> set_sasl() - |> set_ssl() - |> then(fn config -> Keyword.merge(default_producer_config(), config) end) - end - - @doc """ - Return all configured topics with its config. - """ - @spec get_configs_by_topic_name() :: list({binary(), Keyword.t()}) - def get_configs_by_topic_name do - get_configs_by_collector!() - |> Enum.map(fn {_, config} -> - {Keyword.fetch!(config, :topic_name), config} - end) - |> Enum.into(%{}) - end - - @spec get_configs_by_collector!() :: list({atom(), Keyword.t()}) - def get_configs_by_collector! do - Enum.map(fetch_runtime_configs(), fn {collector, runtime_config} -> - config = - general_producer_config() - |> Keyword.merge(get_compile_config!(collector)) - |> Keyword.merge(runtime_config) - - case validate_config!({collector, config}) do - :ok -> - {collector, config} - - {:error, reasons} -> - raise(KafkaBatcher.Config.BadConfigError, "Collector config failed: #{inspect(reasons)}") + alias KafkaBatcher.{ + Accumulator, + Collector, + DataStreamSpec.Validator, + Producers + } + + @type t :: %__MODULE__{ + producer_config: Producers.Config.t(), + data_stream_specs: [KafkaBatcher.DataStreamSpec.t()], + kafka_topic_aliases: %{optional(binary()) => binary()}, + kafka_metric_opts: Keyword.t() + } + + @enforce_keys [ + :producer_config, + :data_stream_specs, + :kafka_topic_aliases, + :kafka_metric_opts + ] + defstruct @enforce_keys + + @spec get_client_name(t()) :: atom() + def get_client_name(%__MODULE__{} = config) do + %__MODULE__{ + producer_config: %Producers.Config{client_name: client_name} + } = config + + client_name + end + + @spec build_config!(opts :: Keyword.t()) :: t() + def build_config!(opts) do + producer_config = + opts |> Keyword.get(:kafka, []) |> Producers.Config.build!() + + data_stream_specs = + for collector_mod <- Keyword.fetch!(opts, :collectors) do + producer_config + |> build_data_stream_spec!(collector_mod, opts) + |> validate_data_stream_spec!() end - end) - end - @spec get_collector_config(topic_name :: binary()) :: Keyword.t() - def get_collector_config(topic_name) do - case get_configs_by_topic_name()[topic_name] do - nil -> general_producer_config() - config -> config + %__MODULE__{ + producer_config: producer_config, + data_stream_specs: data_stream_specs, + kafka_topic_aliases: Keyword.get(opts, :kafka_topic_aliases, %{}), + kafka_metric_opts: Keyword.get(opts, :kafka_metric_opts, []) + } + end + + defp build_data_stream_spec!(producer_config, collector_mod, opts) do + opts = + opts + |> Keyword.merge(get_compile_opts!(collector_mod)) + |> Keyword.merge(Keyword.get(opts, collector_mod, [])) + |> Keyword.put(:collector_mod, collector_mod) + + collector_config = Collector.Config.build!(opts) + accumulator_config = Accumulator.Config.build!(opts) + + %KafkaBatcher.DataStreamSpec{ + collector_config: collector_config, + accumulator_config: accumulator_config, + producer_config: producer_config, + opts: + collector_config + |> Collector.Config.to_kwlist() + |> Keyword.merge(Accumulator.Config.to_kwlist(accumulator_config)) + |> Keyword.merge(kafka: Producers.Config.to_kwlist(producer_config)) + } + end + + defp validate_data_stream_spec!(data_stream_spec) do + case Validator.validate(data_stream_spec) do + :ok -> + data_stream_spec + + {:error, reason} -> + raise(BadConfigError, "Collector config failed: #{inspect(reason)}") end end - @spec build_topic_config(opts :: Keyword.t()) :: Keyword.t() - def build_topic_config(opts) do - default_config = default_producer_config() - - allowed_producer_keys() - |> Enum.map(fn key -> {key, Keyword.get(opts, key) || Keyword.get(default_config, key)} end) - |> Enum.filter(fn {_, value} -> value != nil end) - end - - @spec get_endpoints :: list({binary(), non_neg_integer()}) - def get_endpoints do - Application.get_env(:kafka_batcher, :kafka, []) - |> get_endpoints() - end - - @spec get_endpoints(config :: Keyword.t()) :: list({binary(), non_neg_integer()}) - def get_endpoints(config) do - Keyword.fetch!(config, :endpoints) - |> parse_endpoints() - end - - defp parse_endpoints(endpoints) do - endpoints |> String.split(",") |> Enum.map(&parse_endpoint/1) - end - - defp parse_endpoint(url) do - [host, port] = String.split(url, ":") - {host, :erlang.binary_to_integer(port)} - end - - defp validate_config!({collector, config}) do - required_keys() - |> Enum.reduce( - [], - fn - key, acc when is_atom(key) -> - case Keyword.has_key?(config, key) do - true -> - acc - - false -> - ["collector #{inspect(collector)}. Not found required key #{inspect(key)}" | acc] - end - - %{cond: condition, keys: keys}, acc -> - case check_conditions?(condition, config) do - true -> - check_keys(keys, config, collector, acc) - - false -> - acc - end - end - ) - |> case do - [] -> :ok - reasons -> {:error, reasons} - end - end - - defp check_keys(keys, config, collector, acc) do - case keys -- Keyword.keys(config) do - [] -> - acc - - fields -> - ["collector #{inspect(collector)}. Not found required keys #{inspect(fields)}" | acc] - end - end - - defp check_conditions?(cond, config) do - Enum.all?( - cond, - fn {key, value} -> - Keyword.get(config, key) == value - end - ) - end - - defp required_keys do - [ - :topic_name, - %{cond: [collect_by_partition: true], keys: [:partition_fn]}, - %{cond: [collect_by_partition: false], keys: [:partition_strategy]} - ] - end - - defp set_endpoints(config) do - Keyword.put(config, :endpoints, get_endpoints(config)) - end - - defp set_sasl(config) do - new_sasl = - case validate_sasl_config!(Keyword.get(config, :sasl)) do - {:ok, sasl_config_tuple} -> sasl_config_tuple - _ -> :undefined - end - - Keyword.put(config, :sasl, new_sasl) - end - - defp allowed_producer_keys do - keys = - default_producer_config() - |> Keyword.keys() - - [[:endpoints, :partition_fn, :topic_name, :ssl, :sasl] | keys] - |> List.flatten() - end - - @spec validate_sasl_config!(map() | nil) :: {:ok, sasl_type()} | {:error, :is_not_set} | no_return() - defp validate_sasl_config!(%{mechanism: mechanism, login: login, password: password} = config) do - valid_mechanism = mechanism in [:plain, :scram_sha_256, :scram_sha_512] - - if valid_mechanism && password != nil && login != nil do - {:ok, {mechanism, login, password}} - else - raise(KafkaBatcher.Config.SASLConfigError, "SASL config failed: #{inspect(config)}") - end - end - - defp validate_sasl_config!(sasl_config) when sasl_config == nil or sasl_config == %{} do - {:error, :is_not_set} - end - - defp validate_sasl_config!(bad_sasl_config) do - raise(KafkaBatcher.Config.SASLConfigError, "SASL config failed: #{inspect(bad_sasl_config)}") - end - - defp set_ssl(config) do - Keyword.put(config, :ssl, get_ssl(config)) - end - - defp get_ssl(config) do - Keyword.get(config, :ssl, false) - end - - defp get_compile_config!(module) do + defp get_compile_opts!(module) do with {:module, ^module} <- Code.ensure_compiled(module), - {:ok, [_ | _] = config} <- {:ok, module.get_compile_config()} do - config + {:ok, [_ | _] = opts} <- {:ok, module.get_compile_opts()} do + opts else _ -> - raise(KafkaBatcher.Config.CollectorMissingError, "Collector: #{inspect(module)} missing") + raise(CollectorMissingError, "Collector: #{inspect(module)} missing") end end - - defp fetch_runtime_configs do - Application.get_env(:kafka_batcher, :collectors) - |> Enum.map(&fetch_runtime_config/1) - end - - defp fetch_runtime_config(collector_name) do - case Application.fetch_env(:kafka_batcher, collector_name) do - {:ok, config} -> - {collector_name, config} - - _ -> - {collector_name, []} - end - end - - defp default_producer_config do - [ - ## brod producer parameters - ## https://github.com/kafka4beam/brod/blob/master/src/brod_producer.erl - allow_topic_auto_creation: false, - partition_strategy: :random, - required_acks: -1, - ## KafkaBatcher parameters - ## specified start pool processes for collection events by partitions - collect_by_partition: false, - ## send metric values to prom_ex application - telemetry: true, - # This module implements logic for force pushing current batch to Kafka, - # without waiting for other conditions (on size and/or interval). - batch_flusher: KafkaBatcher.Accumulator.DefaultBatchFlusher, - # These parameters are required for the collector - max_wait_time: 1_000, - batch_size: 10, - min_delay: 0, - max_batch_bytesize: 1_000_000 - ] - end end diff --git a/lib/kafka_batcher/connection_manager.ex b/lib/kafka_batcher/connection_manager.ex index ed0cf83..c410927 100644 --- a/lib/kafka_batcher/connection_manager.ex +++ b/lib/kafka_batcher/connection_manager.ex @@ -9,11 +9,18 @@ defmodule KafkaBatcher.ConnectionManager do defmodule State do @moduledoc "State of ConnectionManager process" - defstruct client_started: false, client_pid: nil + @type t :: %State{ + config: KafkaBatcher.Config.t(), + client_started: boolean(), + client_pid: nil | pid() + } - @type t :: %State{client_started: boolean(), client_pid: nil | pid()} + @enforce_keys [:config] + defstruct @enforce_keys ++ [client_started: false, client_pid: nil] end + alias KafkaBatcher.Producers + @producer Application.compile_env(:kafka_batcher, :producer_module, KafkaBatcher.Producers.Kaffe) @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) @reconnect_timeout Application.compile_env(:kafka_batcher, :reconnect_timeout, 5_000) @@ -21,25 +28,29 @@ defmodule KafkaBatcher.ConnectionManager do use GenServer # Public API - @spec start_link() :: :ignore | {:error, any()} | {:ok, pid()} - def start_link do - GenServer.start_link(__MODULE__, [], name: __MODULE__) + @spec start_link(KafkaBatcher.Config.t()) :: GenServer.on_start() + def start_link(%KafkaBatcher.Config{} = config) do + GenServer.start_link( + __MODULE__, + [config], + name: reg_name(config.producer_config) + ) end @doc "Returns a specification to start this module under a supervisor" - @spec child_spec(nil) :: map() - def child_spec(_ \\ nil) do + @spec child_spec(KafkaBatcher.Config.t()) :: Supervisor.child_spec() + def child_spec(%KafkaBatcher.Config{} = config) do %{ - id: __MODULE__, - start: {__MODULE__, :start_link, []}, + id: reg_name(config.producer_config), + start: {__MODULE__, :start_link, [config]}, type: :worker } end @doc "Checks that Kafka client is already started" - @spec client_started?() :: boolean() - def client_started? do - GenServer.call(__MODULE__, :client_started?) + @spec client_started?(client_name :: atom()) :: boolean() + def client_started?(client_name \\ Producers.Config.default_client_name()) do + GenServer.call(reg_name(client_name), :client_started?) end ## @@ -47,9 +58,9 @@ defmodule KafkaBatcher.ConnectionManager do ## @impl GenServer - def init(_opts) do + def init(config) do Process.flag(:trap_exit, true) - {:ok, %State{}, {:continue, :start_client}} + {:ok, %State{config: config}, {:continue, :start_client}} end @impl GenServer @@ -110,7 +121,7 @@ defmodule KafkaBatcher.ConnectionManager do ## defp connect(state) do - case prepare_connection() do + case prepare_connection(state) do {:ok, pid} -> %State{state | client_started: true, client_pid: pid} @@ -120,29 +131,33 @@ defmodule KafkaBatcher.ConnectionManager do end end - defp start_producers do - KafkaBatcher.Config.get_configs_by_topic_name() - |> Enum.reduce_while(:ok, fn {topic_name, config}, _ -> - case @producer.start_producer(topic_name, config) do - :ok -> - {:cont, :ok} + defp start_producers(%State{config: %KafkaBatcher.Config{} = config}) do + config.data_stream_specs + |> Enum.map(&KafkaBatcher.DataStreamSpec.get_topic_name/1) + |> Enum.reduce_while( + :ok, + fn topic_name, _ -> + case @producer.start_producer(config.producer_config, topic_name) do + :ok -> + {:cont, :ok} - {:error, reason} -> - @error_notifier.report( - type: "KafkaBatcherProducerStartFailed", - message: "Topic: #{topic_name}. Reason #{inspect(reason)}" - ) + {:error, reason} -> + @error_notifier.report( + type: "KafkaBatcherProducerStartFailed", + message: "Topic: #{topic_name}. Reason #{inspect(reason)}" + ) - {:halt, :error} + {:halt, :error} + end end - end) + ) end - defp prepare_connection do - case start_client() do - {:ok, pid} -> - case start_producers() do - :ok -> {:ok, pid} + defp prepare_connection(state) do + case start_client(state) do + {:ok, _pid} = ok -> + case start_producers(state) do + :ok -> ok :error -> :retry end @@ -156,10 +171,14 @@ defmodule KafkaBatcher.ConnectionManager do end end - defp start_client do - case @producer.start_client() do - {:ok, pid} -> - {:ok, pid} + defp start_client(%State{} = state) do + %KafkaBatcher.Config{ + producer_config: producer_config + } = state.config + + case @producer.start_client(producer_config) do + {:ok, _pid} = ok -> + ok {:error, {:already_started, pid}} -> Logger.debug("KafkaBatcher: Kafka client already started: #{inspect(pid)}") @@ -169,4 +188,12 @@ defmodule KafkaBatcher.ConnectionManager do error end end + + defp reg_name(%Producers.Config{} = producer_config) do + reg_name(producer_config.client_name) + end + + defp reg_name(client_name) when is_atom(client_name) do + :"#{__MODULE__}.#{client_name}" + end end diff --git a/lib/kafka_batcher/data_stream_spec.ex b/lib/kafka_batcher/data_stream_spec.ex new file mode 100644 index 0000000..70af9fb --- /dev/null +++ b/lib/kafka_batcher/data_stream_spec.ex @@ -0,0 +1,98 @@ +defmodule KafkaBatcher.DataStreamSpec do + @moduledoc false + + alias KafkaBatcher.{ + Accumulator, + Collector, + Producers + } + + @type t :: %__MODULE__{ + collector_config: Collector.Config.t(), + accumulator_config: Accumulator.Config.t(), + producer_config: Producers.Config.t(), + # opts are kept for backwards compatibility + opts: Keyword.t() + } + + @derive { + Inspect, + only: [:collector_config, :accumulator_config, :producer_config] + } + + @enforce_keys [ + :collector_config, + :accumulator_config, + :producer_config, + :opts + ] + defstruct @enforce_keys + + @spec drop_sensitive(t()) :: t() + def drop_sensitive(%__MODULE__{} = data_stream_spec) do + %__MODULE__{ + collector_config: data_stream_spec.collector_config, + accumulator_config: data_stream_spec.accumulator_config, + producer_config: Producers.Config.drop_sensitive(data_stream_spec.producer_config), + opts: [] + } + end + + @spec get_accumulator_mod(t()) :: module() + def get_accumulator_mod(%__MODULE__{} = data_stream_spec) do + %__MODULE__{ + accumulator_config: %Accumulator.Config{accumulator_mod: accumulator_mod} + } = data_stream_spec + + accumulator_mod + end + + @spec get_partition(t()) :: pos_integer() | nil + def get_partition(%__MODULE__{} = data_stream_spec) do + %__MODULE__{ + accumulator_config: %Accumulator.Config{partition: partition} + } = data_stream_spec + + partition + end + + @spec set_partition(t(), pos_integer()) :: t() + def set_partition(%__MODULE__{} = data_stream_spec, partition) do + %__MODULE__{ + data_stream_spec + | accumulator_config: %Accumulator.Config{ + data_stream_spec.accumulator_config + | partition: partition + } + } + end + + @spec get_topic_name(t()) :: String.t() + def get_topic_name(%__MODULE__{} = data_stream_spec) do + %__MODULE__{ + collector_config: %Collector.Config{topic_name: topic_name} + } = data_stream_spec + + topic_name + end + + @spec get_collector_mod(t()) :: module() + def get_collector_mod(%__MODULE__{} = data_stream_spec) do + %__MODULE__{ + collector_config: %Collector.Config{collector_mod: collector_mod} + } = data_stream_spec + + collector_mod + end + + @spec collect_by_partition?(t()) :: boolean() + def collect_by_partition?(%__MODULE__{} = data_stream_spec) do + %__MODULE__{ + collector_config: %Collector.Config{ + collect_by_partition: collect_by_partition + } + } = data_stream_spec + + collect_by_partition + end +end diff --git a/lib/kafka_batcher/data_stream_spec/validator.ex b/lib/kafka_batcher/data_stream_spec/validator.ex new file mode 100644 index 0000000..85a2cea --- /dev/null +++ b/lib/kafka_batcher/data_stream_spec/validator.ex @@ -0,0 +1,30 @@ +defmodule KafkaBatcher.DataStreamSpec.Validator do + @moduledoc false + + alias KafkaBatcher.{Collector, DataStreamSpec, Producers} + + @spec validate(DataStreamSpec.t()) :: :ok | {:error, String.t()} + def validate(%DataStreamSpec{} = data_stream_spec) do + %DataStreamSpec{ + collector_config: %Collector.Config{ + collector_mod: collector_mod, + partition_fn: partition_fn, + collect_by_partition: collect_by_partition + }, + producer_config: %Producers.Config{ + partition_strategy: partition_strategy + } + } = data_stream_spec + + cond do + collect_by_partition and is_nil(partition_fn) -> + {:error, "collector #{inspect(collector_mod)}. Not found required key :partition_fn"} + + not collect_by_partition and is_nil(partition_strategy) -> + {:error, "collector #{inspect(collector_mod)}. Not found required key :partition_strategy"} + + :otherwise -> + :ok + end + end +end diff --git a/lib/kafka_batcher/producers/base.ex b/lib/kafka_batcher/producers/base.ex index eef1165..bd3f058 100644 --- a/lib/kafka_batcher/producers/base.ex +++ b/lib/kafka_batcher/producers/base.ex @@ -5,14 +5,17 @@ defmodule KafkaBatcher.Producers.Base do defmacro __using__(opts) do quote location: :keep, bind_quoted: [opts: opts] do + alias KafkaBatcher.Producers + @error_notifier Application.compile_env(:kafka_batcher, :error_notifier, KafkaBatcher.DefaultErrorNotifier) require Logger - def produce_list(messages, topic, nil, config) when is_list(messages) and is_binary(topic) and is_list(config) do - with {:ok, partitions_count} <- get_partitions_count(topic), - grouped_messages <- group_messages(messages, topic, partitions_count, partition_strategy_from(config)), - :ok <- produce_list_to_topic(grouped_messages, topic, config) do + def produce_list(%Producers.Config{} = config, messages, topic, nil) when is_list(messages) and is_binary(topic) do + with {:ok, partitions_count} <- get_partitions_count(config, topic), + partition_strategy = config.partition_strategy || :random, + grouped_messages <- group_messages(messages, topic, partitions_count, partition_strategy), + :ok <- produce_list_to_topic(config, grouped_messages, topic) do :ok else error -> @@ -29,16 +32,16 @@ defmodule KafkaBatcher.Producers.Base do {:error, :failed_push_to_kafka} end - def produce_list(messages, topic, partition, config) - when is_list(messages) and is_binary(topic) and is_list(config) and is_integer(partition) do - produce_list_to_topic(%{partition => messages}, topic, config) + def produce_list(%Producers.Config{} = config, messages, topic, partition) + when is_list(messages) and is_binary(topic) and is_integer(partition) do + produce_list_to_topic(config, %{partition => messages}, topic) rescue err -> @error_notifier.report(err, stacktrace: __STACKTRACE__) {:error, :failed_push_to_kafka} end - def produce_list(messages, topic, partition, config) do + def produce_list(%Producers.Config{} = config, messages, topic, partition) do @error_notifier.report( type: "KafkaBatcherProducerError", message: """ @@ -50,15 +53,15 @@ defmodule KafkaBatcher.Producers.Base do {:error, :internal_error} end - defp produce_list_to_topic(message_list, topic, config) do + defp produce_list_to_topic(%Producers.Config{} = config, message_list, topic) do message_list |> Enum.reduce_while(:ok, fn {partition, messages}, :ok -> Logger.debug("KafkaBatcher: event#produce_list_to_topic topic=#{topic} partition=#{partition}") start_time = System.monotonic_time() - case __MODULE__.do_produce(messages, topic, partition, config) do + case __MODULE__.do_produce(config, messages, topic, partition) do :ok -> - push_metrics(start_time, topic, partition, messages, telemetry_on?(config)) + push_metrics(start_time, topic, partition, messages, config.telemetry) {:cont, :ok} {:error, _reason} = error -> @@ -82,10 +85,6 @@ defmodule KafkaBatcher.Producers.Base do end) end - defp telemetry_on?(opts) do - Keyword.get(opts, :telemetry, true) - end - defp push_metrics(_start_time, _topic, _partition, _messages, false) do :ok end @@ -106,17 +105,6 @@ defmodule KafkaBatcher.Producers.Base do } ) end - - defp partition_strategy_from(opts) do - case Keyword.fetch(opts, :partition_strategy) do - {:ok, partition_strategy} -> - partition_strategy - - :error -> - KafkaBatcher.Config.general_producer_config() - |> Keyword.get(:partition_strategy, :random) - end - end end end end diff --git a/lib/kafka_batcher/producers/config.ex b/lib/kafka_batcher/producers/config.ex new file mode 100644 index 0000000..82284c7 --- /dev/null +++ b/lib/kafka_batcher/producers/config.ex @@ -0,0 +1,95 @@ +defmodule KafkaBatcher.Producers.Config do + @moduledoc false + + alias KafkaBatcher.Config.BadConfigError + alias KafkaBatcher.MessageObject + alias KafkaBatcher.Producers.Config.BrodConfig + + @typep topic :: String.t() + @typep partition_count :: pos_integer() + + @type partition_fn :: + (topic(), partition_count(), MessageObject.key(), MessageObject.value() -> + pos_integer()) + + @type partition_strategy :: :random | :md5 | partition_fn() + + @type t :: %__MODULE__{ + endpoints: [{String.t(), non_neg_integer()}], + client_name: atom(), + partition_strategy: partition_strategy() | nil, + required_acks: integer(), + telemetry: boolean(), + brod_config: BrodConfig.t() + } + + @default_client_name :kafka_producer_client + + @enforce_keys [ + :endpoints, + :client_name, + :partition_strategy, + :required_acks, + :telemetry, + :brod_config + ] + + defstruct @enforce_keys + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + endpoints: config.endpoints, + client_name: config.client_name, + partition_strategy: config.partition_strategy, + required_acks: config.required_acks, + telemetry: config.telemetry + ] ++ BrodConfig.to_kwlist(config.brod_config) + end + + @spec drop_sensitive(t()) :: t() + def drop_sensitive(%__MODULE__{} = config) do + %__MODULE__{ + config + | brod_config: BrodConfig.drop_sensitive(config.brod_config) + } + end + + @spec default_client_name :: :kafka_producer_client + def default_client_name, do: @default_client_name + + @spec build!(opts :: Keyword.t()) :: t() + def build!(opts) do + %__MODULE__{ + endpoints: build_endpoints!(opts), + client_name: Keyword.get(opts, :client_name, @default_client_name), + partition_strategy: Keyword.get(opts, :partition_strategy), + required_acks: Keyword.get(opts, :required_acks, -1), + telemetry: Keyword.get(opts, :telemetry, true), + brod_config: BrodConfig.build!(opts) + } + end + + defp build_endpoints!(opts) do + case Keyword.fetch(opts, :endpoints) do + {:ok, endpoints} when is_binary(endpoints) -> + for url <- String.split(endpoints, ","), do: parse_endpoint!(url) + + {:ok, endpoints} -> + raise(BadConfigError, "Producer config failed: non-string endpoints given #{inspect(endpoints)}") + + :error -> + raise(BadConfigError, "Producer config failed: no endpoints given") + end + end + + defp parse_endpoint!(endpoint) do + case URI.parse("//" <> endpoint) do + %URI{host: host, port: port} when not is_nil(host) and not is_nil(port) -> + {host, port} + + _ -> + raise(BadConfigError, "Producer config failed: invalid endpoint url format #{inspect(endpoint)}") + end + end +end diff --git a/lib/kafka_batcher/producers/config/brod_config.ex b/lib/kafka_batcher/producers/config/brod_config.ex new file mode 100644 index 0000000..74433c6 --- /dev/null +++ b/lib/kafka_batcher/producers/config/brod_config.ex @@ -0,0 +1,92 @@ +defmodule KafkaBatcher.Producers.Config.BrodConfig do + defmodule SASLConfigError do + defexception [:message] + + def exception(_term) do + %SASLConfigError{message: "Sasl config error"} + end + end + + @moduledoc false + + @type sasl_mechanism() :: :plain | :scram_sha_256 | :scram_sha_512 + @type sasl() :: {sasl_mechanism(), binary(), binary()} | :undefined + + @type t :: %__MODULE__{ + allow_topic_auto_creation: boolean(), + required_acks: integer(), + ssl: boolean(), + sasl: sasl() + } + + @derive {Inspect, only: [:allow_topic_auto_creation, :required_acks, :ssl]} + + @enforce_keys [:allow_topic_auto_creation, :required_acks, :ssl, :sasl] + defstruct @enforce_keys + + @spec drop_sensitive(t()) :: t() + def drop_sensitive(%__MODULE__{} = config) do + case config.sasl do + :undefined -> config + {mechanism, _, _} -> %__MODULE__{config | sasl: {mechanism, "", ""}} + end + end + + @spec to_kwlist(t()) :: Keyword.t() + def to_kwlist(%__MODULE__{} = config) do + [ + allow_topic_auto_creation: config.allow_topic_auto_creation, + required_acks: config.required_acks, + ssl: config.ssl, + sasl: config.sasl + ] + end + + @spec build!(Keyword.t()) :: t() + def build!(opts) do + sasl_config = + case build_sasl_config(Keyword.get(opts, :sasl)) do + {:ok, sasl_config} -> + sasl_config + + {:error, {:invalid, config}} -> + raise(SASLConfigError, "SASL config failed: #{inspect(config)}") + end + + # https://github.com/kafka4beam/brod/blob/master/src/brod_producer.erl + %__MODULE__{ + allow_topic_auto_creation: Keyword.get(opts, :allow_topic_auto_creation, false), + required_acks: Keyword.get(opts, :required_acks, -1), + ssl: Keyword.get(opts, :ssl, false), + sasl: sasl_config + } + end + + @spec build_sasl_config(map() | nil) :: + {:ok, sasl()} | {:error, {:invalid, map()}} + defp build_sasl_config( + %{ + mechanism: mechanism, + login: login, + password: password + } = config + ) + when is_binary(login) and is_binary(password) do + mechanism_valid? = mechanism in [:plain, :scram_sha_256, :scram_sha_512] + + if mechanism_valid? and String.valid?(login) and String.valid?(password) do + {:ok, {mechanism, login, password}} + else + {:error, {:invalid, config}} + end + end + + defp build_sasl_config(sasl_config) + when sasl_config == nil or sasl_config == %{} do + {:ok, :undefined} + end + + defp build_sasl_config(bad_sasl_config) do + {:error, {:invalid, bad_sasl_config}} + end +end diff --git a/lib/kafka_batcher/producers/kaffe.ex b/lib/kafka_batcher/producers/kaffe.ex index 0ec51ea..989b9fa 100644 --- a/lib/kafka_batcher/producers/kaffe.ex +++ b/lib/kafka_batcher/producers/kaffe.ex @@ -2,9 +2,9 @@ defmodule KafkaBatcher.Producers.Kaffe do @moduledoc """ An implementation of the KafkaBatcher.Behaviours.Producer for Kaffe """ + alias KafkaBatcher.{Producers, Producers.Config.BrodConfig} @brod_client Application.compile_env(:kafka_batcher, :brod_client, :brod) - @client_name :kafka_producer_client @behaviour KafkaBatcher.Behaviours.Producer use KafkaBatcher.Producers.Base @@ -14,26 +14,37 @@ defmodule KafkaBatcher.Producers.Kaffe do ## ------------------------------------------------------------------------- @impl true - def start_client do - config = KafkaBatcher.Config.general_producer_config() - endpoints = Keyword.fetch!(config, :endpoints) - - @brod_client.start_link_client(endpoints, @client_name, config) + def start_client(%Producers.Config{} = config) do + @brod_client.start_link_client( + config.endpoints, + config.client_name, + BrodConfig.to_kwlist(config.brod_config) + ) end @impl true - def start_producer(topic_name, config) do - @brod_client.start_producer(@client_name, topic_name, config) + def start_producer(%Producers.Config{} = config, topic_name) do + @brod_client.start_producer( + config.client_name, + topic_name, + BrodConfig.to_kwlist(config.brod_config) + ) end @impl true - def get_partitions_count(topic) do - @brod_client.get_partitions_count(@client_name, topic) + def get_partitions_count(%Producers.Config{} = config, topic) do + @brod_client.get_partitions_count(config.client_name, topic) end @impl true - def do_produce(messages, topic, partition, _config) do - @brod_client.produce_sync(@client_name, topic, partition, "ignored", transform_messages(messages)) + def do_produce(%Producers.Config{} = config, messages, topic, partition) do + @brod_client.produce_sync( + config.client_name, + topic, + partition, + "ignored", + transform_messages(messages) + ) end ## ------------------------------------------------------------------------- diff --git a/lib/kafka_batcher/producers/kafka_ex.ex b/lib/kafka_batcher/producers/kafka_ex.ex index c3748d1..ea90828 100644 --- a/lib/kafka_batcher/producers/kafka_ex.ex +++ b/lib/kafka_batcher/producers/kafka_ex.ex @@ -3,10 +3,10 @@ if Code.ensure_loaded?(KafkaEx) do @moduledoc """ An implementation of the KafkaBatcher.Behaviours.Producer for KafkaEx """ + alias KafkaBatcher.Producers @kafka_ex_client Application.compile_env(:kafka_batcher, :kafka_ex_client, KafkaEx) @metadata_response Application.compile_env(:kafka_batcher, :kafka_ex_metadata, KafkaEx.Protocol.Metadata.Response) - @client_name :kafka_producer_client @behaviour KafkaBatcher.Behaviours.Producer use KafkaBatcher.Producers.Base @@ -17,10 +17,8 @@ if Code.ensure_loaded?(KafkaEx) do ## KafkaEx start worker @impl true - def start_client do - uris = KafkaBatcher.Config.get_endpoints() - - @kafka_ex_client.create_worker(@client_name, uris: uris) + def start_client(%Producers.Config{} = config) do + @kafka_ex_client.create_worker(config.client_name, uris: config.endpoints) end @impl true @@ -29,9 +27,9 @@ if Code.ensure_loaded?(KafkaEx) do end @impl true - def get_partitions_count(topic) do + def get_partitions_count(%Producers.Config{} = config, topic) do count = - @kafka_ex_client.metadata(topic: topic, worker_name: @client_name) + @kafka_ex_client.metadata(topic: topic, worker_name: config.client_name) |> @metadata_response.partitions_for_topic(topic) |> length() @@ -39,15 +37,15 @@ if Code.ensure_loaded?(KafkaEx) do end @impl true - def do_produce(messages, topic, partition, config) do + def do_produce(%Producers.Config{} = config, messages, topic, partition) do case @kafka_ex_client.produce( %KafkaEx.Protocol.Produce.Request{ topic: topic, partition: partition, - required_acks: Keyword.get(config, :required_acks), + required_acks: config.required_acks, messages: transform_messages(messages) }, - worker_name: @client_name + worker_name: config.client_name ) do {:ok, _offset} -> :ok diff --git a/lib/kafka_batcher/prom_ex/plugins/kafka.ex b/lib/kafka_batcher/prom_ex/plugins/kafka.ex index 8f7d61b..80905f5 100644 --- a/lib/kafka_batcher/prom_ex/plugins/kafka.ex +++ b/lib/kafka_batcher/prom_ex/plugins/kafka.ex @@ -59,19 +59,37 @@ if Code.ensure_loaded?(PromEx) do ] @impl true - def event_metrics(_opts) do - metric_prefix = [:prom_ex, :kafka] + def event_metrics(opts) do + %KafkaBatcher.Config{} = + config = + Keyword.get_lazy(opts, :kafka_batcher_config, fn -> + :kafka_batcher + |> Application.get_all_env() + |> KafkaBatcher.Config.build_config!() + end) + + metric_prefix = + [:prom_ex, :kafka, KafkaBatcher.Config.get_client_name(config)] labels = %{} - buckets = Application.get_env(:kafka_batcher, :kafka_metric_opts, []) [ - producer_event_metrics(metric_prefix, labels, buckets), - consumer_event_metrics(metric_prefix, labels, buckets) + producer_event_metrics( + metric_prefix, + labels, + config.kafka_metric_opts, + config.kafka_topic_aliases + ), + consumer_event_metrics( + metric_prefix, + labels, + config.kafka_metric_opts, + config.kafka_topic_aliases + ) ] end - def producer_event_metrics(metric_prefix, labels, buckets) do + def producer_event_metrics(metric_prefix, labels, buckets, aliases) do producer_metrics_tags = Map.keys(labels) ++ [:topic, :partition, :topic_alias] buckets = Keyword.get(buckets, :producer_buckets, @default_producer_buckets) @@ -83,12 +101,13 @@ if Code.ensure_loaded?(PromEx) do labels, :producer, @producer_event_metrics, - buckets + buckets, + aliases ) ) end - def consumer_event_metrics(metric_prefix, labels, buckets) do + def consumer_event_metrics(metric_prefix, labels, buckets, aliases) do consumer_metrics_tags = Map.keys(labels) ++ [:topic, :partition, :topic_alias] buckets = Keyword.get(buckets, :consumer_buckets, @default_consumer_buckets) @@ -100,14 +119,13 @@ if Code.ensure_loaded?(PromEx) do labels, :consumer, @consumer_event_metrics, - buckets + buckets, + aliases ) ) end - defp build_kafka_metrics(metric_prefix, metrics_tags, labels, name, event_name, buckets) do - aliases = Application.get_env(:kafka_batcher, :kafka_topic_aliases, %{}) - + defp build_kafka_metrics(metric_prefix, metrics_tags, labels, name, event_name, buckets, aliases) do [ distribution( metric_prefix ++ [name, :duration, :seconds], diff --git a/lib/kafka_batcher/supervisor.ex b/lib/kafka_batcher/supervisor.ex index 2a7c2fc..c318587 100644 --- a/lib/kafka_batcher/supervisor.ex +++ b/lib/kafka_batcher/supervisor.ex @@ -4,16 +4,65 @@ defmodule KafkaBatcher.Supervisor do Starts Collector & AccumulatorsPoolSupervisor for each configured collector """ + alias KafkaBatcher.{AccumulatorsPoolSupervisor, Collector} + use Supervisor - def start_link(args) do - Supervisor.start_link(__MODULE__, args, name: __MODULE__) + @spec child_spec( + KafkaBatcher.Config.t() + | Keyword.t() + | nil + ) :: Supervisor.child_spec() + def child_spec(%KafkaBatcher.Config{} = config) do + %{id: reg_name(config), start: {__MODULE__, :start_link, [config]}} + end + + def child_spec(opts) when is_nil(opts) or is_list(opts) do + opts = + if is_nil(opts) or opts == [] do + # Backwards compatibility + Application.get_all_env(:kafka_batcher) + else + opts + end + + opts |> KafkaBatcher.Config.build_config!() |> child_spec() + end + + @spec start_link(KafkaBatcher.Config.t()) :: Supervisor.on_start() + def start_link(%KafkaBatcher.Config{} = config) do + Supervisor.start_link(__MODULE__, config, name: reg_name(config)) end - def init(_args) do - children = KafkaBatcher.Config.collectors_spec() + @impl true + def init(%KafkaBatcher.Config{} = config) do + children = + [ + KafkaBatcher.ConnectionManager.child_spec(config) + | build_data_stream_specs(config.data_stream_specs) + ] + |> Enum.reverse() opts = [strategy: :one_for_one] Supervisor.init(children, opts) end + + defp build_data_stream_specs(specs) do + for %KafkaBatcher.DataStreamSpec{} = spec <- specs, reduce: [] do + specs -> + %KafkaBatcher.DataStreamSpec{ + collector_config: %Collector.Config{collector_mod: collector_mod} + } = spec + + [ + collector_mod.child_spec(spec), + AccumulatorsPoolSupervisor.child_spec(spec) + | specs + ] + end + end + + defp reg_name(%KafkaBatcher.Config{} = config) do + :"#{__MODULE__}.#{KafkaBatcher.Config.get_client_name(config)}" + end end diff --git a/lib/kafka_batcher/temp_storage.ex b/lib/kafka_batcher/temp_storage.ex index 3521237..b0f2c96 100644 --- a/lib/kafka_batcher/temp_storage.ex +++ b/lib/kafka_batcher/temp_storage.ex @@ -26,7 +26,9 @@ defmodule KafkaBatcher.TempStorage do end end - defp recheck_and_update(%CollectorState{topic_name: topic, locked?: true} = state, now) do + defp recheck_and_update(%CollectorState{locked?: true} = state, now) do + topic = KafkaBatcher.DataStreamSpec.get_topic_name(state.data_stream_spec) + if @storage_impl.empty?(topic) do %CollectorState{state | locked?: false, last_check_timestamp: nil} else diff --git a/test/support/producers/test_producer.ex b/test/support/producers/test_producer.ex index 9851608..efc114e 100644 --- a/test/support/producers/test_producer.ex +++ b/test/support/producers/test_producer.ex @@ -27,24 +27,24 @@ defmodule KafkaBatcher.Producers.TestProducer do } @impl true - def start_client do - process_callback(%{action: :start_client}, {:ok, self()}) + def start_client(config) do + process_callback(%{action: :start_client, parameters: config}, {:ok, self()}) end @impl true - def start_producer(topic_name, config) do - process_callback(%{action: :start_producer, parameters: {topic_name, config}}, :ok) + def start_producer(config, topic_name) do + process_callback(%{action: :start_producer, parameters: {config, topic_name}}, :ok) end @impl true - def get_partitions_count(topic_name) do + def get_partitions_count(config, topic_name) do response = {:ok, @partition_counts[topic_name]} - process_callback(%{action: :get_partitions_count, parameters: topic_name}, response) + process_callback(%{action: :get_partitions_count, parameters: {config, topic_name}}, response) end @impl true - def do_produce(messages, topic, partition, config) do - process_callback(%{action: :do_produce, parameters: {messages, topic, partition, config}}, :ok) + def do_produce(config, messages, topic, partition) do + process_callback(%{action: :do_produce, parameters: {config, messages, topic, partition}}, :ok) end def topic_name(idx) when idx >= 1 and idx <= 8, do: "topic#{idx}"