Skip to content

Commit 25a6f37

Browse files
apply: Allow ignoring a specific error returned from updatePartitions() (#184)
When applying topic's configuration, we'd like to be able avoid failing in case the desired topic's partition count is smaller than the actual topic's partitions count (on the broker). We know that Kafka doesn't allow partitions decrease, so instead of failing the operation we'd like to continue without doing anything. This is very convenient when trying to apply a batch of topics in which case, we don't want to fail the rest of the batch for one topic that its partition count configuration is lower than the actual partitions count. This change is backward compatible, as the default behavior is kept. Signed-off-by: shimon-armis <[email protected]>
1 parent b83ceba commit 25a6f37

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

cmd/topicctl/subcmd/apply.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type applyCmdConfig struct {
3535
autoContinueRebalance bool
3636
retentionDropStepDurationStr string
3737
skipConfirm bool
38+
ignoreFewerPartitionsError bool
3839
sleepLoopDuration time.Duration
3940

4041
shared sharedOptions
@@ -99,6 +100,12 @@ func init() {
99100
false,
100101
"Skip confirmation prompts during apply process",
101102
)
103+
applyCmd.Flags().BoolVar(
104+
&applyConfig.ignoreFewerPartitionsError,
105+
"ignore-fewer-partitions-error",
106+
false,
107+
"Don't return error when topic's config specifies fewer partitions than it currently has",
108+
)
102109
applyCmd.Flags().DurationVar(
103110
&applyConfig.sleepLoopDuration,
104111
"sleep-loop-duration",
@@ -231,6 +238,7 @@ func applyTopic(
231238
AutoContinueRebalance: applyConfig.autoContinueRebalance,
232239
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
233240
SkipConfirm: applyConfig.skipConfirm,
241+
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
234242
SleepLoopDuration: applyConfig.sleepLoopDuration,
235243
TopicConfig: topicConfig,
236244
}

pkg/apply/apply.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
log "github.com/sirupsen/logrus"
2424
)
2525

26+
var ErrFewerPartitions = errors.New("fewer partitions in topic config")
27+
2628
// TopicApplierConfig contains the configuration for a TopicApplier struct.
2729
type TopicApplierConfig struct {
2830
BrokerThrottleMBsOverride int
@@ -34,6 +36,7 @@ type TopicApplierConfig struct {
3436
AutoContinueRebalance bool
3537
RetentionDropStepDuration time.Duration
3638
SkipConfirm bool
39+
IgnoreFewerPartitionsError bool
3740
SleepLoopDuration time.Duration
3841
TopicConfig config.TopicConfig
3942
}
@@ -213,6 +216,11 @@ func (t *TopicApplier) applyExistingTopic(
213216
}
214217

215218
if err := t.updatePartitions(ctx, topicInfo); err != nil {
219+
if errors.Is(err, ErrFewerPartitions) && t.config.IgnoreFewerPartitionsError {
220+
log.Warnf("UpdatePartitions failure ignored. topic: %v, error: %v", t.topicName, err)
221+
return nil
222+
}
223+
216224
return err
217225
}
218226

@@ -477,7 +485,8 @@ func (t *TopicApplier) updatePartitions(
477485

478486
if currPartitions > t.topicConfig.Spec.Partitions {
479487
return fmt.Errorf(
480-
"Fewer partitions in topic config (%d) than observed (%d); this cannot be resolved by topicctl",
488+
"%w (%d) than observed (%d); this cannot be resolved by topicctl",
489+
ErrFewerPartitions,
481490
t.topicConfig.Spec.Partitions,
482491
currPartitions,
483492
)

0 commit comments

Comments
 (0)