Skip to content

Commit 407b071

Browse files
Created function to create client pool
1 parent 66dd489 commit 407b071

File tree

1 file changed

+78
-0
lines changed

1 file changed

+78
-0
lines changed

internal/grpcpool/clientpool.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package grpcpool
2+
3+
import (
4+
"fmt"
5+
6+
"google.golang.org/grpc"
7+
)
8+
9+
/* creates a new client pool */
10+
func NewClientPool(opts ...grpc.DialOption) *ClientPool {
11+
return &ClientPool{
12+
conns: make(map[string]*grpc.ClientConn),
13+
dialOptions: opts,
14+
stopCh: make(chan struct{}),
15+
}
16+
}
17+
18+
/*
19+
creates a connection to given server
20+
allows multiple connections to be established to daemons for any transactions in execution
21+
*/
22+
func (p *ClientPool) GetConn(addr string, errCh chan<-error) (*grpc.ClientConn, error) {
23+
/* check if connection exists or not */
24+
p.mu.RLock()
25+
conn, exists := p.conns[addr]
26+
p.mu.RUnlock()
27+
28+
/* return the connection if it exists */
29+
if exists {
30+
return conn, nil
31+
}
32+
33+
/* so connection doesn't exist, create a new one */
34+
p.mu.Lock()
35+
defer p.mu.Unlock()
36+
37+
/* double check again (might been created between if exists and this line) */
38+
conn, exists = p.conns[addr]
39+
if exists {
40+
return conn, nil
41+
}
42+
43+
/* create a new client for gRPC server */
44+
newConn, err := grpc.NewClient(addr, p.dialOptions...)
45+
if err != nil {
46+
return nil, fmt.Errorf("failed to add new connection", err)
47+
}
48+
49+
/* add connection to the pool */
50+
p.conns[addr] = newConn
51+
52+
/*
53+
in case of connection issues, it will remove itself from connection pool
54+
when connection is demanded again, whole logic written above will be executed again
55+
*/
56+
go p.MonitorHealth(addr, newConn, errCh)
57+
58+
/* return connection */
59+
return newConn, nil
60+
}
61+
62+
/*
63+
close all connections in the pool
64+
call this while error channel exists
65+
*/
66+
func (p *ClientPool) CloseAll(errCh chan<-error) {
67+
p.mu.Lock()
68+
defer p.mu.Unlock()
69+
70+
/* iterate over all the connections and attempt to close them all */
71+
for _, conn := range p.conns {
72+
if err := conn.Close(); err != nil {
73+
errCh <- fmt.Errorf("error while closing gRPC connection: %w", err)
74+
}
75+
}
76+
77+
p.conns = make(map[string]*grpc.ClientConn)
78+
}

0 commit comments

Comments
 (0)