Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ var (
errTopicNotFound = errors.New("requested topic was not found")
)

type topicNoCreateError string

func (e topicNoCreateError) Error() string {
return fmt.Sprintf("topic '%s' does not exist but the manager is configured with NoCreate, so it will not attempt to create it", string(e))
}

// this regex matches the package name + some hash info, if we're in gomod but not subpackages
// examples which match
// * github.com/lovoo/goka/processor.go
Expand Down
14 changes: 6 additions & 8 deletions topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManager
if err != nil {
return nil, fmt.Errorf("Error creating the kafka client: %v", err)
}
return newTopicManager(brokers, saramaConfig, topicManagerConfig, client, checkBroker)
return newTopicManager(saramaConfig, topicManagerConfig, client, checkBroker)
}

func newTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig, client sarama.Client, check checkFunc) (*topicManager, error) {
func newTopicManager(saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig, client sarama.Client, check checkFunc) (*topicManager, error) {
if client == nil {
return nil, errors.New("cannot create topic manager with nil client")
}
Expand Down Expand Up @@ -172,17 +172,15 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[

partitions, err := m.Partitions(topic)

if err != nil {
if err != errTopicNotFound {
return fmt.Errorf("error checking topic: %v", err)
}
if err != nil && err != errTopicNotFound {
return fmt.Errorf("error checking topic: %v", err)
}
// no topic yet, let's create it
if len(partitions) == 0 {

// (or not)
if m.topicManagerConfig.NoCreate {
return fmt.Errorf("topic %s does not exist but the manager is configured with NoCreate, so it will not attempt to create it", topic)
return topicNoCreateError(topic)
}

return m.createTopic(topic,
Expand All @@ -195,7 +193,7 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[

// partitions do not match
if len(partitions) != npar {
return m.handleConfigMismatch(fmt.Sprintf("partition count mismatch for topic %s. Need %d, but existing topic has %d", topic, npar, len(partitions)))
return m.handleConfigMismatch(fmt.Sprintf("partition count mismatch for topic '%s'. Need %d, but existing topic has %d", topic, npar, len(partitions)))
}

// check additional config values via the cluster admin if our current version supports it
Expand Down
10 changes: 5 additions & 5 deletions topic_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestTM_newTopicManager(t *testing.T) {
bm.client.EXPECT().Brokers().Return([]*sarama.Broker{
new(sarama.Broker),
})
tm, err := newTopicManager(tmTestBrokers, DefaultConfig(), NewTopicManagerConfig(), bm.client, trueCheckFunc)
tm, err := newTopicManager(DefaultConfig(), NewTopicManagerConfig(), bm.client, trueCheckFunc)
require.NoError(t, err)
require.Equal(t, tm.client, bm.client)
require.NotNil(t, tm.admin)
Expand All @@ -117,10 +117,10 @@ func TestTM_newTopicManager(t *testing.T) {
defer ctrl.Finish()
bm := newBuilderMock(ctrl)

_, err := newTopicManager(tmTestBrokers, nil, nil, bm.client, trueCheckFunc)
_, err := newTopicManager(nil, nil, bm.client, trueCheckFunc)
require.Error(t, err)

_, err = newTopicManager(tmTestBrokers, nil, NewTopicManagerConfig(), nil, trueCheckFunc)
_, err = newTopicManager(nil, NewTopicManagerConfig(), nil, trueCheckFunc)
require.Error(t, err)
})
t.Run("fail_check", func(t *testing.T) {
Expand All @@ -133,7 +133,7 @@ func TestTM_newTopicManager(t *testing.T) {
new(sarama.Broker),
})

_, err := newTopicManager(tmTestBrokers, DefaultConfig(), NewTopicManagerConfig(), bm.client, falseCheckFunc)
_, err := newTopicManager(DefaultConfig(), NewTopicManagerConfig(), bm.client, falseCheckFunc)
require.Equal(t, err.Error(), "broker check error")
})
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestTM_EnsureStreamExists(t *testing.T) {
)

err := tm.EnsureStreamExists(topic, npar)
require.ErrorContains(t, err, "will not attempt to create it")
require.ErrorIs(t, err, topicNoCreateError(topic))
})
t.Run("fail", func(t *testing.T) {
tm, bm, ctrl := createTopicManager(t)
Expand Down