Skip to content

Commit 136a7fc

Browse files
authored
Merge pull request #458 from lovoo/disable-topic-creation
Add Option to disable topic creation alltogether
2 parents 19b594d + ac2a46d commit 136a7fc

File tree

2 files changed

+32
-0
lines changed

2 files changed

+32
-0
lines changed

topic_manager.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[
179179
}
180180
// no topic yet, let's create it
181181
if len(partitions) == 0 {
182+
183+
// (or not)
184+
if m.topicManagerConfig.NoCreate {
185+
return fmt.Errorf("topic does not exist but the manager is configured with NoCreate, so it will not attempt to create it")
186+
}
187+
182188
return m.createTopic(topic,
183189
npar,
184190
rfactor,
@@ -361,6 +367,10 @@ type TopicManagerConfig struct {
361367
// TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be
362368
// treated
363369
MismatchBehavior TMConfigMismatchBehavior
370+
371+
// If set to true, the topic manager will not attempt to create the topic.
372+
// This can be used if topic creation should be done externally.
373+
NoCreate bool
364374
}
365375

366376
func (tmc *TopicManagerConfig) streamCleanupPolicy() string {

topic_manager_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,28 @@ func TestTM_EnsureStreamExists(t *testing.T) {
249249
err := tm.EnsureStreamExists(topic, npar)
250250
require.NoError(t, err)
251251
})
252+
t.Run("no-create", func(t *testing.T) {
253+
tm, bm, ctrl := createTopicManager(t)
254+
defer ctrl.Finish()
255+
var (
256+
topic = "some-topic"
257+
npar = 1
258+
rfactor = 1
259+
)
260+
261+
tm.topicManagerConfig.Stream.Replication = rfactor
262+
tm.topicManagerConfig.Stream.Retention = time.Second
263+
tm.topicManagerConfig.NoCreate = true
264+
265+
bm.client.EXPECT().RefreshMetadata().Return(nil).AnyTimes()
266+
267+
gomock.InOrder(
268+
bm.client.EXPECT().Topics().Return(nil, nil),
269+
)
270+
271+
err := tm.EnsureStreamExists(topic, npar)
272+
require.ErrorContains(t, err, "will not attempt to create it")
273+
})
252274
t.Run("fail", func(t *testing.T) {
253275
tm, bm, ctrl := createTopicManager(t)
254276
defer ctrl.Finish()

0 commit comments

Comments
 (0)