Skip to content

Commit f14ef91

Browse files
authored
fix: issue-238 (#315)
* fix: issue-238 Signed-off-by: delu <[email protected]> * fix: issue-238 Signed-off-by: delu <[email protected]> Signed-off-by: delu <[email protected]>
1 parent ba27ab9 commit f14ef91

File tree

12 files changed

+143
-154
lines changed

12 files changed

+143
-154
lines changed

internal/controller/trigger/controller_test.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -296,22 +296,6 @@ func TestController_UpdateSubscription(t *testing.T) {
296296
So(resp.SinkCredential.GetAws().SecretAccessKey, ShouldEqual, primitive.SecretsMask)
297297
})
298298
})
299-
Convey("update rate limit", func() {
300-
subManager.EXPECT().UpdateSubscription(gomock.Any(), gomock.Any()).Return(nil)
301-
request := &ctrlpb.UpdateSubscriptionRequest{
302-
Id: subID.Uint64(),
303-
Subscription: &ctrlpb.SubscriptionRequest{
304-
EventBus: "test-eb",
305-
Sink: "test-sink",
306-
Config: &metapb.SubscriptionConfig{
307-
RateLimit: -1,
308-
},
309-
},
310-
}
311-
resp, err := ctrl.UpdateSubscription(ctx, request)
312-
So(err, ShouldBeNil)
313-
So(resp.Config.RateLimit, ShouldEqual, request.Subscription.Config.RateLimit)
314-
})
315299
Convey("update subscription sink", func() {
316300
subManager.EXPECT().UpdateSubscription(gomock.Any(), gomock.Any()).Return(nil)
317301
request := &ctrlpb.UpdateSubscriptionRequest{

internal/controller/trigger/validation/subscripton.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,6 @@ func validateSubscriptionConfig(ctx context.Context, cfg *metapb.SubscriptionCon
138138
if cfg == nil {
139139
return nil
140140
}
141-
if cfg.RateLimit < -1 {
142-
return errors.ErrInvalidRequest.WithMessage("rate limit is -1 or gt than 0")
143-
}
144141
switch cfg.OffsetType {
145142
case metapb.SubscriptionConfig_LATEST, metapb.SubscriptionConfig_EARLIEST:
146143
case metapb.SubscriptionConfig_TIMESTAMP:
@@ -153,7 +150,7 @@ func validateSubscriptionConfig(ctx context.Context, cfg *metapb.SubscriptionCon
153150
if cfg.DeadLetterEventbus != "" {
154151
return errors.ErrInvalidRequest.WithMessage("no support to set dead letter eventbus")
155152
}
156-
if cfg.MaxRetryAttempts > primitive.MaxRetryAttempts {
153+
if cfg.GetMaxRetryAttempts() > primitive.MaxRetryAttempts {
157154
return errors.ErrInvalidRequest.WithMessage(
158155
fmt.Sprintf("could not set max retry attempts greater than %d", primitive.MaxRetryAttempts))
159156
}

internal/controller/trigger/validation/suscription_test.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,16 @@ func TestSubscriptionRequestValidator(t *testing.T) {
5353
func TestValidateSubscriptionConfig(t *testing.T) {
5454
ctx := context.Background()
5555
Convey("test validate subscription config", t, func() {
56-
Convey("test rate limit", func() {
57-
config := &metapb.SubscriptionConfig{
58-
RateLimit: -2,
59-
}
60-
So(validateSubscriptionConfig(ctx, config), ShouldNotBeNil)
61-
})
6256
Convey("test offset timestamp", func() {
6357
config := &metapb.SubscriptionConfig{
6458
OffsetType: metapb.SubscriptionConfig_TIMESTAMP,
6559
}
6660
So(validateSubscriptionConfig(ctx, config), ShouldNotBeNil)
6761
})
6862
Convey("test max retry attempts", func() {
63+
attempt := uint32(10000)
6964
config := &metapb.SubscriptionConfig{
70-
MaxRetryAttempts: 10000,
65+
MaxRetryAttempts: &attempt,
7166
}
7267
So(validateSubscriptionConfig(ctx, config), ShouldNotBeNil)
7368
})

internal/primitive/subscription.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,23 @@ const (
6464
)
6565

6666
type SubscriptionConfig struct {
67-
RateLimit int32 `json:"rate_limit,omitempty"`
67+
RateLimit uint32 `json:"rate_limit,omitempty"`
6868
// consumer from
6969
OffsetType OffsetType `json:"offset_type,omitempty"`
7070
OffsetTimestamp *uint64 `json:"offset_timestamp,omitempty"`
71-
DeliveryTimeout int32 `json:"delivery_timeout,omitempty"`
72-
MaxRetryAttempts int32 `json:"max_retry_attempts,omitempty"`
71+
DeliveryTimeout uint32 `json:"delivery_timeout,omitempty"`
72+
MaxRetryAttempts *uint32 `json:"max_retry_attempts,omitempty"`
7373
DeadLetterEventbus string `json:"dead_letter_eventbus,omitempty"`
7474
}
7575

76+
// GetMaxRetryAttempts return MaxRetryAttempts if nil return -1.
77+
func (c *SubscriptionConfig) GetMaxRetryAttempts() int32 {
78+
if c != nil && c.MaxRetryAttempts != nil {
79+
return int32(*c.MaxRetryAttempts)
80+
}
81+
return -1
82+
}
83+
7684
func (c *SubscriptionConfig) String() string {
7785
if c == nil {
7886
return ""

internal/trigger/trigger/config.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type Config struct {
3333
BufferSize int
3434
MaxRetryAttempts int32
3535
DeliveryTimeout time.Duration
36-
RateLimit int32
36+
RateLimit uint32
3737
Controllers []string
3838
DeadLetterEventbus string
3939
}
@@ -71,27 +71,27 @@ func WithBufferSize(size int) Option {
7171

7272
func WithMaxRetryAttempts(attempts int32) Option {
7373
return func(t *trigger) {
74-
if attempts <= 0 {
74+
if attempts < 0 {
7575
attempts = primitive.MaxRetryAttempts
7676
}
7777
t.config.MaxRetryAttempts = attempts
7878
}
7979
}
8080

81-
func WithDeliveryTimeout(timeout int32) Option {
81+
func WithDeliveryTimeout(timeout uint32) Option {
8282
return func(t *trigger) {
83-
if timeout <= 0 {
83+
if timeout == 0 {
8484
t.config.DeliveryTimeout = defaultDeliveryTimeout
8585
return
8686
}
8787
t.config.DeliveryTimeout = time.Duration(timeout) * time.Millisecond
8888
}
8989
}
9090

91-
func WithRateLimit(rateLimit int32) Option {
91+
func WithRateLimit(rateLimit uint32) Option {
9292
return func(t *trigger) {
9393
t.config.RateLimit = rateLimit
94-
if rateLimit <= 0 {
94+
if rateLimit == 0 {
9595
t.rateLimiter = ratelimit.NewUnlimited()
9696
return
9797
}

internal/trigger/trigger/trigger.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ func (t *trigger) changeConfig(config primitive.SubscriptionConfig) {
172172
if config.DeliveryTimeout != t.subscription.Config.DeliveryTimeout {
173173
t.applyOptions(WithDeliveryTimeout(config.DeliveryTimeout))
174174
}
175-
if config.MaxRetryAttempts != t.subscription.Config.MaxRetryAttempts {
176-
t.applyOptions(WithMaxRetryAttempts(config.MaxRetryAttempts))
175+
if config.GetMaxRetryAttempts() != t.subscription.Config.GetMaxRetryAttempts() {
176+
t.applyOptions(WithMaxRetryAttempts(config.GetMaxRetryAttempts()))
177177
}
178178
t.subscription.Config = config
179179
}

internal/trigger/trigger/trigger_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ func TestTrigger_Options(t *testing.T) {
4949
WithDeliveryTimeout(0)(tg)
5050
So(tg.config.DeliveryTimeout, ShouldEqual, defaultDeliveryTimeout)
5151
size = rand.Intn(1000) + size
52-
WithDeliveryTimeout(int32(size))(tg)
52+
WithDeliveryTimeout(uint32(size))(tg)
5353
So(tg.config.DeliveryTimeout, ShouldEqual, time.Duration(size)*time.Millisecond)
54-
WithMaxRetryAttempts(0)(tg)
54+
WithMaxRetryAttempts(-1)(tg)
5555
So(tg.config.MaxRetryAttempts, ShouldEqual, primitive.MaxRetryAttempts)
5656
size = rand.Intn(1000) + size
5757
WithMaxRetryAttempts(int32(size))(tg)
@@ -64,7 +64,7 @@ func TestTrigger_Options(t *testing.T) {
6464
WithRateLimit(0)(tg)
6565
So(tg.config.RateLimit, ShouldEqual, 0)
6666
size = rand.Intn(1000) + size
67-
WithRateLimit(int32(size))(tg)
67+
WithRateLimit(uint32(size))(tg)
6868
So(tg.config.RateLimit, ShouldEqual, size)
6969
WithDeadLetterEventbus("")(tg)
7070
So(tg.config.DeadLetterEventbus, ShouldEqual, primitive.DeadLetterEventbusName)
@@ -220,7 +220,7 @@ func TestTriggerRateLimit(t *testing.T) {
220220
tg := NewTrigger(makeSubscription(id), WithControllers([]string{"test"})).(*trigger)
221221
tg.eventCli = cli
222222
cli.EXPECT().Send(gomock.Any(), gomock.Any()).AnyTimes().Return(client.Success)
223-
rateLimit := int32(10000)
223+
rateLimit := uint32(10000)
224224
Convey("test no rate limit", func() {
225225
c := testSendEvent(tg)
226226
So(c, ShouldBeGreaterThan, rateLimit)

internal/trigger/worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func (w *worker) getTriggerOptions(subscription *primitive.Subscription) []trigg
290290
config := subscription.Config
291291
opts = append(opts, trigger.WithRateLimit(config.RateLimit),
292292
trigger.WithDeliveryTimeout(config.DeliveryTimeout),
293-
trigger.WithMaxRetryAttempts(config.MaxRetryAttempts),
293+
trigger.WithMaxRetryAttempts(config.GetMaxRetryAttempts()),
294294
trigger.WithDeadLetterEventbus(config.DeadLetterEventbus))
295295
return opts
296296
}

0 commit comments

Comments
 (0)