-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathimpl.go
More file actions
262 lines (219 loc) · 6.47 KB
/
impl.go
File metadata and controls
262 lines (219 loc) · 6.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
package substrate
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/cenkalti/backoff"
gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
// acceptableDelay is the maximum lag allowed between a node's latest chain
// timestamp and the local clock before the node is rejected. The block time
// is 6 seconds, so this currently tolerates a node that is 2 blocks behind.
const acceptableDelay = 2 * 6 * time.Second
// ErrInvalidVersion is returned if the 4-byte version of an object is invalid.
var ErrInvalidVersion = fmt.Errorf("invalid version")

// ErrUnknownVersion is returned if the version number is not supported.
var ErrUnknownVersion = fmt.Errorf("unknown version")

// ErrNotFound is returned if an object is not found.
var ErrNotFound = fmt.Errorf("object not found")

// ErrClosed is returned if the client is closed.
var ErrClosed = fmt.Errorf("client closed")
// Versioned is the common base for all types; it carries the version number
// of the object it is part of.
type Versioned struct {
	// Version is the object's version number.
	Version uint32 `json:"version"`
}
type (
	// Conn is the raw substrate RPC API client handed out by a Manager.
	Conn = *gsrpc.SubstrateAPI

	// Meta is the chain metadata that is returned alongside a Conn.
	Meta = *types.Metadata
)
// Manager hands out connections to a substrate node.
type Manager interface {
	// Raw returns a raw RPC client together with its metadata. The
	// connection is not tracked by the manager; the caller must close it.
	Raw() (Conn, Meta, error)

	// Substrate returns a new wrapped substrate connection, which must be
	// closed once the caller is done with it.
	Substrate() (*Substrate, error)
}
// mgrImpl is the Manager implementation; it cycles through a fixed list of
// endpoint urls in round-robin order.
type mgrImpl struct {
	// urls is the list of endpoints to connect to.
	urls []string
	// r is the index in urls of the next endpoint to try.
	r int
	// m serializes connection attempts and guards r.
	m sync.Mutex
}
// NewManager creates a Manager that connects to the given substrate endpoint
// urls. It panics if no url is given.
//
// The urls are copied before use, so the caller's slice is neither reordered
// nor retained by the manager.
func NewManager(url ...string) Manager {
	if len(url) == 0 {
		panic("at least one url is required")
	}

	// Work on a private copy: when called as NewManager(list...) the variadic
	// parameter shares its backing array with the caller's slice, and the
	// shuffle below would otherwise reorder the caller's data in place.
	urls := append([]string(nil), url...)

	// The shuffle is needed because if one endpoint fails and the next one is
	// tried, every manager instance would otherwise move its connections to
	// the same "next" endpoint, which would then get overloaded. Shuffling
	// makes the "next" endpoint different for each instance of the pool.
	rand.Shuffle(len(urls), func(i, j int) {
		urls[i], urls[j] = urls[j], urls[i]
	})

	return &mgrImpl{
		urls: urls,
		r:    rand.Intn(len(urls)), // start with a random url, then round-robin
	}
}
// endpoint returns the endpoint to use next and advances the round-robin
// cursor, wrapping around at the end of the url list. It must be called
// while p.m is held.
func (p *mgrImpl) endpoint() string {
	current := p.urls[p.r]
	p.r = (p.r + 1) % len(p.urls)
	return current
}
// Substrate opens a fresh connection via Raw and wraps it in a Substrate
// client. The returned client must be closed once the caller is done with it.
func (p *mgrImpl) Substrate() (*Substrate, error) {
	conn, metadata, err := p.Raw()
	if err != nil {
		return nil, err
	}

	// The wrapper gets put to release the connection on Close and connect to
	// fail over when the connection turns unhealthy.
	return newSubstrate(conn, metadata, p.put, p.connect)
}
// Raw returns an RPC substrate client plus its metadata. Endpoints are tried
// in round-robin order until one of them can be dialed, serves its latest
// metadata, and reports a chain timestamp that is not older than
// acceptableDelay.
//
// The returned connection is not tracked by the pool, nor reusable. It is the
// caller's responsibility to close the connection when done. When all
// attempts fail, the returned connection and metadata are nil and the error
// of the last attempt is returned.
//
// Calls are serialized by p.m, which also guards the round-robin cursor
// advanced by endpoint.
func (p *mgrImpl) Raw() (Conn, Meta, error) {
	// Right now this pool implementation just tests the connection and makes
	// sure that it is still active, otherwise it tries again until the
	// connection is restored. A better pool implementation that handles
	// multiple connections can be done later.
	p.m.Lock()
	defer p.m.Unlock()

	// Constant 200ms pause between attempts, giving up after 4 retries per
	// configured url.
	boff := backoff.WithMaxRetries(
		backoff.NewConstantBackOff(200*time.Millisecond),
		4*uint64(len(p.urls)),
	)

	var (
		cl   *gsrpc.SubstrateAPI
		meta *types.Metadata
	)

	err := backoff.RetryNotify(func() error {
		endpoint := p.endpoint()
		log.Debug().Str("url", endpoint).Msg("connecting")

		conn, err := newSubstrateAPI(endpoint)
		if err != nil {
			return errors.Wrapf(err, "error connecting to substrate at '%s'", endpoint)
		}

		// From here on the connection is open: every rejection below must
		// close it, otherwise each retry leaks one client.
		md, err := conn.RPC.State.GetMetadataLatest()
		if err != nil {
			conn.Client.Close()
			return errors.Wrapf(err, "error getting latest metadata at '%s'", endpoint)
		}

		t, err := getTime(conn, md)
		if err != nil {
			conn.Client.Close()
			return errors.Wrapf(err, "error getting node time at '%s'", endpoint)
		}

		if time.Since(t) > acceptableDelay {
			conn.Client.Close()
			return fmt.Errorf("node '%s' is behind acceptable delay with timestamp '%s'", endpoint, t)
		}

		// Only publish a connection that passed every check.
		cl, meta = conn, md
		return nil
	}, boff, func(err error, _ time.Duration) {
		log.Error().Err(err).Msg("failed to connect to endpoint, retrying")
	})
	if err != nil {
		return nil, nil, err
	}

	return cl, meta, nil
}
// connect obtains a new connection through Raw (next endpoint in round-robin
// order) and swaps it into s, closing the connection s held before.
//
// It reads and writes s.cl and s.meta without locking, so it needs to be
// called while s.mu is held, as GetClient does. p.m must not be held by the
// caller, because Raw acquires it.
func (p *mgrImpl) connect(s *Substrate) error {
	fresh, freshMeta, err := p.Raw()
	if err != nil {
		return err
	}

	// Drop the previous connection, if any, before installing the new one.
	if old := s.cl; old != nil {
		old.Client.Close()
		log.Info().Str("url", old.Client.URL()).Msg("unhealthy connection closed")
	}

	s.cl, s.meta = fresh, freshMeta
	log.Info().Str("url", fresh.Client.URL()).Msg("connection restored")

	return nil
}
// put releases the connection held by s. This is a naive implementation: the
// connection is closed immediately and s is left without client and metadata.
//
// TODO: implement reusable connections instead of closing the connection.
func (p *mgrImpl) put(s *Substrate) {
	if conn := s.cl; conn != nil {
		conn.Client.Close()
	}

	s.cl, s.meta = nil, nil
}
// Substrate is a substrate client that wraps a single connection and its
// metadata.
type Substrate struct {
	// mu guards the fields below.
	mu sync.Mutex
	// cl and meta are the current connection and its metadata.
	cl   Conn
	meta Meta
	// closed is set once Close has released the connection.
	closed bool
	// close releases the connection held by the client.
	close func(s *Substrate)
	// connect replaces the connection held by the client with a new one.
	connect func(s *Substrate) error
}
// newSubstrate creates a substrate client around the given connection and
// metadata. close is invoked by Close to release the connection and connect
// is invoked by GetClient to replace an unhealthy connection. The returned
// error is always nil.
func newSubstrate(cl Conn, meta Meta, close func(*Substrate), connect func(s *Substrate) error) (*Substrate, error) {
	sub := &Substrate{
		cl:      cl,
		meta:    meta,
		close:   close,
		connect: connect,
	}
	return sub, nil
}
// Close releases the underlying connection through the client's close
// callback and marks the client as closed. Calling Close again afterwards is
// a no-op.
func (s *Substrate) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.closed {
		s.close(s)
		s.closed = true
	}
}
// GetClient returns the current connection and its metadata. It returns
// ErrClosed if the client has been closed.
//
// Before handing out the connection its health is probed by querying the
// node time; if the probe fails, a failover to a new connection is attempted
// through the client's connect callback and that error, if any, is returned.
func (s *Substrate) GetClient() (Conn, Meta, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		return nil, nil, ErrClosed
	}

	// Healthy connection: hand it out as is.
	_, probeErr := getTime(s.cl, s.meta)
	if probeErr == nil {
		return s.cl, s.meta, nil
	}

	log.Info().Str("url", s.cl.Client.URL()).Msg("connection unhealthy, attempting failover")
	if err := s.connect(s); err != nil {
		// all attempts failed, no connection available
		return nil, nil, err
	}

	return s.cl, s.meta, nil
}
// getVersion decodes b into a Versioned value and returns its version
// number. A decoding failure is reported as ErrInvalidVersion, with the
// underlying reason included in the message.
func (s *Substrate) getVersion(b types.StorageDataRaw) (uint32, error) {
	var header Versioned

	err := Decode(b, &header)
	if err != nil {
		return 0, errors.Wrapf(ErrInvalidVersion, "failed to load version (reason: %s)", err)
	}

	return header.Version, nil
}
// Time returns the current time as reported by the connected node. It fails
// if no usable connection can be obtained from GetClient.
func (s *Substrate) Time() (t time.Time, err error) {
	conn, metadata, err := s.GetClient()
	if err != nil {
		return time.Time{}, err
	}

	return getTime(conn, metadata)
}
// getTime reads the "Now" entry of the "Timestamp" module from the latest
// chain state over cl and returns it as a time. The zero time is returned
// together with the error if the storage key cannot be built, the lookup
// fails, or the value cannot be decoded.
func getTime(cl Conn, meta Meta) (t time.Time, err error) {
	storageKey, err := types.CreateStorageKey(meta, "Timestamp", "Now", nil)
	if err != nil {
		return t, errors.Wrap(err, "failed to create substrate query key")
	}

	data, err := cl.RPC.State.GetStorageRawLatest(storageKey)
	if err != nil {
		return t, errors.Wrap(err, "failed to lookup entity")
	}

	var now types.Moment
	if decodeErr := Decode(*data, &now); decodeErr != nil {
		return t, errors.Wrap(decodeErr, "failed to get node time")
	}

	return now.Time, nil
}