Skip to content

Commit 500072a

Browse files
Fix: gRPC reconnection. Add some client gRPC options
1 parent f720685 commit 500072a

File tree

3 files changed

+219
-68
lines changed

3 files changed

+219
-68
lines changed

examples/grpc/client.go

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import (
99
"github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
1010
generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb"
1111
"github.com/pkg/errors"
12+
"github.com/rs/zerolog/log"
1213
)
1314

1415
// Client -
1516
type Client struct {
1617
*grpc.Client
1718

1819
output *modules.Output
20+
stream *grpc.Stream[*pb.Response]
1921

2022
client pb.TimeServiceClient
2123
wg *sync.WaitGroup
@@ -33,6 +35,9 @@ func NewClient(server string) *Client {
3335
// Start -
3436
func (client *Client) Start(ctx context.Context) {
3537
client.client = pb.NewTimeServiceClient(client.Connection())
38+
39+
client.wg.Add(1)
40+
go client.reconnect(ctx)
3641
}
3742

3843
// SubscribeOnTime -
@@ -41,17 +46,47 @@ func (client *Client) SubscribeOnTime(ctx context.Context) (uint64, error) {
4146
if err != nil {
4247
return 0, err
4348
}
49+
client.stream = grpc.NewStream[*pb.Response](stream)
50+
51+
client.wg.Add(1)
52+
go client.handleTime(ctx)
4453

45-
return grpc.Subscribe[*pb.Response](
46-
stream,
47-
client.handleTime,
48-
client.wg,
49-
)
54+
return client.stream.Subscribe(ctx)
5055
}
5156

52-
func (client *Client) handleTime(ctx context.Context, data *pb.Response, id uint64) error {
53-
client.output.Push(data)
54-
return nil
57+
func (client *Client) reconnect(ctx context.Context) {
58+
defer client.wg.Done()
59+
60+
for {
61+
select {
62+
case <-ctx.Done():
63+
return
64+
case <-client.Reconnect():
65+
if client.stream != nil {
66+
if err := client.stream.Close(); err != nil {
67+
log.Err(err).Msg("closing stream")
68+
continue
69+
}
70+
}
71+
72+
if _, err := client.SubscribeOnTime(ctx); err != nil {
73+
log.Err(err).Msg("subscription error")
74+
}
75+
}
76+
}
77+
}
78+
79+
func (client *Client) handleTime(ctx context.Context) {
80+
defer client.wg.Done()
81+
82+
for {
83+
select {
84+
case <-ctx.Done():
85+
return
86+
case msg := <-client.stream.Listen():
87+
client.output.Push(msg)
88+
}
89+
}
5590
}
5691

5792
// UnsubscribeFromTime -
@@ -86,3 +121,18 @@ func (client *Client) AttachTo(name string, input *modules.Input) error {
86121
output.Attach(input)
87122
return nil
88123
}
124+
125+
// Close -
126+
func (client *Client) Close() error {
127+
client.wg.Wait()
128+
129+
if client.stream != nil {
130+
if err := client.stream.Close(); err != nil {
131+
return err
132+
}
133+
}
134+
if err := client.Client.Close(); err != nil {
135+
return err
136+
}
137+
return nil
138+
}

pkg/modules/grpc/client.go

Lines changed: 121 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/rs/zerolog/log"
1212
"google.golang.org/grpc"
1313
gogrpc "google.golang.org/grpc"
14+
"google.golang.org/grpc/backoff"
15+
"google.golang.org/grpc/connectivity"
1416
"google.golang.org/grpc/keepalive"
1517
)
1618

@@ -19,12 +21,17 @@ type Client struct {
1921
conn *gogrpc.ClientConn
2022

2123
serverAddress string
24+
reconnect chan struct{}
25+
26+
wg *sync.WaitGroup
2227
}
2328

2429
// NewClient - constructor of client structure
2530
func NewClient(server string) *Client {
2631
return &Client{
2732
serverAddress: server,
33+
reconnect: make(chan struct{}, 1),
34+
wg: new(sync.WaitGroup),
2835
}
2936
}
3037

@@ -37,7 +44,7 @@ func (client *Client) Name() string {
3744
func (client *Client) Connect(ctx context.Context, opts ...ConnectOption) error {
3845
dialOpts := []gogrpc.DialOption{
3946
gogrpc.WithKeepaliveParams(keepalive.ClientParameters{
40-
Time: 20 * time.Second,
47+
Time: 10 * time.Second,
4148
Timeout: 10 * time.Second,
4249
}),
4350
}
@@ -46,33 +53,22 @@ func (client *Client) Connect(ctx context.Context, opts ...ConnectOption) error
4653
opts[i](&connectOpts)
4754
}
4855
dialOpts = append(dialOpts, gogrpc.WithTransportCredentials(connectOpts.creds))
49-
50-
conn, err := gogrpc.Dial(
51-
client.serverAddress,
52-
dialOpts...,
53-
)
54-
if err != nil {
55-
return errors.Wrap(err, "dial connection")
56-
}
57-
client.conn = conn
58-
59-
return nil
60-
}
61-
62-
// WaitConnect - trying to connect to server
63-
func (client *Client) WaitConnect(ctx context.Context, opts ...ConnectOption) error {
64-
dialOpts := []gogrpc.DialOption{
65-
gogrpc.WithBlock(),
66-
gogrpc.WithKeepaliveParams(keepalive.ClientParameters{
67-
Time: 20 * time.Second,
68-
Timeout: 10 * time.Second,
69-
}),
70-
}
71-
connectOpts := newConnectOptions()
72-
for i := range opts {
73-
opts[i](&connectOpts)
56+
dialOpts = append(dialOpts, gogrpc.WithConnectParams(
57+
gogrpc.ConnectParams{
58+
MinConnectTimeout: connectOpts.reconnectTimeout,
59+
Backoff: backoff.Config{
60+
BaseDelay: 1.0 * time.Second,
61+
Multiplier: 1.6,
62+
Jitter: 0.2,
63+
MaxDelay: connectOpts.reconnectionTime,
64+
},
65+
},
66+
))
67+
dialOpts = append(dialOpts, gogrpc.WithUserAgent(connectOpts.userAgent))
68+
69+
if connectOpts.wait {
70+
dialOpts = append(dialOpts, gogrpc.WithBlock())
7471
}
75-
dialOpts = append(dialOpts, gogrpc.WithTransportCredentials(connectOpts.creds))
7672

7773
conn, err := gogrpc.Dial(
7874
client.serverAddress,
@@ -87,69 +83,136 @@ func (client *Client) WaitConnect(ctx context.Context, opts ...ConnectOption) er
8783
}
8884

8985
// Start - starts authentication client module
90-
func (client *Client) Start(ctx context.Context) {}
86+
func (client *Client) Start(ctx context.Context) {
87+
client.wg.Add(1)
88+
go client.checkConnectionState(ctx)
89+
}
90+
91+
// Reconnect - returns channel with reconnection events
92+
func (client *Client) Reconnect() <-chan struct{} {
93+
return client.reconnect
94+
}
9195

9296
// Close - closes authentication client module
9397
func (client *Client) Close() error {
98+
client.wg.Wait()
99+
94100
if err := client.conn.Close(); err != nil {
95101
return err
96102
}
103+
close(client.reconnect)
97104
return nil
98105
}
99106

107+
func (client *Client) checkConnectionState(ctx context.Context) {
108+
defer client.wg.Done()
109+
110+
ticker := time.NewTicker(time.Millisecond * 100)
111+
defer ticker.Stop()
112+
113+
status := connectivity.Ready
114+
for {
115+
select {
116+
case <-ctx.Done():
117+
return
118+
case <-ticker.C:
119+
state := client.conn.GetState()
120+
switch state {
121+
case connectivity.Connecting:
122+
case connectivity.Idle:
123+
client.conn.Connect()
124+
case connectivity.Ready:
125+
if status != state {
126+
client.reconnect <- struct{}{}
127+
}
128+
case connectivity.Shutdown:
129+
case connectivity.TransientFailure:
130+
}
131+
status = state
132+
}
133+
}
134+
}
135+
100136
// Connection - receives connection entity
101137
func (client *Client) Connection() *grpc.ClientConn {
102138
return client.conn
103139
}
104140

105-
// ClientStream -
106-
type ClientStream[T any] interface {
107-
Recv() (T, error)
108-
grpc.ClientStream
141+
// Stream -
142+
type Stream[T any] struct {
143+
stream grpc.ClientStream
144+
data chan *T
145+
146+
wg *sync.WaitGroup
147+
}
148+
149+
// NewStream - creates new stream
150+
func NewStream[T any](stream grpc.ClientStream) *Stream[T] {
151+
return &Stream[T]{
152+
stream: stream,
153+
data: make(chan *T, 1024),
154+
155+
wg: new(sync.WaitGroup),
156+
}
109157
}
110158

111-
// Subscribe - generic function to subscribe on events from server
112-
func Subscribe[T any](
113-
stream ClientStream[T],
114-
handler SubscriptionHandler[T],
115-
wg *sync.WaitGroup,
116-
) (uint64, error) {
159+
// Subscribe - generic function to subscribe on service stream
160+
func (s *Stream[T]) Subscribe(ctx context.Context) (uint64, error) {
117161
var msg pb.SubscribeResponse
118-
if err := stream.RecvMsg(&msg); err != nil {
162+
if err := s.stream.RecvMsg(&msg); err != nil {
119163
return 0, err
120164
}
121165

122-
wg.Add(1)
123-
go listen(stream, msg.Id, handler, wg)
166+
s.wg.Add(1)
167+
go s.listen(ctx, msg.Id)
124168

125169
return msg.Id, nil
126170
}
127171

128-
// SubscriptionHandler is handled on subscription message
129-
type SubscriptionHandler[T any] func(ctx context.Context, data T, subscriptionID uint64) error
172+
// Listen - channel with received messages
173+
func (s *Stream[T]) Listen() <-chan *T {
174+
return s.data
175+
}
130176

131-
func listen[T any](stream ClientStream[T], subscriptionID uint64, handler SubscriptionHandler[T], wg *sync.WaitGroup) {
132-
defer wg.Done()
177+
func (s *Stream[T]) listen(ctx context.Context, id uint64) {
178+
defer s.wg.Done()
133179

134180
for {
135181
select {
136-
case <-stream.Context().Done():
182+
case <-ctx.Done():
183+
log.Info().Msg("stop listening")
137184
return
138185
default:
139-
data, err := stream.Recv()
140-
if err == io.EOF {
141-
continue
142-
}
143-
if err != nil {
186+
var msg T
187+
err := s.stream.RecvMsg(&msg)
188+
switch {
189+
case err == io.EOF:
190+
log.Info().Msg("connection to gRPC was closed")
191+
return
192+
case err != nil:
144193
log.Err(err).Msg("receiving subscription error")
145-
continue
146-
}
147-
148-
if handler != nil {
149-
if err := handler(stream.Context(), data, subscriptionID); err != nil {
150-
log.Err(err).Msg("subscription handler error")
151-
}
194+
return
195+
default:
196+
s.data <- &msg
152197
}
153198
}
154199
}
155200
}
201+
202+
// Close - closes stream
203+
func (s *Stream[T]) Close() error {
204+
s.wg.Wait()
205+
206+
close(s.data)
207+
return nil
208+
}
209+
210+
// Unsubscribe -
211+
func (s *Stream[T]) Unsubscribe(ctx context.Context, id uint64) error {
212+
return s.stream.SendMsg(&pb.UnsubscribeRequest{Id: id})
213+
}
214+
215+
// Context -
216+
func (s *Stream[T]) Context() context.Context {
217+
return s.stream.Context()
218+
}

0 commit comments

Comments
 (0)