Skip to content

Commit eebe960

Browse files
authored
Release 1.5 (#839)
* fixed source delivery and adapter sendwithretries retryConfig + fixed source perf test related tests and added retry to one sample * added retry test to source receive adapter
1 parent 299234f commit eebe960

File tree

4 files changed

+59
-46
lines changed

4 files changed

+59
-46
lines changed

pkg/adapter/adapter.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ type Adapter struct {
8989

9090
var _ adapter.MessageAdapter = (*Adapter)(nil)
9191
var _ adapter.MessageAdapterConstructor = NewAdapter
92+
var (
93+
retryConfig kncloudevents.RetryConfig = kncloudevents.NoRetries()
94+
retriesInt32 int32 = 0
95+
)
9296

9397
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, httpMessageSender *kncloudevents.HTTPMessageSender, reporter source.StatsReporter) adapter.MessageAdapter {
9498
logger := logging.FromContext(ctx).Desugar()
@@ -220,9 +224,20 @@ func (a *Adapter) ConsumeMessages(channel *wabbit.Channel,
220224
func (a *Adapter) PollForMessages(channel *wabbit.Channel,
221225
queue *wabbit.Queue, stopCh <-chan struct{}) error {
222226
logger := a.logger
227+
var err error
228+
retriesInt32 = int32(a.config.Retry)
229+
backoffPolicy := (v1.BackoffPolicyType)(a.config.BackoffPolicy)
230+
backoffDelay := a.config.BackoffDelay.String()
231+
retryConfig, err = kncloudevents.RetryConfigFromDeliverySpec(v1.DeliverySpec{
232+
BackoffPolicy: &backoffPolicy,
233+
BackoffDelay: &backoffDelay,
234+
Retry: &retriesInt32,
235+
})
236+
if err != nil {
237+
a.logger.Error("error retrieving retryConfig from deliverySpec", zap.Error(err))
238+
}
223239

224240
msgs, _ := a.ConsumeMessages(channel, queue, logger)
225-
226241
wg := &sync.WaitGroup{}
227242
workerCount := a.config.ChannelConfig.Parallelism
228243
wg.Add(workerCount)
@@ -291,17 +306,7 @@ func (a *Adapter) postMessage(msg wabbit.Delivery) error {
291306
return err
292307
}
293308

294-
backoffDelay := a.config.BackoffDelay.String()
295-
backoffPolicy := (v1.BackoffPolicyType)(a.config.BackoffPolicy)
296-
res, err := a.httpMessageSender.SendWithRetries(req, &kncloudevents.RetryConfig{
297-
RetryMax: a.config.Retry,
298-
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
299-
return kncloudevents.SelectiveRetry(ctx, resp, nil)
300-
},
301-
BackoffDelay: &backoffDelay,
302-
BackoffPolicy: &backoffPolicy,
303-
})
304-
309+
res, err := a.httpMessageSender.SendWithRetries(req, &retryConfig)
305310
if err != nil {
306311
a.logger.Error("error while sending the message", zap.Error(err))
307312
return err

pkg/adapter/adapter_test.go

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"net/http/httptest"
2626
"reflect"
2727
"strings"
28+
"sync"
2829
"testing"
2930
"time"
3031

@@ -36,34 +37,38 @@ import (
3637
"go.uber.org/zap"
3738

3839
"knative.dev/eventing/pkg/adapter/v2"
40+
v1 "knative.dev/eventing/pkg/apis/duck/v1"
3941
"knative.dev/eventing/pkg/kncloudevents"
4042
"knative.dev/eventing/pkg/metrics/source"
4143
"knative.dev/pkg/logging"
4244
)
4345

46+
type handlerFunc func(http.ResponseWriter, *http.Request)
47+
4448
func TestPostMessage_ServeHTTP(t *testing.T) {
4549
testCases := map[string]struct {
46-
sink func(http.ResponseWriter, *http.Request)
50+
handlers []handlerFunc
4751
reqBody, expectedEventType string
4852
reqHeaders http.Header
4953
data map[string]interface{}
5054
headers wabbit.Option
5155
attributes map[string]string
5256
withMsgId, isCe, error bool
57+
retry int
5358
}{
5459
"accepted": {
55-
sink: sinkAccepted,
56-
reqBody: `{"key":"value"}`,
57-
data: map[string]interface{}{"key": "value"},
60+
handlers: []handlerFunc{sinkAccepted},
61+
reqBody: `{"key":"value"}`,
62+
data: map[string]interface{}{"key": "value"},
5863
},
5964
"accepted with msg id": {
60-
sink: sinkAccepted,
65+
handlers: []handlerFunc{sinkAccepted},
6166
reqBody: `{"key":"value"}`,
6267
withMsgId: true,
6368
data: map[string]interface{}{"key": "value"},
6469
},
6570
"accepted with binary cloudevent": {
66-
sink: sinkAccepted,
71+
handlers: []handlerFunc{sinkAccepted},
6772
reqBody: `{"test":"test"}`,
6873
withMsgId: true,
6974
reqHeaders: http.Header{
@@ -82,7 +87,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
8287
isCe: true,
8388
},
8489
"accepted with structured cloudevent": {
85-
sink: sinkAccepted,
90+
handlers: []handlerFunc{sinkAccepted},
8691
reqBody: `{"specversion":"1.0","id":1234,` +
8792
`"type":"dev.knative.rabbitmq.event","source":"example/source.uri",` +
8893
`"content-type":"text/plain","data":"test"}`,
@@ -99,17 +104,24 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
99104
isCe: true,
100105
},
101106
"rejected": {
102-
sink: sinkRejected,
103-
reqBody: `{"key":"value"}`,
104-
error: true,
105-
data: map[string]interface{}{"key": "value"},
107+
handlers: []handlerFunc{sinkRejected},
108+
reqBody: `{"key":"value"}`,
109+
error: true,
110+
data: map[string]interface{}{"key": "value"},
111+
},
112+
"retried 3 times succesfull on the 4th ": {
113+
retry: 5,
114+
handlers: []handlerFunc{sinkRejected, sinkRejected, sinkRejected, sinkAccepted},
115+
reqBody: `{"key":"value"}`,
116+
error: false,
117+
data: map[string]interface{}{"key": "value"},
106118
},
107119
}
108120

109121
for n, tc := range testCases {
110122
t.Run(n, func(t *testing.T) {
111123
h := &fakeHandler{
112-
handler: tc.sink,
124+
handlers: tc.handlers,
113125
}
114126
sinkServer := httptest.NewServer(h)
115127
defer sinkServer.Close()
@@ -120,21 +132,12 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
120132
}
121133

122134
statsReporter, _ := source.NewStatsReporter()
123-
135+
config := adapterConfig{}
136+
if tc.retry > 0 {
137+
config = adapterConfig{Retry: tc.retry, BackoffPolicy: string(v1.BackoffPolicyLinear), BackoffDelay: time.Duration(10000)}
138+
}
124139
a := &Adapter{
125-
config: &adapterConfig{
126-
Broker: "amqp://guest:guest@localhost:5672/",
127-
ExchangeConfig: ExchangeConfig{
128-
Type: "topic",
129-
Durable: true,
130-
AutoDelete: false,
131-
},
132-
QueueConfig: QueueConfig{
133-
Name: "",
134-
Durable: false,
135-
AutoDelete: false,
136-
},
137-
},
140+
config: &config,
138141
context: context.TODO(),
139142
httpMessageSender: s,
140143
logger: zap.NewNop(),
@@ -337,11 +340,15 @@ func TestAdapter_StartAmqpClient(t *testing.T) {
337340
type fakeHandler struct {
338341
body []byte
339342
header http.Header
343+
mu sync.Mutex
340344

341-
handler func(http.ResponseWriter, *http.Request)
345+
receiveCount int
346+
handlers []handlerFunc
342347
}
343348

344349
func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
350+
h.mu.Lock()
351+
defer h.mu.Unlock()
345352
h.header = r.Header
346353
body, err := ioutil.ReadAll(r.Body)
347354
if err != nil {
@@ -351,7 +358,8 @@ func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
351358
h.body = body
352359

353360
defer r.Body.Close()
354-
h.handler(w, r)
361+
h.receiveCount++
362+
h.handlers[h.receiveCount](w, r)
355363
}
356364

357365
func sinkAccepted(writer http.ResponseWriter, req *http.Request) {
@@ -502,7 +510,7 @@ func TestAdapter_NewAdapter(t *testing.T) {
502510
ctx := context.TODO()
503511
env := NewEnvConfig()
504512
h := &fakeHandler{
505-
handler: sinkAccepted,
513+
handlers: []handlerFunc{sinkAccepted},
506514
}
507515

508516
sinkServer := httptest.NewServer(h)

samples/source/external-cluster/200-perf-test.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,12 @@ spec:
5555
value: "true"
5656
- name: SIZE
5757
value: "1000"
58-
5958
- name: EXCHANGE
6059
value: "eventing-rabbitmq-source"
6160
- name: TYPE
62-
value: "fanout"
61+
value: "headers"
6362
# This version of PerfTest cannot declare a durable exchange: https://github.com/rabbitmq/rabbitmq-perf-test/issues/281
64-
# This version of PerfTest will create an exchange if it does not exist if if PREDECLARED is set to true: https://github.com/rabbitmq/rabbitmq-perf-test/issues/282
63+
# This version of PerfTest will create an exchange if it does not exist if PREDECLARED is set to true: https://github.com/rabbitmq/rabbitmq-perf-test/issues/282
6564
# This means that we cannot use the messaging topology operator to manage our exchange as it can create race conditions with PerfTest: https://github.com/rabbitmq/messaging-topology-operator
6665
# We have chosen the "lesser evil" and are using a non-durable exchange
6766

samples/source/quick-setup/300-perf-test.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,12 @@ spec:
5555
value: "true"
5656
- name: SIZE
5757
value: "1000"
58-
58+
- name: PREDECLARED
59+
value: "true"
5960
- name: EXCHANGE
6061
value: "eventing-rabbitmq-source"
6162
- name: TYPE
62-
value: "fanout"
63+
value: "headers"
6364
# This version of PerfTest cannot declare a durable exchange: https://github.com/rabbitmq/rabbitmq-perf-test/issues/281
6465
# This version of PerfTest will create an exchange if it does not exist if if PREDECLARED is set to true: https://github.com/rabbitmq/rabbitmq-perf-test/issues/282
6566
# This means that we cannot use the messaging topology operator to manage our exchange as it can create race conditions with PerfTest: https://github.com/rabbitmq/messaging-topology-operator

0 commit comments

Comments
 (0)