Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Confluent's Golang client for Apache Kafka

## v2.12.0

This is a feature release:

confluent-kafka-go is based on librdkafka v2.12.0, see the
[librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

### Enhancements

* Add support for forwarding librdkafka log events to a Go channel in AdminClient (#1448)

## v2.11.1

This is a maintenance release:
Expand All @@ -8,7 +20,6 @@ confluent-kafka-go is based on librdkafka v2.11.1, see the
[librdkafka v2.11.1 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.1)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v2.11.0

This is a feature release:
Expand All @@ -17,7 +28,6 @@ confluent-kafka-go is based on librdkafka v2.11.0, see the
[librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v2.10.1

This is a maintenance release:
Expand Down
27 changes: 25 additions & 2 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type AdminClient struct {
handle *handle
isDerived bool // Derived from existing client handle
isClosed uint32 // to check if Admin Client is closed or not.
adminTermChan chan bool // For log channel termination
}

// IsClosed returns boolean representing if client is closed or not
Expand Down Expand Up @@ -2737,6 +2738,9 @@ func (a *AdminClient) Close() {
return
}

if a.adminTermChan != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a Logs() method onto the AdminClient, too, which returns a.handle.logs, to be used in case "go.logs.channel.enable": true, but "go.logs.channel": nil, or unset.

After this, if handle.closeLogsChan is true, then also close the logs channel here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @milindl for review
Addressed feedback in commit 7245731

close(a.adminTermChan)
}
a.handle.cleanup()

C.rd_kafka_destroy(a.handle.rk)
Expand Down Expand Up @@ -3724,9 +3728,19 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {

a := &AdminClient{}
a.handle = &handle{}
a.isClosed = 0

// before we do anything with the configuration, create a copy such that
// the original is not mutated.
confCopy := conf.clone()

logsChanEnable, logsChan, err := confCopy.extractLogConfig()
if err != nil {
return nil, err
}

// Convert ConfigMap to librdkafka conf_t
cConf, err := conf.convert()
cConf, err := confCopy.convert()
if err != nil {
return nil, err
}
Expand All @@ -3746,7 +3760,11 @@ func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
a.isDerived = false
a.handle.setup()

a.isClosed = 0
// Setup log channel if enabled
if logsChanEnable {
a.adminTermChan = make(chan bool)
a.handle.setupLogQueue(logsChan, a.adminTermChan)
}

return a, nil
}
Expand Down Expand Up @@ -3778,3 +3796,8 @@ func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
a.isClosed = 0
return a, nil
}

// Logs returns the log channel if enabled, or nil otherwise.
func (a *AdminClient) Logs() chan LogEvent {
return a.handle.logs
}
128 changes: 128 additions & 0 deletions kafka/adminapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,3 +1336,131 @@

a.Close()
}

func TestAdminClientLog(t *testing.T) {

Check failure on line 1340 in kafka/adminapi_test.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

kafka/adminapi_test.go#L1340

Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like this to reduce the Cognitive Complexity

Suggested change
func TestAdminClientLog(t *testing.T) {
func TestAdminClientLog(t *testing.T) {
logsChan := make(chan LogEvent, 100)
admin, err := NewAdminClient(&ConfigMap{
"debug": "all",
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
"go.logs.channel.enable": true,
"go.logs.channel": logsChan,
})
if err != nil {
t.Fatalf("Failed to create AdminClient: %v", err)
}
defer func() {
admin.Close()
close(logsChan)
}()
// Verify that Logs() method returns the correct channel
if admin.Logs() != logsChan {
t.Fatalf("Expected admin.Logs() %v == logsChan %v", admin.Logs(), logsChan)
}
// Define expected logs to validate
expectedLogs := []struct {
tag string
message string
found bool
}{
{"INIT", "librdkafka", false},
}
go func() {
for log := range logsChan {
t.Log(log.String())
// Check each expected log
for i := range expectedLogs {
if !expectedLogs[i].found && log.Tag == expectedLogs[i].tag &&
strings.Contains(log.Message, expectedLogs[i].message) {
expectedLogs[i].found = true
}
}
}
}()
<-time.After(time.Second * 5)
// Validate all expected logs were found
for _, expected := range expectedLogs {
if !expected.found {
t.Errorf("Expected to find log with tag `%s' and message containing `%s', but didn't find any.",
expected.tag, expected.message)
}
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in commit 63a61a8

logsChan := make(chan LogEvent, 100)

admin, err := NewAdminClient(&ConfigMap{
"debug": "all",
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
"go.logs.channel.enable": true,
"go.logs.channel": logsChan,
})
if err != nil {
t.Fatalf("Failed to create AdminClient: %v", err)
}

defer func() {
admin.Close()
close(logsChan)
}()

// Verify that Logs() method returns the correct channel
if admin.Logs() != logsChan {
t.Fatalf("Expected admin.Logs() %v == logsChan %v", admin.Logs(), logsChan)
}

expectedLogs := map[struct {
tag string
message string
}]bool{
{"INIT", "librdkafka"}: false,
}

go func() {
for {
select {
case log, ok := <-logsChan:
if !ok {
return
}

t.Log(log.String())

for expectedLog, found := range expectedLogs {
if found {
continue
}
if log.Tag != expectedLog.tag {
continue
}
if strings.Contains(log.Message, expectedLog.message) {
expectedLogs[expectedLog] = true
}
}
}
}
}()

<-time.After(time.Second * 5)

for expectedLog, found := range expectedLogs {
if !found {
t.Errorf(
"Expected to find log with tag `%s' and message containing `%s',"+
" but didn't find any.",
expectedLog.tag,
expectedLog.message)
}
}
}

func TestAdminClientLogWithoutChannel(t *testing.T) {

Check failure on line 1408 in kafka/adminapi_test.go

View check run for this annotation

SonarQube-Confluent / confluent-kafka-go Sonarqube Results

kafka/adminapi_test.go#L1408

Refactor this method to reduce its Cognitive Complexity from 18 to the 15 allowed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the other comment

Suggested change
func TestAdminClientLogWithoutChannel(t *testing.T) {
func TestAdminClientLogWithoutChannel(t *testing.T) {
admin, err := NewAdminClient(&ConfigMap{
"debug": "all",
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
"go.logs.channel.enable": true,
// Note: no "go.logs.channel" specified
})
if err != nil {
t.Fatalf("Failed to create AdminClient: %v", err)
}
defer admin.Close()
// Verify that Logs() method returns a channel when enabled but no channel provided
logsChan := admin.Logs()
if logsChan == nil {
t.Fatalf("Expected admin.Logs() to return a channel, got nil")
}
// Define expected logs to validate
expectedLogs := []struct {
tag string
message string
found bool
}{
{"INIT", "librdkafka", false},
}
go func() {
for log := range logsChan {
t.Log(log.String())
// Check each expected log
for i := range expectedLogs {
if !expectedLogs[i].found && log.Tag == expectedLogs[i].tag &&
strings.Contains(log.Message, expectedLogs[i].message) {
expectedLogs[i].found = true
}
}
}
}()
<-time.After(time.Second * 5)
// Validate all expected logs were found
for _, expected := range expectedLogs {
if !expected.found {
t.Errorf("Expected to find log with tag `%s' and message containing `%s', but didn't find any.",
expected.tag, expected.message)
}
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in commit 63a61a8

admin, err := NewAdminClient(&ConfigMap{
"debug": "all",
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs
"go.logs.channel.enable": true,
// Note: no "go.logs.channel" specified
})
if err != nil {
t.Fatalf("Failed to create AdminClient: %v", err)
}

defer admin.Close()

// Verify that Logs() method returns a channel when enabled but no channel provided
logsChan := admin.Logs()
if logsChan == nil {
t.Fatalf("Expected admin.Logs() to return a channel, got nil")
}

// Define expected logs to validate
expectedLogs := []struct {
tag string
message string
found bool
}{
{"INIT", "librdkafka", false},
}

go func() {
Copy link
Preview

Copilot AI Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding an explicit termination condition in the goroutine reading from logsChan to avoid potential goroutine leaks when no more log events are sent. For example, signal closure of logsChan when the test completes.

Copilot uses AI. Check for mistakes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in acdb44b

for {
select {
case log, ok := <-logsChan:
if !ok {
return
}

t.Log(log.String())

// Check each expected log
for i := range expectedLogs {
if !expectedLogs[i].found && log.Tag == expectedLogs[i].tag &&
strings.Contains(log.Message, expectedLogs[i].message) {
expectedLogs[i].found = true
}
}
}
}
}()

<-time.After(time.Second * 5)

// Validate all expected logs were found
for _, expected := range expectedLogs {
if !expected.found {
t.Errorf("Expected to find log with tag `%s' and message containing `%s', but didn't find any.",
expected.tag, expected.message)
}
}
}