Skip to content

Commit bfdee3a

Browse files
author
Gleb Ivanov
committed
Move default client name to a variable and add client_name doc
1 parent d9ae0d7 commit bfdee3a

File tree

3 files changed

+42
-28
lines changed

3 files changed

+42
-28
lines changed

README.md

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ A library to increase the throughput of producing messages (coming one at a time
2323
def start(_type, _args) do
2424
children = [
2525
# Describe the child spec
26-
KafkaBatcher.Supervisor
26+
{KafkaBatcher.Supervisor, Application.fetch_env!(:kafka_batcher, :default_client)}
2727
]
2828

2929
opts = [strategy: :one_for_one, name: MyApp.Supervisor, max_restarts: 3, max_seconds: 5]
@@ -36,32 +36,36 @@ A library to increase the throughput of producing messages (coming one at a time
3636
Config example:
3737

3838
```elixir
39-
config :kafka_batcher, KafkaBatcher.Collector1, topic_name: "topic1"
40-
config :kafka_batcher, KafkaBatcher.Collector2, topic_name: "topic2"
41-
config :kafka_batcher, KafkaBatcher.Collector3, topic_name: "topic3"
42-
config :kafka_batcher, KafkaBatcher.Collector4, topic_name: "topic4"
43-
config :kafka_batcher, KafkaBatcher.Collector5, topic_name: "topic5"
44-
45-
config :kafka_batcher, collectors:
46-
[
47-
KafkaBatcher.Collector1,
48-
KafkaBatcher.Collector2,
49-
KafkaBatcher.Collector3,
50-
KafkaBatcher.Collector4,
51-
KafkaBatcher.Collector5
52-
]
39+
config :kafka_batcher, :default_client, [
40+
{KafkaBatcher.Collector1, [topic_name: "topic1"]},
41+
{KafkaBatcher.Collector2, [topic_name: "topic2"]},
42+
{KafkaBatcher.Collector3, [topic_name: "topic3"]},
43+
{KafkaBatcher.Collector4, [topic_name: "topic4"]},
44+
{KafkaBatcher.Collector5, [topic_name: "topic5"]}
45+
]
5346

54-
config :kafka_batcher, :kafka,
55-
endpoints: "localhost:9092",
56-
# in case you use SASL
57-
# sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"},
58-
# ssl: true,
59-
telemetry: true,
60-
allow_topic_auto_creation: false,
47+
config :kafka_batcher, :default_client,
48+
collectors: [
49+
KafkaBatcher.Collector1,
50+
KafkaBatcher.Collector2,
51+
KafkaBatcher.Collector3,
52+
KafkaBatcher.Collector4,
53+
KafkaBatcher.Collector5
54+
],
55+
kafka: [
56+
client_name: :default_client,
57+
endpoints: "localhost:9092",
58+
# in case you use SASL
59+
# sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"},
60+
# ssl: true,
61+
telemetry: true,
62+
allow_topic_auto_creation: false
63+
],
6164
kafka_topic_aliases: %{
6265
"real_topic_name1" => "incoming-events",
6366
"real_topic_name2" => "special-topic"
64-
}
67+
},
68+
kafka_metric_opts: []
6569

6670
# In case you use KafkaEx, you need to disable default worker to avoid crashes
6771
config :kafka_ex, :disable_default_worker, true
@@ -88,6 +92,7 @@ Available parameters:
8892
* `:ssl` - optional parameter. `:ssl` must be a `boolean()`. By default `:ssl` is `false`.
8993
* `:min_delay` - optional parameter. Sets the minimal delay before sending events. This parameter makes it possible to increase the maximum throughput when you receive more messages (in terms of count per second) than you expected when setting the `batch_size` parameter.
9094
* `:max_batch_bytesize` - optional parameter. Sets a limit on the maximum batch size. By default it is 1_000_000 bytes.
95+
* `:client_name` - the client name used to start a producer. It is included in the metrics namespace. Defaults to `:kafka_producer_client`.
9196

9297
**Important:** The size of a single message must not exceed the `max_batch_bytesize` setting. If you need to work with large messages, you must increase the `max_batch_bytesize` value and the value of the Kafka topic setting `max.message.bytes` as well.
9398

lib/kafka_batcher/connection_manager.ex

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ defmodule KafkaBatcher.ConnectionManager do
4848
end
4949

5050
@doc "Checks that Kafka client is already started"
51-
@spec client_started?(Producers.Config.t()) :: boolean()
52-
def client_started?(%Producers.Config{} = producer_config) do
53-
GenServer.call(reg_name(producer_config), :client_started?)
51+
@spec client_started?(client_name :: atom()) :: boolean()
52+
def client_started?(client_name \\ Producers.Config.default_client_name()) do
53+
GenServer.call(reg_name(client_name), :client_started?)
5454
end
5555

5656
##
@@ -190,6 +190,10 @@ defmodule KafkaBatcher.ConnectionManager do
190190
end
191191

192192
defp reg_name(%Producers.Config{} = producer_config) do
193-
:"#{__MODULE__}.#{producer_config.client_name}"
193+
reg_name(producer_config.client_name)
194+
end
195+
196+
defp reg_name(client_name) when is_atom(client_name) do
197+
:"#{__MODULE__}.#{client_name}"
194198
end
195199
end

lib/kafka_batcher/producers/config.ex

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ defmodule KafkaBatcher.Producers.Config do
2323
brod_config: BrodConfig.t()
2424
}
2525

26+
@default_client_name :kafka_producer_client
27+
2628
@enforce_keys [
2729
:endpoints,
2830
:client_name,
@@ -53,11 +55,14 @@ defmodule KafkaBatcher.Producers.Config do
5355
}
5456
end
5557

58+
@spec default_client_name :: :kafka_producer_client
59+
def default_client_name, do: @default_client_name
60+
5661
@spec build!(opts :: Keyword.t()) :: t()
5762
def build!(opts) do
5863
%__MODULE__{
5964
endpoints: build_endpoints!(opts),
60-
client_name: Keyword.get(opts, :client_name, :kafka_producer_client),
65+
client_name: Keyword.get(opts, :client_name, @default_client_name),
6166
partition_strategy: Keyword.get(opts, :partition_strategy),
6267
required_acks: Keyword.get(opts, :required_acks, -1),
6368
telemetry: Keyword.get(opts, :telemetry, true),

0 commit comments

Comments
 (0)