Skip to content

Commit b7c2570

Browse files
committed
pkg/loop/internal/net: gracefully handle ctx done when refreshing conn
Ticket: https://smartcontract-it.atlassian.net/browse/CCIP-8538

* Check whether the context is done in the refresh loops prior to retrying. This avoids blocking the caller forever.
* Add a regression test.
1 parent b1e41f5 commit b7c2570

File tree

2 files changed

+204
-0
lines changed

2 files changed

+204
-0
lines changed

pkg/loop/internal/net/client.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,26 @@ func (c *clientConn) Invoke(ctx context.Context, method string, args any, reply
8484
for cc != nil {
8585
err := cc.Invoke(ctx, method, args, reply, opts...)
8686
if isErrTerminal(err) {
87+
if ctx.Err() != nil {
88+
// If the context gets canceled or its deadline is exceeded, stop trying to
89+
// connect in order to not block the caller indefinitely.
90+
// If for whatever reason the LOOPP is down the client should retry whatever
91+
// call they were making.
92+
c.Logger.Warnw("clientConn: Invoke: context canceled, stopping refresh",
93+
"method", method,
94+
"err", err,
95+
"ctxErr", ctx.Err())
96+
// TODO: also return ctx.Err()?
97+
return err
98+
}
8799
if method == pb.Service_Close_FullMethodName {
88100
// don't reconnect just to call Close
89101
c.Logger.Warnw("clientConn: Invoke: terminal error", "method", method, "err", err)
90102
return err
91103
}
92104
c.Logger.Errorw("clientConn: Invoke: terminal error, refreshing connection", "method", method, "err", err)
93105
cc, refErr = c.refresh(ctx, cc)
106+
// TODO: refErr is never checked, should it? e.g. context being done and having to return?
94107
continue
95108
}
96109
return err
@@ -110,6 +123,18 @@ func (c *clientConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, metho
110123
for cc != nil {
111124
s, err := cc.NewStream(ctx, desc, method, opts...)
112125
if isErrTerminal(err) {
126+
if ctx.Err() != nil {
127+
// If the context gets canceled or its deadline is exceeded, stop trying to
128+
// connect in order to not block the caller indefinitely.
129+
// If for whatever reason the LOOPP is down the client should retry whatever
130+
// call they were making.
131+
c.Logger.Warnw("clientConn: Invoke: context canceled, stopping refresh",
132+
"method", method,
133+
"err", err,
134+
"ctxErr", ctx.Err())
135+
// TODO: also return ctx.Err()?
136+
return s, err
137+
}
113138
c.Logger.Errorw("clientConn: NewStream: terminal error, refreshing connection", "err", err)
114139
cc, refErr = c.refresh(ctx, cc)
115140
continue
@@ -181,3 +206,16 @@ func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) (*grpc.
181206

182207
return c.cc, nil
183208
}
209+
210+
// Close closes the underlying connection and all dependencies.
211+
func (c *clientConn) Close() error {
212+
c.mu.Lock()
213+
defer c.mu.Unlock()
214+
if c.cc != nil {
215+
if err := c.cc.Close(); err != nil {
216+
c.Logger.Errorw("Client close failed", "err", err)
217+
}
218+
}
219+
c.CloseAll(c.deps...)
220+
return nil
221+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package net_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
9+
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net"
10+
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net/test"
11+
"github.com/stretchr/testify/require"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/codes"
14+
"google.golang.org/grpc/status"
15+
)
16+
17+
// fnCloser adapts a plain func() so that it satisfies io.Closer.
type fnCloser func()

// Close runs the wrapped function and always reports success.
func (fn fnCloser) Close() error {
	fn()
	return nil
}
24+
25+
type lifecycleClient interface {
26+
grpc.ClientConnInterface
27+
Close() error
28+
}
29+
30+
type lifecycleTestSetup struct {
31+
lifecycleClient
32+
newClientCalls int
33+
}
34+
35+
func setupLifecycleTest(t *testing.T, name string) *lifecycleTestSetup {
36+
broker := &test.Broker{T: t}
37+
brokerExt := &net.BrokerExt{
38+
Broker: broker,
39+
BrokerConfig: net.BrokerConfig{
40+
Logger: logger.Test(t),
41+
},
42+
}
43+
44+
setup := &lifecycleTestSetup{}
45+
newClientFn := func(ctx context.Context) (uint32, net.Resources, error) {
46+
setup.newClientCalls++
47+
id := broker.NextId()
48+
lis, err := broker.Accept(id)
49+
if err != nil {
50+
return 0, nil, err
51+
}
52+
s := grpc.NewServer()
53+
go func() {
54+
if err := s.Serve(lis); err != nil {
55+
// ignore
56+
}
57+
}()
58+
59+
res := net.Resource{Closer: fnCloser(func() { s.Stop() }), Name: "Server"}
60+
var resources net.Resources
61+
resources.Add(res)
62+
63+
return id, resources, nil
64+
}
65+
66+
c := brokerExt.NewClientConn(name, newClientFn)
67+
t.Cleanup(func() { c.Close() })
68+
setup.lifecycleClient = c
69+
70+
return setup
71+
}
72+
73+
func TestClient_Lifecycle(t *testing.T) {
74+
t.Parallel()
75+
76+
t.Run("Invoke Context Cancellation", func(t *testing.T) {
77+
setup := setupLifecycleTest(t, "TestClient")
78+
79+
// 3. Establish connection by calling Invoke() on a non-existent method.
80+
ctx := context.Background()
81+
// Method "Foo" doesn't exist, should return Unimplemented (code 12), which is not a terminal error.
82+
err := setup.Invoke(ctx, "/Service/Foo", nil, nil)
83+
require.Error(t, err)
84+
require.Equal(t, codes.Unimplemented, status.Code(err))
85+
require.Equal(t, 1, setup.newClientCalls, "Should have called newClientFn once to establish connection")
86+
87+
// 4. Now call with CANCELLED context.
88+
ctxCancelled, cancel := context.WithCancel(context.Background())
89+
cancel() // Cancel immediately
90+
91+
// This call should return Canceled immediately and NOT trigger refresh.
92+
err = setup.Invoke(ctxCancelled, "/Service/Foo", nil, nil)
93+
94+
require.Equal(t, codes.Canceled, status.Code(err))
95+
96+
// If the bug exists, newClientFn will be called AGAIN (incrementing to 2).
97+
// If fixed, it should remain 1.
98+
require.Equal(t, 1, setup.newClientCalls, "Should NOT refresh connection on context cancellation")
99+
})
100+
101+
t.Run("Invoke Context Deadline Exceeded", func(t *testing.T) {
102+
setup := setupLifecycleTest(t, "TestClient_Deadline")
103+
104+
// 3. Establish connection first
105+
ctx := context.Background()
106+
err := setup.Invoke(ctx, "/Service/Foo", nil, nil)
107+
require.Error(t, err)
108+
require.Equal(t, codes.Unimplemented, status.Code(err))
109+
require.Equal(t, 1, setup.newClientCalls, "Should have called newClientFn once to establish connection")
110+
111+
// 4. Now call with TIMED OUT context
112+
// We use a very short timeout and wait for it to expire
113+
ctxTimeout, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
114+
defer cancel()
115+
<-ctxTimeout.Done() // Wait for timeout
116+
117+
// This call should return DeadlineExceeded immediately and NOT trigger refresh.
118+
err = setup.Invoke(ctxTimeout, "/Service/Foo", nil, nil)
119+
120+
require.Equal(t, codes.DeadlineExceeded, status.Code(err))
121+
require.Equal(t, 1, setup.newClientCalls, "Should NOT refresh connection on context deadline exceeded")
122+
})
123+
124+
t.Run("NewStream Context Cancellation", func(t *testing.T) {
125+
setup := setupLifecycleTest(t, "TestClient_Stream")
126+
127+
// 3. Establish connection first
128+
ctx := context.Background()
129+
err := setup.Invoke(ctx, "/Service/Probe", nil, nil) // Force connection
130+
require.Error(t, err) // Unimplemented
131+
132+
// 4. Now call with CANCELLED context
133+
ctxCancelled, cancel := context.WithCancel(context.Background())
134+
cancel()
135+
136+
stream, err := setup.NewStream(ctxCancelled, &grpc.StreamDesc{StreamName: "Foo", ClientStreams: true, ServerStreams: true}, "/Service/Foo")
137+
138+
require.Error(t, err)
139+
require.Equal(t, codes.Canceled, status.Code(err))
140+
require.Nil(t, stream)
141+
142+
require.Equal(t, 1, setup.newClientCalls, "Should NOT refresh connection on NewStream context cancellation")
143+
})
144+
145+
t.Run("NewStream Context Deadline Exceeded", func(t *testing.T) {
146+
setup := setupLifecycleTest(t, "TestClient_Stream_Deadline")
147+
148+
// 3. Establish connection first
149+
ctx := context.Background()
150+
err := setup.Invoke(ctx, "/Service/Probe", nil, nil) // Force connection
151+
require.Error(t, err) // Unimplemented
152+
153+
// 4. Now call with TIMED OUT context
154+
ctxTimeout, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
155+
defer cancel()
156+
<-ctxTimeout.Done()
157+
158+
stream, err := setup.NewStream(ctxTimeout, &grpc.StreamDesc{StreamName: "Foo", ClientStreams: true, ServerStreams: true}, "/Service/Foo")
159+
160+
require.Error(t, err)
161+
require.Equal(t, codes.DeadlineExceeded, status.Code(err))
162+
require.Nil(t, stream)
163+
164+
require.Equal(t, 1, setup.newClientCalls, "Should NOT refresh connection on NewStream context deadline exceeded")
165+
})
166+
}

0 commit comments

Comments
 (0)