Skip to content

Commit 41ce8b8

Browse files
committed
Support deletion
1 parent aedd232 commit 41ce8b8

File tree

4 files changed

+46
-6
lines changed

4 files changed

+46
-6
lines changed

cmd/topicctl/subcmd/apply.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type applyCmdConfig struct {
3636
retentionDropStepDurationStr string
3737
skipConfirm bool
3838
ignoreFewerPartitionsError bool
39+
allowSettingsDeletion bool
3940
sleepLoopDuration time.Duration
4041
failFast bool
4142

@@ -107,6 +108,12 @@ func init() {
107108
false,
108109
"Don't return error when topic's config specifies fewer partitions than it currently has",
109110
)
111+
applyCmd.Flags().BoolVar(
112+
&applyConfig.allowSettingsDeletion,
113+
"allow-settings-deletion",
114+
false,
115+
"Deletes topic settings from the broker if the setting is set on the broker but not in config",
116+
)
110117
applyCmd.Flags().DurationVar(
111118
&applyConfig.sleepLoopDuration,
112119
"sleep-loop-duration",
@@ -259,6 +266,7 @@ func applyTopic(
259266
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
260267
SkipConfirm: applyConfig.skipConfirm,
261268
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
269+
AllowSettingsDeletion: applyConfig.allowSettingsDeletion,
262270
SleepLoopDuration: applyConfig.sleepLoopDuration,
263271
TopicConfig: topicConfig,
264272
}

cmd/topicctl/subcmd/rebalance.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ func rebalanceApplyTopic(
307307
AutoContinueRebalance: true, // to continue without prompts
308308
RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance
309309
SkipConfirm: true, // to enforce action: rebalance
310+
AllowSettingsDeletion: false, // Irrelevant here
310311
SleepLoopDuration: rebalanceConfig.sleepLoopDuration,
311312
TopicConfig: topicConfig,
312313
}

pkg/apply/apply.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type TopicApplierConfig struct {
3737
RetentionDropStepDuration time.Duration
3838
SkipConfirm bool
3939
IgnoreFewerPartitionsError bool
40+
AllowSettingsDeletion bool
4041
SleepLoopDuration time.Duration
4142
TopicConfig config.TopicConfig
4243
}
@@ -392,6 +393,8 @@ func (t *TopicApplier) updateSettings(
392393
return err
393394
}
394395

396+
configEntries := []kafka.ConfigEntry{}
397+
395398
if len(diffKeys) > 0 {
396399
diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys)
397400
if err != nil {
@@ -416,6 +419,23 @@ func (t *TopicApplier) updateSettings(
416419
)
417420
}
418421

422+
configEntries, err = topicSettings.ToConfigEntries(diffKeys)
423+
if err != nil {
424+
return err
425+
}
426+
}
427+
428+
if len(missingKeys) > 0 && t.config.AllowSettingsDeletion {
429+
log.Infof(
430+
"Found %d key(s) set in cluster but missing from config for deletion:\n%s",
431+
len(missingKeys),
432+
FormatMissingKeys(topicInfo.Config, missingKeys),
433+
)
434+
435+
configEntries = append(configEntries, topicSettings.ToEmptyConfigEntries(missingKeys)...)
436+
}
437+
438+
if len(configEntries) > 0 {
419439
if t.config.DryRun {
420440
log.Infof("Skipping update because dryRun is set to true")
421441
return nil
@@ -430,11 +450,6 @@ func (t *TopicApplier) updateSettings(
430450
}
431451
log.Infof("OK, updating")
432452

433-
configEntries, err := topicSettings.ToConfigEntries(diffKeys)
434-
if err != nil {
435-
return err
436-
}
437-
438453
_, err = t.adminClient.UpdateTopicConfig(
439454
ctx,
440455
t.topicName,
@@ -446,7 +461,7 @@ func (t *TopicApplier) updateSettings(
446461
}
447462
}
448463

449-
if len(missingKeys) > 0 {
464+
if len(missingKeys) > 0 && !t.config.AllowSettingsDeletion {
450465
log.Warnf(
451466
"Found %d key(s) set in cluster but missing from config:\n%s\nThese will be left as-is.",
452467
len(missingKeys),

pkg/config/settings.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,22 @@ func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, erro
346346
return entries, nil
347347
}
348348

349+
// Produces a slice of kafka-go config entries with empty value. Thus used
350+
// for deletion of the setting.
351+
func (t TopicSettings) ToEmptyConfigEntries(keys []string) []kafka.ConfigEntry {
352+
entries := []kafka.ConfigEntry{}
353+
354+
if keys != nil {
355+
for _, key := range keys {
356+
entries = append(
357+
entries,
358+
kafka.ConfigEntry{ConfigName: key, ConfigValue: ""},
359+
)
360+
}
361+
}
362+
return entries
363+
}
364+
349365
// HasKey returns whether the current settings instance contains the argument key.
350366
func (t TopicSettings) HasKey(key string) bool {
351367
_, ok := t[key]

0 commit comments

Comments
 (0)