Skip to content

Commit d5d0bfd

Browse files
authored
[receiver/kafka] Protect partition lost/revoke callback against unassigned groups (open-telemetry#40921)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The `OnPartitionsLost` callback can be called without the group ever being assigned ([ref](https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnPartitionsLost)). However, the Kafka receiver code didn't protect against this. This PR adds a check to protect against this case. Quoting from the linked docs for `OnPartitionsLost`: ``` OnPartitionsLost sets the function to be called on "fatal" group errors, such as IllegalGeneration, UnknownMemberID, and authentication failures. This function differs from [OnPartitionsRevoked](https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnPartitionsRevoked) in that it is unlikely that commits will succeed when partitions are outright lost, whereas commits likely will succeed when revoking partitions. Because this function is called on any fatal group error, it is possible for this function to be called without the group ever being joined. ``` <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#40920 <!--Describe what testing was performed and which tests were added.--> #### Testing Since the code under test is a callback, I can't think of a good way to test it other than coding the exact scenario, which I think would be too brittle. <!--Describe the documentation added.--> #### Documentation Added godoc comments for clarity. <!--Please delete paragraphs that you did not use before submitting.-->
1 parent ea75b50 commit d5d0bfd

File tree

3 files changed

+72
-11
lines changed

3 files changed

+72
-11
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Protect partition lost/revoke callback against unassigned groups
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40920]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/consumer_franz.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -378,17 +378,25 @@ func (c *franzConsumer) lost(ctx context.Context, _ *kgo.Client,
378378
for topic, partitions := range lost {
379379
for _, partition := range partitions {
380380
tp := topicPartition{topic: topic, partition: partition}
381-
pc := c.assignments[tp]
382-
delete(c.assignments, tp)
383-
pc.cancel(errors.New(
384-
"stopping processing: partition reassigned or lost",
385-
))
386-
wg.Add(1)
387-
go func() {
388-
defer wg.Done()
389-
pc.wg.Wait()
390-
}()
391-
c.telemetryBuilder.KafkaReceiverPartitionClose.Add(context.Background(), 1)
381+
// In some cases, it is possible for `lost` to be called with
382+
// no assignments, so check whether an assignment exists first.
383+
//
384+
// - OnPartitionsLost can be called without the group ever joining
385+
// and getting assigned.
386+
// - OnPartitionsRevoked can be called multiple times for a cooperative
387+
// balancer when a topic is lost/deleted.
388+
if pc, ok := c.assignments[tp]; ok {
389+
delete(c.assignments, tp)
390+
pc.cancel(errors.New(
391+
"stopping processing: partition reassigned or lost",
392+
))
393+
wg.Add(1)
394+
go func() {
395+
defer wg.Done()
396+
pc.wg.Wait()
397+
}()
398+
c.telemetryBuilder.KafkaReceiverPartitionClose.Add(context.Background(), 1)
399+
}
392400
}
393401
}
394402
if fatal {

receiver/kafkareceiver/consumer_franz_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,29 @@ func TestConsumerShutdownNotStarted(t *testing.T) {
243243
"kafka consumer: consumer isn't running")
244244
}
245245
}
246+
247+
func TestLost(t *testing.T) {
248+
// It is possible that lost is called multiple times for the same partition
249+
// or called with a topic/partition that hasn't been assigned. This test
250+
// ensures that `lost` works without error in both cases.
251+
_, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test"))
252+
settings, _, _ := mustNewSettings(t)
253+
254+
consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
255+
return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
256+
return nil
257+
}, nil
258+
}
259+
c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, consumeFn)
260+
require.NoError(t, err)
261+
require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
262+
defer func() { require.NoError(t, c.Shutdown(context.Background())) }()
263+
264+
// Call lost a couple of times for the same partition
265+
lostM := map[string][]int32{"test": {0}}
266+
c.lost(context.Background(), nil, lostM, false)
267+
c.lost(context.Background(), nil, lostM, false)
268+
269+
// Call lost for a topic and partition that was not assigned
270+
c.lost(context.Background(), nil, map[string][]int32{"404": {0}}, true)
271+
}

0 commit comments

Comments
 (0)