Skip to content

Commit 8c66164

Browse files
authored
Add Process.set_label/1 to Broadway stages (#363)
Fixes #353 Signed-off-by: Yordis Prieto <[email protected]>
1 parent d32d30d commit 8c66164

File tree

10 files changed

+156
-3
lines changed

10 files changed

+156
-3
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ jobs:
1616
fail-fast: false
1717
matrix:
1818
include:
19-
# Earliest-supported versions.
20-
- elixir: "1.7.4"
21-
otp: "21.3.8.17"
19+
# Minimum supported versions on ubuntu-22.04.
20+
# OTP 24.2 is the earliest available on ubuntu-22.04.
21+
# Elixir 1.12 is the earliest version compatible with OTP 24.
22+
- elixir: "1.12"
23+
otp: "24.2"
2224

2325
# Latest versions.
2426
- elixir: "1.18"
@@ -34,6 +36,7 @@ jobs:
3436
with:
3537
otp-version: ${{matrix.otp}}
3638
elixir-version: ${{matrix.elixir}}
39+
version-type: strict
3740

3841
- name: Cache Mix dependencies
3942
uses: actions/cache@v3

lib/broadway/process.ex

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
defmodule Broadway.Process do
2+
@moduledoc false
3+
4+
# TODO: Remove this module once we require Elixir 1.17+.
5+
# Process.set_label/1 was added in Elixir 1.17.0.
6+
7+
if function_exported?(Process, :set_label, 1) do
8+
def set_label(label) do
9+
Process.set_label(label)
10+
end
11+
12+
def labels_supported?, do: true
13+
else
14+
def set_label(_label) do
15+
:ok
16+
end
17+
18+
def labels_supported?, do: false
19+
end
20+
end

lib/broadway/topology/batch_processor_stage.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ defmodule Broadway.Topology.BatchProcessorStage do
2929
producer: args[:producer]
3030
}
3131

32+
Broadway.Process.set_label({:broadway_batch_processor, state.topology_name, state.partition})
33+
3234
{:consumer, state, []}
3335
end
3436

lib/broadway/topology/batcher_stage.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ defmodule Broadway.Topology.BatcherStage do
4545
context: args[:context]
4646
}
4747

48+
Broadway.Process.set_label({:broadway_batcher, state.topology_name, state.batcher})
49+
4850
{:producer_consumer, state, dispatcher: dispatcher}
4951
end
5052

lib/broadway/topology/processor_stage.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ defmodule Broadway.Topology.ProcessorStage do
3333
producer: args[:producer]
3434
}
3535

36+
Broadway.Process.set_label(
37+
{:broadway_processor, state.topology_name, state.processor_key, state.partition}
38+
)
39+
3640
case type do
3741
:consumer ->
3842
{:consumer, state, []}

lib/broadway/topology/producer_stage.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ defmodule Broadway.Topology.ProducerStage do
6161
rate_limiting: rate_limiting_state
6262
}
6363

64+
topology_name = args[:broadway][:name]
65+
Broadway.Process.set_label({:broadway_producer, topology_name, index})
66+
6467
case module.init(arg) do
6568
{:producer, module_state} ->
6669
{:producer, %{state | module_state: module_state}, dispatcher: dispatcher}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
defmodule Broadway.Topology.BatchProcessorStageTest do
2+
use ExUnit.Case, async: true
3+
4+
test "sets process label with topology name and partition" do
5+
topology_name = :test_topology
6+
partition = 3
7+
8+
{:ok, pid} =
9+
Broadway.Topology.BatchProcessorStage.start_link(
10+
[
11+
topology_name: topology_name,
12+
name: :test_batch_processor,
13+
module: __MODULE__,
14+
context: %{},
15+
terminator: __MODULE__,
16+
resubscribe: :never,
17+
partition: partition,
18+
batcher: :test_batcher,
19+
producer: nil
20+
],
21+
[]
22+
)
23+
24+
if Broadway.Process.labels_supported?() do
25+
label = Process.info(pid, :label)
26+
assert label == {:label, {:broadway_batch_processor, topology_name, partition}}
27+
else
28+
# Labels not supported in this Elixir version, skip assertion
29+
:ok
30+
end
31+
end
32+
end

test/broadway/topology/batcher_stage_test.exs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,34 @@ defmodule Broadway.Topology.BatcherStageTest do
2222
%{state: state} = :sys.get_state(pid)
2323
assert state.subscription_options[:max_demand] == 123
2424
end
25+
26+
test "sets process label with topology name and batcher key" do
27+
topology_name = :test_topology
28+
batcher_key = :my_batcher
29+
30+
{:ok, pid} =
31+
Broadway.Topology.BatcherStage.start_link(
32+
[
33+
topology_name: topology_name,
34+
name: :test_batcher,
35+
context: %{},
36+
terminator: __MODULE__,
37+
resubscribe: :never,
38+
batcher: batcher_key,
39+
processors: [:some_processor],
40+
batch_size: 10,
41+
batch_timeout: 1000,
42+
partition: batcher_key
43+
],
44+
[]
45+
)
46+
47+
if Broadway.Process.labels_supported?() do
48+
label = Process.info(pid, :label)
49+
assert label == {:label, {:broadway_batcher, topology_name, batcher_key}}
50+
else
51+
# Labels not supported in this Elixir version, skip assertion
52+
:ok
53+
end
54+
end
2555
end

test/broadway/topology/processor_stage_test.exs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,38 @@ defmodule Broadway.Topology.ProcessorStageTest do
2222
assert state.subscription_options[:min_demand] == 3
2323
assert state.subscription_options[:max_demand] == 6
2424
end
25+
26+
test "sets process label with topology name, processor key, and partition" do
27+
topology_name = :test_topology
28+
processor_key = :default
29+
partition = 5
30+
31+
{:ok, pid} =
32+
Broadway.Topology.ProcessorStage.start_link(
33+
[
34+
topology_name: topology_name,
35+
name: :test_processor,
36+
module: __MODULE__,
37+
context: %{},
38+
type: :consumer,
39+
terminator: __MODULE__,
40+
resubscribe: :never,
41+
processor_config: [min_demand: 1, max_demand: 10],
42+
processor_key: processor_key,
43+
producers: [:sample],
44+
partition: partition,
45+
batchers: :none,
46+
producer: nil
47+
],
48+
[]
49+
)
50+
51+
if Broadway.Process.labels_supported?() do
52+
label = Process.info(pid, :label)
53+
assert label == {:label, {:broadway_processor, topology_name, processor_key, partition}}
54+
else
55+
# Labels not supported in this Elixir version, skip assertion
56+
:ok
57+
end
58+
end
2559
end

test/broadway/topology/producer_stage_test.exs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,27 @@ defmodule Broadway.Topology.ProducerStageTest do
184184
assert ProducerStage.terminate(:normal, state) == :ok
185185
end
186186
end
187+
188+
test "sets process label with topology name and index" do
189+
topology_name = :test_topology
190+
index = 2
191+
192+
args = %{
193+
module: {FakeProducer, []},
194+
broadway: [name: topology_name, index: index],
195+
transformer: nil,
196+
dispatcher: GenStage.DemandDispatcher,
197+
rate_limiter: nil
198+
}
199+
200+
{:ok, pid} = ProducerStage.start_link(args, index)
201+
202+
if Broadway.Process.labels_supported?() do
203+
label = Process.info(pid, :label)
204+
assert label == {:label, {:broadway_producer, topology_name, index}}
205+
else
206+
# Labels not supported in this Elixir version, skip assertion
207+
:ok
208+
end
209+
end
187210
end

0 commit comments

Comments
 (0)