@@ -39,6 +39,7 @@ import (
3939 "github.com/cloudevents/sdk-go/v2/types"
4040 "github.com/kelseyhightower/envconfig"
4141 "go.opencensus.io/trace"
42+ "go.uber.org/atomic"
4243 "go.uber.org/zap"
4344 "k8s.io/apimachinery/pkg/util/wait"
4445 "knative.dev/pkg/logging"
@@ -116,6 +117,10 @@ type generator struct {
116117 eventQueue []conformanceevent.Event
117118}
118119
120+ var (
121+ verifyConnectionCounter = atomic .NewUint64 (0 )
122+ )
123+
119124func Start (ctx context.Context , logs * eventshub.EventLogs , clientOpts ... eventshub.ClientOption ) error {
120125 var env generator
121126 if err := envconfig .Process ("" , & env ); err != nil {
@@ -143,27 +148,9 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
143148 logging .FromContext (ctx ).Info ("awake, continuing" )
144149 }
145150
146- httpClient := nethttp .DefaultClient
147-
148- if env .EnforceTLS {
149- caCertPool , err := x509 .SystemCertPool ()
150- if err != nil {
151- return fmt .Errorf ("failed to create cert pool %s: %w" , env .Sink , err )
152- }
153- caCertPool .AppendCertsFromPEM ([]byte (env .CACerts ))
154-
155- transport := nethttp .DefaultTransport .(* nethttp.Transport ).Clone ()
156- transport .TLSClientConfig = & tls.Config {
157- RootCAs : caCertPool ,
158- MinVersion : tls .VersionTLS12 ,
159- VerifyConnection : func (state tls.ConnectionState ) error {
160- if err := logs .Vent (env .peerCertificatesReceived (state )); err != nil {
161- return err
162- }
163- return nil
164- },
165- }
166- httpClient = & nethttp.Client {Transport : transport }
151+ httpClient , _ , err := createClient (ctx , env , logs )
152+ if err != nil {
153+ return err
167154 }
168155
169156 if env .ProbeSink {
@@ -185,12 +172,6 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
185172 }
186173 }
187174
188- for _ , opt := range clientOpts {
189- if err := opt (httpClient ); err != nil {
190- return fmt .Errorf ("unable to apply option: %w" , err )
191- }
192- }
193-
194175 switch env .EventEncoding {
195176 case "binary" :
196177 ctx = cloudevents .WithEncodingBinary (ctx )
@@ -203,6 +184,19 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
203184 ticker := time .NewTicker (period )
204185 for {
205186
187+ // when enforcing TLS we want to create multiple transports to force multiple TLS handshakes
188+ // on each request sent so that VerifyConnection is called multiple times.
189+ httpClient , _ , err = createClient (ctx , env , logs )
190+ if err != nil {
191+ return err
192+ }
193+
194+ for _ , opt := range clientOpts {
195+ if err := opt (httpClient ); err != nil {
196+ return fmt .Errorf ("unable to apply option: %w" , err )
197+ }
198+ }
199+
206200 ctx , span := trace .StartSpan (ctx , "eventshub-sender" )
207201
208202 req , event , err := env .next (ctx )
@@ -251,13 +245,46 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
251245 }
252246}
253247
254- func (g * generator ) peerCertificatesReceived (state tls.ConnectionState ) eventshub.EventInfo {
248+ func createClient (ctx context.Context , env generator , logs * eventshub.EventLogs ) (* nethttp.Client , * nethttp.Transport , error ) {
249+ if env .EnforceTLS {
250+ caCertPool , err := x509 .SystemCertPool ()
251+ if err != nil {
252+ return nil , nil , fmt .Errorf ("failed to create cert pool %s: %w" , env .Sink , err )
253+ }
254+ caCertPool .AppendCertsFromPEM ([]byte (env .CACerts ))
255+
256+ transport := nethttp .DefaultTransport .(* nethttp.Transport ).Clone ()
257+
258+ // Force multiple TLS handshakes
259+ transport .DisableKeepAlives = true
260+ transport .IdleConnTimeout = 500 * time .Millisecond
261+
262+ transport .TLSClientConfig = & tls.Config {
263+ RootCAs : caCertPool ,
264+ MinVersion : tls .VersionTLS12 ,
265+ VerifyConnection : func (state tls.ConnectionState ) error {
266+ logging .FromContext (ctx ).Infow ("VerifyConnection" )
267+
268+ if err := logs .Vent (env .peerCertificatesReceived (verifyConnectionCounter .Inc (), state )); err != nil {
269+ return err
270+ }
271+ return nil
272+ },
273+ }
274+ return & nethttp.Client {Transport : transport }, transport , nil
275+ }
276+
277+ return nethttp .DefaultClient , nethttp .DefaultTransport .(* nethttp.Transport ), nil
278+ }
279+
280+ func (g * generator ) peerCertificatesReceived (counter uint64 , state tls.ConnectionState ) eventshub.EventInfo {
255281 return eventshub.EventInfo {
256282 Kind : eventshub .PeerCertificatesReceived ,
257283 Connection : eventshub .TLSConnectionStateToConnection (& state ),
258284 Origin : g .SenderName ,
259285 Observer : g .SenderName ,
260286 Time : time .Now (),
287+ Sequence : counter ,
261288 }
262289}
263290
0 commit comments