Skip to content

Commit 1c300a9

Browse files
authored
Merge branch 'main' into main
2 parents 7481e69 + d5d0bfd commit 1c300a9

File tree

3 files changed

+72
-11
lines changed

3 files changed

+72
-11
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Protect partition lost/revoke callback against unassigned groups
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40920]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/consumer_franz.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -378,17 +378,25 @@ func (c *franzConsumer) lost(ctx context.Context, _ *kgo.Client,
378378
for topic, partitions := range lost {
379379
for _, partition := range partitions {
380380
tp := topicPartition{topic: topic, partition: partition}
381-
pc := c.assignments[tp]
382-
delete(c.assignments, tp)
383-
pc.cancel(errors.New(
384-
"stopping processing: partition reassigned or lost",
385-
))
386-
wg.Add(1)
387-
go func() {
388-
defer wg.Done()
389-
pc.wg.Wait()
390-
}()
391-
c.telemetryBuilder.KafkaReceiverPartitionClose.Add(context.Background(), 1)
381+
// In some cases, it is possible for the `lost` to be called with
382+
// no assignments. So, check if assignments exists first.
383+
//
384+
// - OnPartitionLost can be called without the group ever joining
385+
// and getting assigned.
386+
// - OnPartitionRevoked can be called multiple times for cooperative
387+
// balancer on topic lost/deleted.
388+
if pc, ok := c.assignments[tp]; ok {
389+
delete(c.assignments, tp)
390+
pc.cancel(errors.New(
391+
"stopping processing: partition reassigned or lost",
392+
))
393+
wg.Add(1)
394+
go func() {
395+
defer wg.Done()
396+
pc.wg.Wait()
397+
}()
398+
c.telemetryBuilder.KafkaReceiverPartitionClose.Add(context.Background(), 1)
399+
}
392400
}
393401
}
394402
if fatal {

receiver/kafkareceiver/consumer_franz_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,29 @@ func TestConsumerShutdownNotStarted(t *testing.T) {
243243
"kafka consumer: consumer isn't running")
244244
}
245245
}
246+
247+
func TestLost(t *testing.T) {
248+
// It is possible that lost is called multiple times for the same partition
249+
// or called with a topic/partition that hasn't been assigned. This test
250+
// ensures that `lost` works without error in both cases.
251+
_, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test"))
252+
settings, _, _ := mustNewSettings(t)
253+
254+
consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
255+
return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
256+
return nil
257+
}, nil
258+
}
259+
c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, consumeFn)
260+
require.NoError(t, err)
261+
require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
262+
defer func() { require.NoError(t, c.Shutdown(context.Background())) }()
263+
264+
// Call lost couple of times for same partition
265+
lostM := map[string][]int32{"test": {0}}
266+
c.lost(context.Background(), nil, lostM, false)
267+
c.lost(context.Background(), nil, lostM, false)
268+
269+
// Call lost for a topic and partition that was not assigned
270+
c.lost(context.Background(), nil, map[string][]int32{"404": {0}}, true)
271+
}

0 commit comments

Comments
 (0)