From 8c4d082d99219fd5e3c4e8b9e92569c1f5502163 Mon Sep 17 00:00:00 2001 From: Mohammad Mohsin Reza <19354572+mmreza79@users.noreply.github.com> Date: Fri, 25 Apr 2025 12:48:28 +0200 Subject: [PATCH 1/3] fix displaying missing topic name --- topic_manager.go | 2 +- topic_manager_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/topic_manager.go b/topic_manager.go index a104d38f..27526972 100644 --- a/topic_manager.go +++ b/topic_manager.go @@ -182,7 +182,7 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[ // (or not) if m.topicManagerConfig.NoCreate { - return fmt.Errorf("topic %s does not exist but the manager is configured with NoCreate, so it will not attempt to create it", topic) + return fmt.Errorf("topic `%s` does not exist but the manager is configured with NoCreate, so it will not attempt to create it", topic) } return m.createTopic(topic, diff --git a/topic_manager_test.go b/topic_manager_test.go index 468307d8..7d078720 100644 --- a/topic_manager_test.go +++ b/topic_manager_test.go @@ -269,7 +269,7 @@ func TestTM_EnsureStreamExists(t *testing.T) { ) err := tm.EnsureStreamExists(topic, npar) - require.ErrorContains(t, err, "will not attempt to create it") + require.Equal(t, err.Error(), "topic `some-topic` does not exist but the manager is configured with NoCreate, so it will not attempt to create it") }) t.Run("fail", func(t *testing.T) { tm, bm, ctrl := createTopicManager(t) From 46299d769668d702215f20520b40d4b67cd5f1b2 Mon Sep 17 00:00:00 2001 From: Mohammad Mohsin Reza <19354572+mmreza79@users.noreply.github.com> Date: Fri, 25 Apr 2025 12:54:46 +0200 Subject: [PATCH 2/3] remove unused param --- topic_manager.go | 4 ++-- topic_manager_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/topic_manager.go b/topic_manager.go index 27526972..4034d42c 100644 --- a/topic_manager.go +++ b/topic_manager.go @@ -46,10 +46,10 @@ func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManager if err != nil { return nil, fmt.Errorf("Error creating the kafka client: %v", err) } - return newTopicManager(brokers, saramaConfig, topicManagerConfig, client, checkBroker) + return newTopicManager(saramaConfig, topicManagerConfig, client, checkBroker) } -func newTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig, client sarama.Client, check checkFunc) (*topicManager, error) { +func newTopicManager(saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig, client sarama.Client, check checkFunc) (*topicManager, error) { if client == nil { return nil, errors.New("cannot create topic manager with nil client") } diff --git a/topic_manager_test.go b/topic_manager_test.go index 7d078720..4bcea56d 100644 --- a/topic_manager_test.go +++ b/topic_manager_test.go @@ -107,7 +107,7 @@ func TestTM_newTopicManager(t *testing.T) { bm.client.EXPECT().Brokers().Return([]*sarama.Broker{ new(sarama.Broker), }) - tm, err := newTopicManager(tmTestBrokers, DefaultConfig(), NewTopicManagerConfig(), bm.client, trueCheckFunc) + tm, err := newTopicManager(DefaultConfig(), NewTopicManagerConfig(), bm.client, trueCheckFunc) require.NoError(t, err) require.Equal(t, tm.client, bm.client) require.NotNil(t, tm.admin) @@ -117,10 +117,10 @@ func TestTM_newTopicManager(t *testing.T) { defer ctrl.Finish() bm := newBuilderMock(ctrl) - _, err := newTopicManager(tmTestBrokers, nil, nil, bm.client, trueCheckFunc) + _, err := newTopicManager(nil, nil, bm.client, trueCheckFunc) require.Error(t, err) - _, err = newTopicManager(tmTestBrokers, nil, NewTopicManagerConfig(), nil, trueCheckFunc) + _, err = newTopicManager(nil, NewTopicManagerConfig(), nil, trueCheckFunc) require.Error(t, err) }) t.Run("fail_check", func(t *testing.T) { @@ -133,7 +133,7 @@ func TestTM_newTopicManager(t *testing.T) { new(sarama.Broker), }) - _, err := newTopicManager(tmTestBrokers, DefaultConfig(), NewTopicManagerConfig(), bm.client, falseCheckFunc) + _, err := newTopicManager(DefaultConfig(), NewTopicManagerConfig(), bm.client, falseCheckFunc) require.Equal(t, err.Error(), "broker check error") }) } From 48f770c962d2f223dc20c726fb960c4ee29822cf Mon Sep 17 00:00:00 2001 From: Mohammad Mohsin Reza <19354572+mmreza79@users.noreply.github.com> Date: Fri, 25 Apr 2025 17:22:11 +0200 Subject: [PATCH 3/3] organize error --- errors.go | 6 ++++++ topic_manager.go | 10 ++++------ topic_manager_test.go | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/errors.go b/errors.go index 6fa1dc75..39ad7455 100644 --- a/errors.go +++ b/errors.go @@ -17,6 +17,12 @@ var ( errTopicNotFound = errors.New("requested topic was not found") ) +type topicNoCreateError string + +func (e topicNoCreateError) Error() string { + return fmt.Sprintf("topic '%s' does not exist but the manager is configured with NoCreate, so it will not attempt to create it", string(e)) +} + // this regex matches the package name + some hash info, if we're in gomod but not subpackages // examples which match // * github.com/lovoo/goka/processor.go diff --git a/topic_manager.go b/topic_manager.go index 4034d42c..202c989c 100644 --- a/topic_manager.go +++ b/topic_manager.go @@ -172,17 +172,15 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[ partitions, err := m.Partitions(topic) - if err != nil { - if err != errTopicNotFound { - return fmt.Errorf("error checking topic: %v", err) - } + if err != nil && err != errTopicNotFound { + return fmt.Errorf("error checking topic: %v", err) } // no topic yet, let's create it if len(partitions) == 0 { // (or not) if m.topicManagerConfig.NoCreate { - return fmt.Errorf("topic `%s` does not exist but the manager is configured with NoCreate, so it will not attempt to create it", topic) + return topicNoCreateError(topic) } return m.createTopic(topic, @@ -195,7 +193,7 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[ // partitions do not match if len(partitions) != npar { - return m.handleConfigMismatch(fmt.Sprintf("partition count mismatch for topic %s. Need %d, but existing topic has %d", topic, npar, len(partitions))) + return m.handleConfigMismatch(fmt.Sprintf("partition count mismatch for topic '%s'. Need %d, but existing topic has %d", topic, npar, len(partitions))) } // check additional config values via the cluster admin if our current version supports it diff --git a/topic_manager_test.go b/topic_manager_test.go index 4bcea56d..488f0c0b 100644 --- a/topic_manager_test.go +++ b/topic_manager_test.go @@ -269,7 +269,7 @@ func TestTM_EnsureStreamExists(t *testing.T) { ) err := tm.EnsureStreamExists(topic, npar) - require.Equal(t, err.Error(), "topic `some-topic` does not exist but the manager is configured with NoCreate, so it will not attempt to create it") + require.ErrorIs(t, err, topicNoCreateError(topic)) }) t.Run("fail", func(t *testing.T) { tm, bm, ctrl := createTopicManager(t)