diff --git a/errors.go b/errors.go index 6fa1dc75..39ad7455 100644 --- a/errors.go +++ b/errors.go @@ -17,6 +17,12 @@ var ( errTopicNotFound = errors.New("requested topic was not found") ) +type topicNoCreateError string + +func (e topicNoCreateError) Error() string { + return fmt.Sprintf("topic '%s' does not exist but the manager is configured with NoCreate, so it will not attempt to create it", string(e)) +} + // this regex matches the package name + some hash info, if we're in gomod but not subpackages // examples which match // * github.com/lovoo/goka/processor.go diff --git a/topic_manager.go b/topic_manager.go index a104d38f..202c989c 100644 --- a/topic_manager.go +++ b/topic_manager.go @@ -46,10 +46,10 @@ func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManager if err != nil { return nil, fmt.Errorf("Error creating the kafka client: %v", err) } - return newTopicManager(brokers, saramaConfig, topicManagerConfig, client, checkBroker) + return newTopicManager(saramaConfig, topicManagerConfig, client, checkBroker) } -func newTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig, client sarama.Client, check checkFunc) (*topicManager, error) { +func newTopicManager(saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig, client sarama.Client, check checkFunc) (*topicManager, error) { if client == nil { return nil, errors.New("cannot create topic manager with nil client") } @@ -172,17 +172,15 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[ partitions, err := m.Partitions(topic) - if err != nil { - if err != errTopicNotFound { - return fmt.Errorf("error checking topic: %v", err) - } + if err != nil && err != errTopicNotFound { + return fmt.Errorf("error checking topic: %v", err) } // no topic yet, let's create it if len(partitions) == 0 { // (or not) if m.topicManagerConfig.NoCreate { - return fmt.Errorf("topic %s does not exist but the manager is configured with NoCreate, so it will not attempt to create it", topic) + return topicNoCreateError(topic) } return m.createTopic(topic, @@ -195,7 +193,7 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[ // partitions do not match if len(partitions) != npar { - return m.handleConfigMismatch(fmt.Sprintf("partition count mismatch for topic %s. Need %d, but existing topic has %d", topic, npar, len(partitions))) + return m.handleConfigMismatch(fmt.Sprintf("partition count mismatch for topic '%s'. Need %d, but existing topic has %d", topic, npar, len(partitions))) } // check additional config values via the cluster admin if our current version supports it diff --git a/topic_manager_test.go b/topic_manager_test.go index 468307d8..488f0c0b 100644 --- a/topic_manager_test.go +++ b/topic_manager_test.go @@ -107,7 +107,7 @@ func TestTM_newTopicManager(t *testing.T) { bm.client.EXPECT().Brokers().Return([]*sarama.Broker{ new(sarama.Broker), }) - tm, err := newTopicManager(tmTestBrokers, DefaultConfig(), NewTopicManagerConfig(), bm.client, trueCheckFunc) + tm, err := newTopicManager(DefaultConfig(), NewTopicManagerConfig(), bm.client, trueCheckFunc) require.NoError(t, err) require.Equal(t, tm.client, bm.client) require.NotNil(t, tm.admin) @@ -117,10 +117,10 @@ func TestTM_newTopicManager(t *testing.T) { defer ctrl.Finish() bm := newBuilderMock(ctrl) - _, err := newTopicManager(tmTestBrokers, nil, nil, bm.client, trueCheckFunc) + _, err := newTopicManager(nil, nil, bm.client, trueCheckFunc) require.Error(t, err) - _, err = newTopicManager(tmTestBrokers, nil, NewTopicManagerConfig(), nil, trueCheckFunc) + _, err = newTopicManager(nil, NewTopicManagerConfig(), nil, trueCheckFunc) require.Error(t, err) }) t.Run("fail_check", func(t *testing.T) { @@ -133,7 +133,7 @@ func TestTM_newTopicManager(t *testing.T) { new(sarama.Broker), }) - _, err := newTopicManager(tmTestBrokers, DefaultConfig(), NewTopicManagerConfig(), bm.client, falseCheckFunc) + _, err := newTopicManager(DefaultConfig(), NewTopicManagerConfig(), bm.client, falseCheckFunc) require.Equal(t, err.Error(), "broker check error") }) } @@ -269,7 +269,7 @@ func TestTM_EnsureStreamExists(t *testing.T) { ) err := tm.EnsureStreamExists(topic, npar) - require.ErrorContains(t, err, "will not attempt to create it") + require.ErrorIs(t, err, topicNoCreateError(topic)) }) t.Run("fail", func(t *testing.T) { tm, bm, ctrl := createTopicManager(t)