Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
func (input *kafkaInput) Test(ctx input.TestContext) error {
client, err := sarama.NewClient(input.config.Hosts, input.saramaConfig)
if err != nil {
ctx.Logger.Error(err)
return err
}
topics, err := client.Topics()
if err != nil {
Expand All @@ -100,7 +100,7 @@
}

if len(missingTopics) > 0 {
return fmt.Errorf("Of configured topics %v, topics: %v are not in available topics %v", input.config.Topics, missingTopics, topics)

Check failure on line 103 in filebeat/input/kafka/input.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

ST1005: error strings should not be capitalized (staticcheck)

Check failure on line 103 in filebeat/input/kafka/input.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

ST1005: error strings should not be capitalized (staticcheck)

Check failure on line 103 in filebeat/input/kafka/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1005: error strings should not be capitalized (staticcheck)
}

return nil
Expand Down Expand Up @@ -253,7 +253,7 @@
}

func (c channelCtx) Deadline() (deadline time.Time, ok bool) {
//nolint:nakedret // omitting the return gives a build error

Check failure on line 256 in filebeat/input/kafka/input.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

directive `//nolint:nakedret // omitting the return gives a build error` is unused for linter "nakedret" (nolintlint)

Check failure on line 256 in filebeat/input/kafka/input.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

directive `//nolint:nakedret // omitting the return gives a build error` is unused for linter "nakedret" (nolintlint)

Check failure on line 256 in filebeat/input/kafka/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

directive `//nolint:nakedret // omitting the return gives a build error` is unused for linter "nakedret" (nolintlint)
return
}

Expand Down
15 changes: 15 additions & 0 deletions filebeat/input/kafka/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ package kafka
import (
"testing"

"github.com/elastic/sarama"
"github.com/stretchr/testify/require"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/tests/resources"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -40,6 +42,19 @@ func TestNewInputDone(t *testing.T) {
AssertNotStartedInputCanBeDone(t, config)
}

// AssertNotStartedInputCanBeDone checks that the context of an input can be
// done before starting the input, and it doesn't leak goroutines. This is
// important to confirm that leaks don't happen with CheckConfig.
func TestTestNoPanicOnClientError(t *testing.T) {
Comment on lines +45 to +48
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// AssertNotStartedInputCanBeDone checks that the context of an input can be
// done before starting the input, and it doesn't leak goroutines. This is
// important to confirm that leaks don't happen with CheckConfig.
func TestTestNoPanicOnClientError(t *testing.T) {
// TestNoPanicOnClientError checks that the context of an input can be
// done before starting the input, and it doesn't leak goroutines. This is
// important to confirm that leaks don't happen with CheckConfig.
func TestNoPanicOnClientError(t *testing.T) {

k := &kafkaInput{
config: kafkaInputConfig{Hosts: []string{"127.0.0.1:1"}, Topics: []string{"topic-a"}},
saramaConfig: sarama.NewConfig(),
}

err := k.Test(input.TestContext{Logger: logp.NewLogger("kafka-repro")})
require.Error(t, err)
}

// AssertNotStartedInputCanBeDone checks that the context of an input can be
// done before starting the input, and it doesn't leak goroutines. This is
// important to confirm that leaks don't happen with CheckConfig.
Expand Down
Loading