Skip to content

Commit 94d4a33

Browse files
authored
Adds Kaffe.MessageHandler behaviour (#154)
This should make implementation easier, and also makes the contract a little more clear. It will also give better warnings in implementing code if (as an example) the wrong return type is possible. Also fixes some doc formatting.
1 parent 01923eb commit 94d4a33

File tree

6 files changed

+88
-33
lines changed

6 files changed

+88
-33
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
* Configures CI to run on pull request.
88

9+
* Add `Kaffe.MessageHandler` behaviour. To utilize it, add the behaviour to your configured `message_handler` and `@impl Kaffe.MessageHandler` on `handle_messages/1`.
10+
911
### Fixes
1012

1113
* Stops compiler warnings on duplicate doc definitions

README.md

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,7 @@ An opinionated, highly specific, Elixir wrapper around [Brod](https://github.com
3939
end
4040
```
4141

42-
2. Ensure `kaffe` is started with your application:
43-
44-
```elixir
45-
def application do
46-
[applications: [:logger, :kaffe]]
47-
end
48-
```
49-
50-
3. Configure a Kaffe Consumer and/or Producer
42+
2. Configure a Kaffe Consumer and/or Producer
5143

5244
## Kaffe Consumer Usage
5345

@@ -61,14 +53,15 @@ There is also legacy support for single message consumers, which process one mes
6153

6254
### Kaffe GroupMember - Batch Message Consumer
6355

64-
1. Define a `handle_messages/1` function in the provided module.
65-
66-
`handle_messages/1` This function (note the pluralization) will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
56+
1. Define a `handle_messages/1` function in the provided module implementing the `Kaffe.MessageHandler` behaviour.
6757

68-
The module's `handle_messages/1` function _must_ return `:ok` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/1` function returns `:ok`.
58+
`handle_messages/1` will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
6959

7060
```elixir
7161
defmodule MessageProcessor do
62+
@behaviour Kaffe.MessageHandler
63+
64+
@impl Kaffe.MessageHandler
7265
def handle_messages(messages) do
7366
for %{key: key, value: value} = message <- messages do
7467
IO.inspect message
@@ -154,6 +147,9 @@ Example:
154147
155148
```elixir
156149
defmodule MessageProcessor do
150+
@behaviour Kaffe.MessageHandler
151+
152+
@impl Kaffe.MessageHandler
157153
def handle_messages(messages) do
158154
for %{key: key, value: value} = message <- messages do
159155
IO.inspect message

lib/kaffe/consumer.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@ defmodule Kaffe.Consumer do
1818
@kafka Application.compile_env(:kaffe, :kafka_mod, :brod)
1919
@group_subscriber Application.compile_env(:kaffe, :group_subscriber_mod, :brod_group_subscriber)
2020

21+
# See kafka_message in "brod/include/brod.hrl"
22+
@type message() :: %{
23+
key: binary(),
24+
value: binary(),
25+
topic: binary(),
26+
offset: non_neg_integer(),
27+
partition: non_neg_integer(),
28+
ts: non_neg_integer(),
29+
ts_type: :undefined | :create | :append,
30+
headers: list()
31+
}
32+
2133
require Record
2234
import Record, only: [defrecord: 2, extract: 2]
2335
defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl")

lib/kaffe/consumer_group/subscriber/subscriber.ex

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ defmodule Kaffe.Subscriber do
1212
1313
The subscriber reads the following options out of the configuration:
1414
15-
- `max_bytes` - The maximum number of message bytes to receive in a batch
16-
- `min_bytes` - The minimum number of message bytes to receive in a batch
17-
- `max_wait_time` - Maximum number of milliseconds broker will wait for `:min_bytes` of messages
18-
to be collected
19-
- `offset_reset_policy` - The native `auto.offset.reset` option,
20-
either `:reset_to_earliest` or `:reset_to_latest`.
15+
- `max_bytes` - The maximum number of message bytes to receive in a batch
16+
- `min_bytes` - The minimum number of message bytes to receive in a batch
17+
- `max_wait_time` - Maximum number of milliseconds broker will wait for `:min_bytes` of messages
18+
to be collected
19+
- `offset_reset_policy` - The native `auto.offset.reset` option,
20+
either `:reset_to_earliest` or `:reset_to_latest`.
2121
2222
See: https://github.com/klarna/brucke/blob/master/src/brucke_member.erl
2323
Also: https://github.com/klarna/brod/blob/master/src/brod_consumer.erl

lib/kaffe/consumer_group/worker/worker.ex

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ defmodule Kaffe.Worker do
22
@moduledoc """
33
A worker receives messages for a single topic for a single partition.
44
5-
Processing the message set is delegated to the configured message handler. It's
6-
responsible for any error handling as well. The message handler must define a
7-
`handle_messages` function (*note* the pluralization!) to accept a list of messages.
8-
9-
The result of `handle_messages` is sent back to the subscriber. Additionally, the
10-
message handler should inform the subscriber on what to do with the offsets after
11-
processing the message set.
5+
Processing the message set is delegated to the configured message
6+
handler (See `Kaffe.MessageHandler`). The result of `handle_messages`
7+
is sent back to the subscriber. Additionally, the message handler should
8+
inform the subscriber on what to do with the offsets after processing the
9+
message set.
1210
"""
1311

1412
use GenServer
@@ -39,13 +37,11 @@ defmodule Kaffe.Worker do
3937
{:ok, %{message_handler: message_handler, worker_name: worker_name}}
4038
end
4139

42-
@doc """
43-
Entry point for processing a message set received by a subscriber.
44-
45-
Note that the response from the message handler is what dictates how a
46-
subscriber should deal with the message offset. Depending on the situation,
47-
a message processor may not want to have it's most recent offsets committed.
48-
"""
40+
# Entry point for processing a message set received by a subscriber.
41+
#
42+
# Note that the response from the message handler is what dictates how a
43+
# subscriber should deal with the message offset. Depending on the situation,
44+
# a message processor may not want to have it's most recent offsets committed.
4945
@impl GenServer
5046
def handle_cast(
5147
{:process_messages, subscriber_pid, topic, partition, generation_id, messages},

lib/kaffe/message_handler.ex

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
defmodule Kaffe.MessageHandler do
2+
@moduledoc """
3+
The behaviour for a message handler.
4+
5+
The module implementing this behaviour needs to be configured for the consumer
6+
under Kaffe config.
7+
8+
```
9+
config :kaffe,
10+
consumers: %{
11+
"subscriber_1" => [
12+
...
13+
message_handler: MyApp.MessageHandler
14+
]
15+
}
16+
```
17+
"""
18+
alias Kaffe.Consumer
19+
20+
@doc """
21+
The functionality responsible for handling the message set from Kaffe.
22+
23+
Each message will include the topic and partition in addition to the normal Kafka
24+
message metadata.
25+
26+
The response from the message handler is what dictates how a
27+
subscriber should deal with the message offset. Depending on the situation,
28+
a message processor may not want to have its most recent offsets committed.
29+
30+
In some cases you may not want to commit back the most recent offset after
31+
processing a list of messages. For example, if you're batching messages to be
32+
sent elsewhere and want to ensure that a batch can be rebuilt should there be
33+
an error further downstream. In that example you might want to keep the offset
34+
of the first message in your batch so your consumer can restart back at that point
35+
to reprocess and rebatch the messages.
36+
37+
The following returns are supported:
38+
- `:ok` - commit back the most recent offset and request more messages
39+
- `{:ok, :no_commit}` - do _not_ commit back the most recent offset and
40+
request more messages from the offset of the last message
41+
- `{:ok, offset}` - commit back at the offset specified and request
42+
messages from that point forward
43+
44+
Because the return types are only success based, the message handler needs
45+
to handle errors.
46+
"""
47+
@callback handle_messages(messages :: list(Consumer.message())) ::
48+
:ok | {:ok, :no_commit} | {:ok, offset :: :brod.offset()}
49+
end

0 commit comments

Comments
 (0)