Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
name: Linting & Tests

# Define workflow that runs when changes are pushed to the
# `master` branch or pushed to a PR branch that targets the `master`
# branch.
on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]

# Sets the ENV `MIX_ENV` to `test` for running tests
env:
MIX_ENV: test

permissions:
contents: read

jobs:
test:
runs-on: ubuntu-22.04
name: Test on OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}}
strategy:
# Specify the OTP and Elixir versions to use when building
# and running the workflow steps.
matrix:
otp: ['25.0.4', '27.2'] # Define the OTP version [required]
elixir: ['1.14.1', '1.18.4'] # Define the elixir version [required]
exclude:
- otp: '27.2'
elixir: '1.14.1'
steps:
# Step: Setup Elixir + Erlang image as the base.
- name: Set up Elixir
uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.otp}}
elixir-version: ${{matrix.elixir}}

# Step: Check out the code.
- name: Checkout code
uses: actions/checkout@v4

# Step: Define how to cache deps. Restores existing cache if present.
- name: Cache deps
id: cache-deps
uses: actions/cache@v4
env:
cache-name: cache-elixir-deps
with:
path: deps
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-${{ env.cache-name }}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-${{ env.cache-name }}

# Step: Define how to cache the `_build` directory. After the first run,
# this speeds up test runs. This includes not re-compiling our deps every run.
- name: Cache compiled build
id: cache-build
uses: actions/cache@v4
env:
cache-name: cache-compiled-build
with:
path: _build
key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-${{ env.cache-name }}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-${{ env.cache-name }}-
${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-mix-

# Step: Conditionally bust the cache when job is re-run.
# Sometimes, we may have issues with incremental builds that are fixed by
# doing a full recompile. In order to not waste dev time on such trivial
# issues (while also reaping the time savings of incremental builds for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is helpful as heck!

# *most* day-to-day development), force a full recompile only on builds
# that are retried.
- name: Clean to rule out incremental build as a source of flakiness
if: github.run_attempt != '1'
run: |
mix deps.clean --all
mix clean
shell: sh

# Step: Download project dependencies. If unchanged, uses
# the cached version.
- name: Install dependencies
run: mix deps.get

# Step: Compile the project treating any warnings as errors.
# Customize this step if a different behavior is desired.
- name: Compiles without warnings
run: mix compile --warnings-as-errors

# Step: Check that the checked in code has already been formatted.
# This step fails if something was found unformatted.
# Customize this step as desired.
- name: Check Formatting
run: mix format --check-formatted

# Step: Execute the tests.
- name: Run tests
run: mix test
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
# 1.28.0

### Enhancements

* Allow `auto_start_producers` and `allow_topic_auto_creation` to be configurable for brod clients. If configuration of either of these values is desired, update your producer or consumer configs.

* Configures CI to run on pull request.

### Fixes

* Stops compiler warnings on duplicate doc definitions
* Fix doc formatting and typos

# 1.27.2

* Relax `:retry` requirement
Expand Down
10 changes: 5 additions & 5 deletions lib/kaffe/config/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ defmodule Kaffe.Config.Consumer do
option, [`auto.offset.reset`](https://kafka.apache.org/documentation/#newconsumerconfigs). Valid values for
this option are:

* `:reset_to_earliest` Reset to the earliest available offset.
* `:reset_to_latest` Reset to the latest offset.
* `:reset_by_subscriber` The subscriber receives the `OffsetOutOfRange` error.
* `:reset_to_earliest` Reset to the earliest available offset.
* `:reset_to_latest` Reset to the latest offset.
* `:reset_by_subscriber` The subscriber receives the `OffsetOutOfRange` error.

More information in the [Brod consumer](https://github.com/klarna/brod/blob/master/src/brod_consumer.erl).

* `:worker_allocation_strategy` Controls how workers are allocated with respect to consumed topics and partitions.

* `:worker_per_partition` The default (for backward compatibilty) and allocates a single worker per partition
* `:worker_per_partition` The default (for backward compatibilty) and allocates a single worker per partition
across topics. This is useful for managing concurrent processing of messages that may be received from any
consumed topic.

* `:worker_per_topic_partition` This strategy allocates a worker per topic partition. This means there will be
* `:worker_per_topic_partition` This strategy allocates a worker per topic partition. This means there will be
a worker for every topic partition consumed. Unless you need to control concurrency across topics, you should
use this strategy.
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/kaffe/config/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Kaffe.Config.Producer do
* -1: If it is -1 the broker will block until the message is committed by all in sync replicas before acking.

* `:ack_timeout` Maximum time in milliseconds the broker can await the receipt of the number of
acknowledgements in `required_acks'. The timeout is not an exact limit on the request time. Defaults to 1000.
acknowledgements in `required_acks`. The timeout is not an exact limit on the request time. Defaults to 1000.
See `brod` for more details.

* `:partition_buffer_limit` How many requests (per-partition) can be buffered without blocking the caller.
Expand Down
25 changes: 8 additions & 17 deletions lib/kaffe/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ defmodule Kaffe.Producer do

`messages` must be a list of type `message()` or `message_object()`

Alternatively, synchronously produce the given `key`/`value` to the first Kafka topic.

This is a simpler way to produce if you've only given Producer a single topic
for production and don't want to specify the topic for each call.

Returns:

* `:ok` on successfully producing each message
Expand All @@ -70,17 +75,6 @@ defmodule Kaffe.Producer do
produce_list(topic, message_list, global_partition_strategy())
end

@doc """
Synchronously produce the given `key`/`value` to the first Kafka topic.

This is a simpler way to produce if you've only given Producer a single topic
for production and don't want to specify the topic for each call.

Returns:

* `:ok` on successfully producing the message
* `{:error, reason}` for any error
"""
def produce_sync(key, value) do
topic = config().topics |> List.first()
produce_value(topic, key, value)
Expand All @@ -89,7 +83,9 @@ defmodule Kaffe.Producer do
@doc """
Synchronously produce the `message_list` to `topic`/`partition`

`message_list` must be a list of type `message()` or `message_type()`
`message_list` must be a list of type `message()` or `message_object()`

Alternatively, synchronously produce the `key`/`value` to `topic`

Returns:

Expand All @@ -100,11 +96,6 @@ defmodule Kaffe.Producer do
produce_list(topic, message_list, fn _, _, _, _ -> partition end)
end

@doc """
Synchronously produce the `key`/`value` to `topic`

See `produce_sync/2` for returns.
"""
def produce_sync(topic, key, value) do
produce_value(topic, key, value)
end
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ defmodule Kaffe.Mixfile do
[
extras: [
"README.md": [title: "Overview"],
"LICENSE.md": [title: "License"]
"LICENSE.md": [title: "License"],
"CHANGELOG.md": [title: "Changelog"]
],
main: "readme",
source_url: @source_url,
Expand Down