Skip to content
This repository was archived by the owner on Jun 4, 2021. It is now read-only.

Commit cfad5be

Browse files
authored
[0.17] Update eventing to latest (#1657)
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
1 parent 090e2b3 commit cfad5be

35 files changed

+1190
-913
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ require (
3939
k8s.io/apimachinery v0.19.0
4040
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
4141
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451
42-
knative.dev/eventing v0.17.5-0.20200925065343-049b8e743bd4
42+
knative.dev/eventing v0.17.9-0.20201105153307-2fb113c42ff4
4343
knative.dev/pkg v0.0.0-20200824160247-5343c1d19369
4444
knative.dev/serving v0.17.1
4545
knative.dev/test-infra v0.0.0-20200828171708-f68cb78c80a9

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2028,8 +2028,8 @@ k8s.io/utils v0.0.0-20200603063816-c1c6865ac451/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
20282028
knative.dev/caching v0.0.0-20190719140829-2032732871ff/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg=
20292029
knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg=
20302030
knative.dev/caching v0.0.0-20200811171106-48c335fed9c8/go.mod h1:XonDcFC2DLSWP71f2y7oYnXUko5d5HsJRnZtkp0wY7g=
2031-
knative.dev/eventing v0.17.5-0.20200925065343-049b8e743bd4 h1:Wygx5VC4nZDs9p1Om8EYWqHMSNDZ6gWhio09hj0KVMw=
2032-
knative.dev/eventing v0.17.5-0.20200925065343-049b8e743bd4/go.mod h1:9NwCSwLnMCKmgz3YQBNax18mSgBjud8CvfsUUVOZ1sA=
2031+
knative.dev/eventing v0.17.9-0.20201105153307-2fb113c42ff4 h1:g6ud+UJbnjht9uciWVf29aUFAI3IKn2PfyTQpdXkD3Y=
2032+
knative.dev/eventing v0.17.9-0.20201105153307-2fb113c42ff4/go.mod h1:9NwCSwLnMCKmgz3YQBNax18mSgBjud8CvfsUUVOZ1sA=
20332033
knative.dev/eventing-contrib v0.6.1-0.20190723221543-5ce18048c08b/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
20342034
knative.dev/eventing-contrib v0.11.2/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
20352035
knative.dev/networking v0.0.0-20200812200006-4d518e76538a h1:E1rnQR9IZvDcEAgoOXMW9LWqevaYFVTlMS2ndgoAO6Y=

vendor/knative.dev/eventing/pkg/apis/eventing/v1/trigger_lifecycle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (ts *TriggerStatus) MarkDependencyNotConfigured() {
167167
"DependencyNotConfigured", "Dependency has not yet been reconciled.")
168168
}
169169

170-
func (ts *TriggerStatus) PropagateDependencyStatus(ks *duckv1.KResource) {
170+
func (ts *TriggerStatus) PropagateDependencyStatus(ks *duckv1.Source) {
171171
kc := ks.Status.GetCondition(apis.ConditionReady)
172172
if kc == nil {
173173
ts.MarkDependencyNotConfigured()
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright 2020 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kncloudevents
18+
19+
import (
20+
nethttp "net/http"
21+
"sync"
22+
"time"
23+
24+
"go.opencensus.io/plugin/ochttp"
25+
"knative.dev/pkg/tracing/propagation/tracecontextb3"
26+
)
27+
28+
const (
29+
defaultRetryWaitMin = 1 * time.Second
30+
defaultRetryWaitMax = 30 * time.Second
31+
)
32+
33+
type holder struct {
34+
clientMutex sync.Mutex
35+
connectionArgs *ConnectionArgs
36+
client **nethttp.Client
37+
}
38+
39+
var clientHolder = holder{}
40+
41+
// The used HTTP client is a singleton, so the same http client is reused across all the application.
42+
// If connection args is modified, client is cleaned and a new one is created.
43+
func getClient() *nethttp.Client {
44+
clientHolder.clientMutex.Lock()
45+
defer clientHolder.clientMutex.Unlock()
46+
47+
if clientHolder.client == nil {
48+
// Add connection options to the default transport.
49+
var base = nethttp.DefaultTransport.(*nethttp.Transport).Clone()
50+
clientHolder.connectionArgs.configureTransport(base)
51+
c := &nethttp.Client{
52+
// Add output tracing.
53+
Transport: &ochttp.Transport{
54+
Base: base,
55+
Propagation: tracecontextb3.TraceContextEgress,
56+
},
57+
}
58+
clientHolder.client = &c
59+
}
60+
61+
return *clientHolder.client
62+
}
63+
64+
// ConfigureConnectionArgs configures the new connection args.
65+
// The existing client won't be affected, but a new one will be created.
66+
// Use sparingly, because it might lead to creating a lot of clients, none of them sharing their connection pool!
67+
func ConfigureConnectionArgs(ca *ConnectionArgs) {
68+
clientHolder.clientMutex.Lock()
69+
defer clientHolder.clientMutex.Unlock()
70+
71+
// Check if same config
72+
if clientHolder.connectionArgs != nil &&
73+
ca != nil &&
74+
ca.MaxIdleConns == clientHolder.connectionArgs.MaxIdleConns &&
75+
ca.MaxIdleConnsPerHost == clientHolder.connectionArgs.MaxIdleConnsPerHost {
76+
return
77+
}
78+
79+
if clientHolder.client != nil {
80+
// Let's try to clean up a bit the existing client
81+
// Note: this won't remove it nor close it
82+
(*clientHolder.client).CloseIdleConnections()
83+
84+
// Setting client to nil
85+
clientHolder.client = nil
86+
}
87+
88+
clientHolder.connectionArgs = ca
89+
}
90+
91+
// ConnectionArgs allow to configure connection parameters to the underlying
92+
// HTTP Client transport.
93+
type ConnectionArgs struct {
94+
// MaxIdleConns refers to the max idle connections, as in net/http/transport.
95+
MaxIdleConns int
96+
// MaxIdleConnsPerHost refers to the max idle connections per host, as in net/http/transport.
97+
MaxIdleConnsPerHost int
98+
}
99+
100+
func (ca *ConnectionArgs) configureTransport(transport *nethttp.Transport) {
101+
if ca == nil {
102+
return
103+
}
104+
transport.MaxIdleConns = ca.MaxIdleConns
105+
transport.MaxIdleConnsPerHost = ca.MaxIdleConnsPerHost
106+
}

vendor/knative.dev/eventing/pkg/kncloudevents/message_sender.go

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,10 @@ import (
2525

2626
"github.com/hashicorp/go-retryablehttp"
2727
"github.com/rickb777/date/period"
28-
"go.opencensus.io/plugin/ochttp"
29-
"knative.dev/pkg/tracing/propagation/tracecontextb3"
3028

3129
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
3230
)
3331

34-
const (
35-
defaultRetryWaitMin = 1 * time.Second
36-
defaultRetryWaitMax = 30 * time.Second
37-
)
38-
3932
var noRetries = RetryConfig{
4033
RetryMax: 0,
4134
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
@@ -46,41 +39,15 @@ var noRetries = RetryConfig{
4639
},
4740
}
4841

49-
// ConnectionArgs allow to configure connection parameters to the underlying
50-
// HTTP Client transport.
51-
type ConnectionArgs struct {
52-
// MaxIdleConns refers to the max idle connections, as in net/http/transport.
53-
MaxIdleConns int
54-
// MaxIdleConnsPerHost refers to the max idle connections per host, as in net/http/transport.
55-
MaxIdleConnsPerHost int
56-
}
57-
58-
func (ca *ConnectionArgs) ConfigureTransport(transport *nethttp.Transport) {
59-
if ca == nil {
60-
return
61-
}
62-
transport.MaxIdleConns = ca.MaxIdleConns
63-
transport.MaxIdleConnsPerHost = ca.MaxIdleConnsPerHost
64-
}
65-
6642
type HttpMessageSender struct {
6743
Client *nethttp.Client
6844
Target string
6945
}
7046

71-
func NewHttpMessageSender(connectionArgs *ConnectionArgs, target string) (*HttpMessageSender, error) {
72-
// Add connection options to the default transport.
73-
var base = nethttp.DefaultTransport.(*nethttp.Transport).Clone()
74-
connectionArgs.ConfigureTransport(base)
75-
// Add output tracing.
76-
client := &nethttp.Client{
77-
Transport: &ochttp.Transport{
78-
Base: base,
79-
Propagation: tracecontextb3.TraceContextEgress,
80-
},
81-
}
82-
83-
return &HttpMessageSender{Client: client, Target: target}, nil
47+
// Deprecated: Don't use this anymore, now it has the same effect of NewHTTPMessageSenderWithTarget
48+
// If you need to modify the connection args, use ConfigureConnectionArgs sparingly.
49+
func NewHttpMessageSender(ca *ConnectionArgs, target string) (*HttpMessageSender, error) {
50+
return &HttpMessageSender{Client: getClient(), Target: target}, nil
8451
}
8552

8653
func (s *HttpMessageSender) NewCloudEventRequest(ctx context.Context) (*nethttp.Request, error) {
@@ -139,7 +106,12 @@ func (s *HttpMessageSender) SendWithRetries(req *nethttp.Request, config *RetryC
139106
},
140107
}
141108

142-
return retryableClient.Do(&retryablehttp.Request{Request: req})
109+
retryableReq, err := retryablehttp.FromRequest(req)
110+
if err != nil {
111+
return nil, err
112+
}
113+
114+
return retryableClient.Do(retryableReq)
143115
}
144116

145117
func NoRetries() RetryConfig {
@@ -179,6 +151,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)
179151
return retryConfig, nil
180152
}
181153

182-
func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) {
183-
return resp != nil && resp.StatusCode >= 300, nil
154+
func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) {
155+
return !(resp != nil && resp.StatusCode < 300), err
184156
}

vendor/knative.dev/eventing/test/conformance/helpers/broker_control_plane_test_helper.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"knative.dev/pkg/reconciler"
2525

2626
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
27+
"knative.dev/eventing/test/lib/recordevents"
2728

2829
testlib "knative.dev/eventing/test/lib"
2930
"knative.dev/eventing/test/lib/duck"
@@ -81,8 +82,7 @@ func triggerV1Beta1BeforeBrokerHelper(triggerName string, client *testlib.Client
8182
const etLogger = "logger"
8283
const loggerPodName = "logger-pod"
8384

84-
logPod := resources.EventRecordPod(loggerPodName)
85-
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
85+
_ = recordevents.DeployEventRecordOrFail(client, loggerPodName)
8686
client.WaitForAllTestResourcesReadyOrFail() // Can't do this for the trigger because it's not 'ready' yet
8787
client.CreateTriggerOrFailV1Beta1(triggerName,
8888
resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, etLogger, map[string]interface{}{}),

vendor/knative.dev/eventing/test/conformance/helpers/broker_data_plane_test_helper.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,13 +316,15 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper(
316316
source := "origin-for-reply"
317317
event.SetSource(source)
318318
msg := []byte(`{"msg":"Transformed!"}`)
319-
transformPod := resources.EventTransformationPod(
319+
recordevents.DeployEventRecordOrFail(
320+
client,
320321
"transformer-pod",
321-
"reply-check-type",
322-
"reply-check-source",
323-
msg,
322+
recordevents.ReplyWithTransformedEvent(
323+
"reply-check-type",
324+
"reply-check-source",
325+
string(msg),
326+
),
324327
)
325-
client.CreatePodOrFail(transformPod, testlib.WithService("transformer-pod"))
326328
client.WaitForServiceEndpointsOrFail("transformer-pod", 1)
327329
transformTrigger := client.CreateTriggerOrFailV1Beta1(
328330
"transform-trigger",

vendor/knative.dev/eventing/test/conformance/helpers/broker_tracing_test_helper.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"knative.dev/eventing/pkg/utils"
3030
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
3131
testlib "knative.dev/eventing/test/lib"
32+
"knative.dev/eventing/test/lib/recordevents"
3233
"knative.dev/eventing/test/lib/resources"
3334
"knative.dev/eventing/test/lib/sender"
3435
)
@@ -78,8 +79,7 @@ func setupBrokerTracing(brokerClass string) SetupTracingTestInfrastructureFunc {
7879
)
7980

8081
// Create a logger (EventRecord) Pod and a K8s Service that points to it.
81-
logPod := resources.EventRecordPod(loggerPodName)
82-
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
82+
_ = recordevents.DeployEventRecordOrFail(client, loggerPodName)
8383

8484
// Create a Trigger that receives events (type=bar) and sends them to the logger Pod.
8585
loggerTrigger := client.CreateTriggerOrFailV1Beta1(
@@ -89,15 +89,17 @@ func setupBrokerTracing(brokerClass string) SetupTracingTestInfrastructureFunc {
8989
resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerPodName),
9090
)
9191

92-
// Create a transformer (EventTransfrmer) Pod that replies with the same event as the input,
92+
// Create a transformer Pod (recordevents with transform reply) that replies with the same event as the input,
9393
// except the reply's event's type is changed to bar.
94-
eventTransformerPod := resources.EventTransformationPod(
94+
eventTransformerPod := recordevents.DeployEventRecordOrFail(
95+
client,
9596
"transformer",
96-
etLogger,
97-
senderName,
98-
[]byte(eventBody),
97+
recordevents.ReplyWithTransformedEvent(
98+
etLogger,
99+
senderName,
100+
eventBody,
101+
),
99102
)
100-
client.CreatePodOrFail(eventTransformerPod, testlib.WithService(eventTransformerPod.Name))
101103

102104
// Create a Trigger that receives events (type=foo) and sends them to the transformer Pod.
103105
transformerTrigger := client.CreateTriggerOrFailV1Beta1(

vendor/knative.dev/eventing/test/conformance/helpers/channel_status_subscriber_test_helper.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
2323
eventingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
2424
testlib "knative.dev/eventing/test/lib"
25+
"knative.dev/eventing/test/lib/recordevents"
2526
"knative.dev/eventing/test/lib/resources"
2627

2728
corev1 "k8s.io/api/core/v1"
@@ -56,8 +57,7 @@ func channelHasRequiredSubscriberStatus(st *testing.T, client *testlib.Client, c
5657
client.CreateChannelOrFail(channelName, &channel)
5758
client.WaitForResourceReadyOrFail(channelName, &channel)
5859

59-
pod := resources.EventRecordPod(subscriberServiceName + "-pod")
60-
client.CreatePodOrFail(pod, testlib.WithService(subscriberServiceName))
60+
_ = recordevents.DeployEventRecordOrFail(client, subscriberServiceName+"-pod")
6161

6262
subscription := client.CreateSubscriptionOrFail(
6363
subscriberServiceName,

vendor/knative.dev/eventing/test/conformance/helpers/channel_tracing_test_helper.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
3030
testlib "knative.dev/eventing/test/lib"
31+
"knative.dev/eventing/test/lib/recordevents"
3132
"knative.dev/eventing/test/lib/resources"
3233
"knative.dev/eventing/test/lib/sender"
3334
)
@@ -66,17 +67,18 @@ func setupChannelTracingWithReply(
6667
client.CreateChannelOrFail(replyChannelName, channel)
6768

6869
// Create the 'sink', a LogEvents Pod and a K8s Service that points to it.
69-
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
70-
client.CreatePodOrFail(recordEventsPod, testlib.WithService(recordEventsPodName))
70+
recordEventsPod := recordevents.DeployEventRecordOrFail(client, recordEventsPodName)
7171

7272
// Create the subscriber, a Pod that mutates the event.
73-
transformerPod := resources.EventTransformationPod(
73+
transformerPod := recordevents.DeployEventRecordOrFail(
74+
client,
7475
"transformer",
75-
"mutated",
76-
eventSource,
77-
nil,
76+
recordevents.ReplyWithTransformedEvent(
77+
"mutated",
78+
eventSource,
79+
"",
80+
),
7881
)
79-
client.CreatePodOrFail(transformerPod, testlib.WithService(transformerPod.Name))
8082

8183
// Create the Subscription linking the Channel to the mutator.
8284
client.CreateSubscriptionOrFail(

0 commit comments

Comments
 (0)