Skip to content

Commit f7a31c2

Browse files
CLD-769: Improve catalog client reliability (#573)
## CLD-769: Improve catalog client reliability - Add keepalive pings to ensure that a stream stays alive even when there is no traffic - Add grpc retry policy for retriable transient errors. This will allow the grpc client to retry connecting to catalog when a network error is received improving the user experience instead of instantly failing the workflow - Add connection object to the `CatalogClient`. This will allow us to close the grpc connection when needed. This works on top of closing our side of the stream with the `CloseStream` functions
1 parent 7d024a2 commit f7a31c2

File tree

3 files changed

+66
-0
lines changed

3 files changed

+66
-0
lines changed

.changeset/eleven-dragons-laugh.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink-deployments-framework": minor
3+
---
4+
5+
add grpc keepalive, retries and connection closure functionality

datastore/catalog/remote/grpc.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,36 @@ package remote
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sync"
8+
"time"
79

810
pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore"
911
"google.golang.org/grpc"
1012
"google.golang.org/grpc/credentials"
13+
"google.golang.org/grpc/keepalive"
1114
"google.golang.org/protobuf/proto"
1215
)
1316

17+
const retryPolicy = `{
18+
"methodConfig": [{
19+
"name": [{"service": "op_catalog.v1.datastore.Datastore"}],
20+
"retryPolicy": {
21+
"maxAttempts": 5,
22+
"initialBackoff": "0.1s",
23+
"maxBackoff": "1s",
24+
"backoffMultiplier": 2,
25+
"retryableStatusCodes": [
26+
"UNAVAILABLE",
27+
"DEADLINE_EXCEEDED",
28+
"INTERNAL",
29+
"RESOURCE_EXHAUSTED"
30+
]
31+
}
32+
}]
33+
}`
34+
1435
type CatalogClient struct {
1536
protoClient pb.DatastoreClient
1637
// ctx is cached here, because we need the context that created the client, not the current
@@ -23,6 +44,7 @@ type CatalogClient struct {
2344
//
2445
//nolint:containedctx
2546
ctx context.Context
47+
conn *grpc.ClientConn
2648
cachedStream grpc.BidiStreamingClient[pb.DataAccessRequest, pb.DataAccessResponse]
2749
hmacConfig *HMACAuthConfig
2850
streamInitOnce sync.Once
@@ -55,6 +77,7 @@ func (c *CatalogClient) DataAccess(req proto.Message) (grpc.BidiStreamingClient[
5577
return c.cachedStream, c.streamInitErr
5678
}
5779

80+
// CloseStream closes the current stream.
5881
func (c *CatalogClient) CloseStream() error {
5982
if c.cachedStream == nil {
6083
return nil
@@ -68,6 +91,19 @@ func (c *CatalogClient) CloseStream() error {
6891
return nil
6992
}
7093

94+
// Close closes the underlying gRPC connection.
95+
func (c *CatalogClient) Close() error {
96+
if c.cachedStream != nil {
97+
return errors.New("stream is not closed")
98+
}
99+
100+
if c.conn != nil {
101+
return c.conn.Close()
102+
}
103+
104+
return nil
105+
}
106+
71107
type CatalogConfig struct {
72108
GRPC string
73109
Creds credentials.TransportCredentials
@@ -98,6 +134,7 @@ func NewCatalogClient(ctx context.Context, cfg CatalogConfig) (*CatalogClient, e
98134
client := CatalogClient{
99135
ctx: ctx,
100136
hmacConfig: cfg.HMACConfig,
137+
conn: conn,
101138
protoClient: pb.NewDatastoreClient(conn),
102139
}
103140

@@ -122,6 +159,16 @@ func newCatalogConnection(cfg CatalogConfig) (*grpc.ClientConn, error) {
122159
opts = append(opts, grpc.WithAuthority(cfg.HMACConfig.Authority))
123160
}
124161

162+
// Keepalive for long-lived bidirectional streams
163+
// Ping every 20 seconds, wait up to 10 seconds for a response
164+
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
165+
Time: 20 * time.Second,
166+
Timeout: 10 * time.Second,
167+
PermitWithoutStream: true,
168+
}))
169+
170+
opts = append(opts, grpc.WithDefaultServiceConfig(retryPolicy))
171+
125172
conn, err := grpc.NewClient(cfg.GRPC, opts...)
126173
if err != nil {
127174
return nil, err

datastore/catalog/remote/grpc_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,17 @@ func TestNewCatalogClient_Success(t *testing.T) {
7070
})
7171
}
7272
}
73+
74+
func TestCatalogClient_Close(t *testing.T) {
75+
t.Parallel()
76+
77+
client, err := remote.NewCatalogClient(t.Context(), remote.CatalogConfig{
78+
GRPC: "localhost:9090",
79+
Creds: insecure.NewCredentials(),
80+
})
81+
82+
require.NoError(t, err)
83+
require.NotNil(t, client)
84+
85+
require.NoError(t, client.Close())
86+
}

0 commit comments

Comments
 (0)