Skip to content

Commit 8489b1c

Browse files
authored
Convert config to keyword (#160)
Map based config doesn't automatically merge config values, which means that each environment, if changing a consumer, must define the _entire_ consumer's config.
1 parent 94d4a33 commit 8489b1c

File tree

10 files changed

+81
-65
lines changed

10 files changed

+81
-65
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
# 2.0.0
2+
3+
### Breaking Changes
4+
5+
* Allow keyword configuration for subscribers. Note, keywords require atom keys, so if your current version of `kaffe` is 1.27.0 or higher, adopting to the keyword subscribers is a breaking (and highly encouraged) change.
6+
17
# 1.28.0
28

39
### Enhancements

README.md

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ There is also legacy support for single message consumers, which process one mes
7676

7777
```elixir
7878
config :kaffe,
79-
consumers: %{
80-
"subscriber_1" => [
79+
consumers: [
80+
subscriber_1: [
8181
endpoints: [kafka: 9092],
8282
topics: ["interesting-topic"],
8383
consumer_group: "your-app-consumer-group",
@@ -95,7 +95,7 @@ There is also legacy support for single message consumers, which process one mes
9595
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
9696
}
9797
],
98-
"subscriber_2" => [
98+
subscriber_2: [
9999
endpoints: [kafka: 9092],
100100
topics: ["topic-2"],
101101
consumer_group: "your-app-consumer-group",
@@ -104,7 +104,7 @@ There is also legacy support for single message consumers, which process one mes
104104
max_bytes: 50_000,
105105
worker_allocation_strategy: :worker_per_topic_partition
106106
]
107-
}
107+
]
108108
```
109109

110110
3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your supervision tree.
@@ -117,12 +117,12 @@ There is also legacy support for single message consumers, which process one mes
117117
children = [
118118
%{
119119
id: Kaffe.GroupMemberSupervisor.Subscriber1,
120-
start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_1"]},
120+
start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_1]},
121121
type: :supervisor
122122
},
123123
%{
124124
id: Kaffe.GroupMemberSupervisor.Subscriber2,
125-
start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_2"]},
125+
start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_2]},
126126
type: :supervisor
127127
}
128128
]
@@ -201,16 +201,18 @@ _For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended i
201201

202202
```elixir
203203
config :kaffe,
204-
consumer: [
205-
endpoints: [kafka: 9092], # that's [hostname: kafka_port]
206-
topics: ["interesting-topic"], # the topic(s) that will be consumed
207-
consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
208-
message_handler: MessageProcessor, # the module from Step 1 that will process messages
209-
210-
# optional
211-
async_message_ack: false, # see "async message acknowledgement" below
212-
start_with_earliest_message: true # default false
213-
],
204+
consumers: [
205+
subscriber_1: [
206+
endpoints: [kafka: 9092], # that's [hostname: kafka_port]
207+
topics: ["interesting-topic"], # the topic(s) that will be consumed
208+
consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
209+
message_handler: MessageProcessor, # the module from Step 1 that will process messages
210+
211+
# optional
212+
async_message_ack: false, # see "async message acknowledgement" below
213+
start_with_earliest_message: true # default false
214+
]
215+
]
214216
```
215217

216218
The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.
@@ -221,11 +223,13 @@ _For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended i
221223

222224
```elixir
223225
config :kaffe,
224-
consumer: [
225-
heroku_kafka_env: true,
226-
topics: ["interesting-topic"],
227-
consumer_group: "your-app-consumer-group",
228-
message_handler: MessageProcessor
226+
consumers: [
227+
subscriber_1: [
228+
heroku_kafka_env: true,
229+
topics: ["interesting-topic"],
230+
consumer_group: "your-app-consumer-group",
231+
message_handler: MessageProcessor
232+
]
229233
]
230234
```
231235

config/test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ config :kaffe,
44
kafka_mod: TestBrod,
55
group_subscriber_mod: TestBrodGroupSubscriber,
66
test_partition_count: 32,
7-
consumers: %{
8-
"subscriber_name" => [
7+
consumers: [
8+
subscriber_name: [
99
endpoints: [kafka: 9092],
1010
topics: ["kaffe-test"],
1111
consumer_group: "kaffe-test-group",
@@ -24,7 +24,7 @@ config :kaffe,
2424
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
2525
}
2626
]
27-
},
27+
],
2828
producer: [
2929
endpoints: [kafka: 9092],
3030
topics: ["kaffe-test"],

lib/kaffe/config/consumer.ex

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,11 @@ defmodule Kaffe.Config.Consumer do
108108

109109
def consumer_group(config_key), do: config_get!(config_key, :consumer_group)
110110

111-
def subscriber_name(config_key),
112-
do: config_get(config_key, :subscriber_name, consumer_group(config_key)) |> String.to_atom()
111+
def subscriber_name(config_key) do
112+
config_key
113+
|> config_get!(:subscriber_name)
114+
|> to_atom()
115+
end
113116

114117
def topics(config_key), do: config_get!(config_key, :topics)
115118

@@ -215,34 +218,37 @@ defmodule Kaffe.Config.Consumer do
215218

216219
def config_get!(config_key, key) do
217220
Application.get_env(:kaffe, :consumers)
218-
|> Map.get(config_key)
221+
|> Access.get(config_key)
219222
|> Keyword.fetch!(key)
220223
end
221224

222225
def config_get(config_key, :subscriber_name, _default), do: config_key
223226

224227
def config_get(config_key, key, default) do
225228
Application.get_env(:kaffe, :consumers)
226-
|> Map.get(config_key)
229+
|> Access.get(config_key)
227230
|> Keyword.get(key, default)
228231
end
229232

230233
def validate_configuration!() do
231234
if Application.get_env(:kaffe, :consumers) == nil do
232235
old_config = Application.get_env(:kaffe, :consumer) || []
233-
subscriber_name = old_config |> Keyword.get(:subscriber_name, "subscriber_name")
236+
subscriber_name = old_config |> Keyword.get(:subscriber_name, :subscriber_name)
234237

235238
raise("""
236239
UPDATE CONSUMERS CONFIG:
237240
238-
Set :kaffe, :consumers to a map with subscriber names as keys and config as values.
241+
Set :kaffe, :consumers to a keyword list with subscriber names as keys and config as values.
239242
For example:
240243
241244
config :kaffe,
242-
consumers: %{
245+
consumers: [
243246
#{inspect(subscriber_name)} => #{inspect(old_config)}
244-
}
247+
]
245248
""")
246249
end
247250
end
251+
252+
defp to_atom(val) when is_atom(val), do: val
253+
defp to_atom(val) when is_binary(val), do: String.to_atom(val)
248254
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule Kaffe.Mixfile do
22
use Mix.Project
33

44
@source_url "https://github.com/spreedly/kaffe"
5-
@version "1.28.0"
5+
@version "2.0.0"
66

77
def project do
88
[

test/kaffe/config/consumer_test.exs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ defmodule Kaffe.Config.ConsumerTest do
44
def change_config(subscriber_name, update_fn) do
55
config = Application.get_env(:kaffe, :consumers)[subscriber_name]
66
config = update_fn.(config)
7-
Application.put_env(:kaffe, :consumers, %{subscriber_name => config})
7+
Application.put_env(:kaffe, :consumers, Keyword.new([{subscriber_name, config}]))
88
end
99

1010
describe "configuration/1" do
1111
setup do
12-
change_config("subscriber_name", fn config ->
12+
change_config(:subscriber_name, fn config ->
1313
config
1414
|> Keyword.delete(:offset_reset_policy)
1515
|> Keyword.delete(:ssl)
@@ -18,9 +18,9 @@ defmodule Kaffe.Config.ConsumerTest do
1818
end
1919

2020
test "correct settings are extracted" do
21-
sasl = Kaffe.Config.Consumer.config_get!("subscriber_name", :sasl)
21+
sasl = Kaffe.Config.Consumer.config_get!(:subscriber_name, :sasl)
2222

23-
change_config("subscriber_name", fn config ->
23+
change_config(:subscriber_name, fn config ->
2424
config |> Keyword.delete(:sasl)
2525
end)
2626

@@ -52,18 +52,18 @@ defmodule Kaffe.Config.ConsumerTest do
5252
}
5353

5454
on_exit(fn ->
55-
change_config("subscriber_name", fn config ->
55+
change_config(:subscriber_name, fn config ->
5656
Keyword.put(config, :sasl, sasl)
5757
end)
5858
end)
5959

60-
assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
60+
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
6161
end
6262

6363
test "string endpoints parsed correctly" do
64-
endpoints = Kaffe.Config.Consumer.config_get!("subscriber_name", :endpoints)
64+
endpoints = Kaffe.Config.Consumer.config_get!(:subscriber_name, :endpoints)
6565

66-
change_config("subscriber_name", fn config ->
66+
change_config(:subscriber_name, fn config ->
6767
config |> Keyword.put(:endpoints, "kafka:9092,localhost:9092")
6868
end)
6969

@@ -95,19 +95,19 @@ defmodule Kaffe.Config.ConsumerTest do
9595
}
9696

9797
on_exit(fn ->
98-
change_config("subscriber_name", fn config ->
98+
change_config(:subscriber_name, fn config ->
9999
Keyword.put(config, :endpoints, endpoints)
100100
end)
101101
end)
102102

103-
assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
103+
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
104104
end
105105
end
106106

107107
test "correct settings with sasl plain are extracted" do
108-
sasl = Kaffe.Config.Consumer.config_get!("subscriber_name", :sasl)
108+
sasl = Kaffe.Config.Consumer.config_get!(:subscriber_name, :sasl)
109109

110-
change_config("subscriber_name", fn config ->
110+
change_config(:subscriber_name, fn config ->
111111
Keyword.put(config, :sasl, %{mechanism: :plain, login: "Alice", password: "ecilA"})
112112
end)
113113

@@ -140,18 +140,18 @@ defmodule Kaffe.Config.ConsumerTest do
140140
}
141141

142142
on_exit(fn ->
143-
change_config("subscriber_name", fn config ->
143+
change_config(:subscriber_name, fn config ->
144144
Keyword.put(config, :sasl, sasl)
145145
end)
146146
end)
147147

148-
assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
148+
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
149149
end
150150

151151
test "correct settings with ssl are extracted" do
152-
ssl = Kaffe.Config.Consumer.config_get("subscriber_name", :ssl, false)
152+
ssl = Kaffe.Config.Consumer.config_get(:subscriber_name, :ssl, false)
153153

154-
change_config("subscriber_name", fn config ->
154+
change_config(:subscriber_name, fn config ->
155155
Keyword.put(config, :ssl, true)
156156
end)
157157

@@ -184,21 +184,21 @@ defmodule Kaffe.Config.ConsumerTest do
184184
}
185185

186186
on_exit(fn ->
187-
change_config("subscriber_name", fn config ->
187+
change_config(:subscriber_name, fn config ->
188188
Keyword.put(config, :ssl, ssl)
189189
end)
190190
end)
191191

192-
assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
192+
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
193193
end
194194

195195
describe "offset_reset_policy" do
196196
test "computes correctly from start_with_earliest_message == true" do
197-
change_config("subscriber_name", fn config ->
197+
change_config(:subscriber_name, fn config ->
198198
config |> Keyword.delete(:offset_reset_policy)
199199
end)
200200

201-
assert Kaffe.Config.Consumer.configuration("subscriber_name").offset_reset_policy == :reset_by_subscriber
201+
assert Kaffe.Config.Consumer.configuration(:subscriber_name).offset_reset_policy == :reset_by_subscriber
202202
end
203203
end
204204
end

test/kaffe/consumer_group/group_manager_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ defmodule Kaffe.GroupManagerTest do
3737

3838
test "subscribe from config" do
3939
Process.register(self(), :test_case)
40-
config = Kaffe.Config.Consumer.configuration("subscriber_name")
40+
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
4141
{:ok, _group_manager_pid} = GroupManager.start_link(config)
4242

4343
:timer.sleep(config.rebalance_delay_ms)
@@ -52,7 +52,7 @@ defmodule Kaffe.GroupManagerTest do
5252

5353
test "subscribe to topics dynamically" do
5454
Process.register(self(), :test_case)
55-
config = Kaffe.Config.Consumer.configuration("subscriber_name")
55+
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
5656
{:ok, _group_manager_pid} = GroupManager.start_link(config)
5757

5858
:timer.sleep(config.rebalance_delay_ms)
@@ -74,7 +74,7 @@ defmodule Kaffe.GroupManagerTest do
7474

7575
test "duplicate topic subscription does nothing" do
7676
Process.register(self(), :test_case)
77-
config = Kaffe.Config.Consumer.configuration("subscriber_name")
77+
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
7878
{:ok, _group_manager_pid} = GroupManager.start_link(config)
7979

8080
:timer.sleep(config.rebalance_delay_ms)

test/kaffe/consumer_group/subscriber/group_member_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ defmodule Kaffe.GroupMemberTest do
4444

4545
test "handle assignments_received" do
4646
Process.register(self(), :test_case)
47-
config = Kaffe.Config.Consumer.configuration("subscriber_name")
47+
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
4848
{:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config)
4949

5050
GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}])
@@ -59,7 +59,7 @@ defmodule Kaffe.GroupMemberTest do
5959

6060
test "handle assignments_revoked" do
6161
Process.register(self(), :test_case)
62-
config = Kaffe.Config.Consumer.configuration("subscriber_name")
62+
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
6363
{:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config)
6464

6565
GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}])
@@ -77,7 +77,7 @@ defmodule Kaffe.GroupMemberTest do
7777

7878
test "handle assignments_received without assignments_revoked" do
7979
Process.register(self(), :test_case)
80-
config = Kaffe.Config.Consumer.configuration("subscriber_name")
80+
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
8181
{:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config)
8282

8383
GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}])

0 commit comments

Comments
 (0)