Skip to content

Commit 2657108

Browse files
authored
fix(api): make sticky sessions actually work and make them non-racy (#12665)
* fix(api): make sticky sessions actually work and make them non-racy

  We apparently have a way to specify that all "related" requests should go to the same node. However:

  1. It didn't work at all: all future requests would go to the first successful node from the first request, because the node index lived in a stack variable local to that first call.
  2. It was racy if the context was reused concurrently (but only the first time; see point 1).

* test(api): test the API merge proxy

  1. Test whether or not it works.
  2. Test stickiness.

* fix(api): update OnSingleNode documentation
1 parent dcc903c commit 2657108

File tree

3 files changed

+93
-15
lines changed

3 files changed

+93
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
- Fix issue where F3 wouldn't start participating again if Lotus restarted without restarting the Miner ([filecoin-project/lotus#12640](https://github.com/filecoin-project/lotus/pull/12640)).
2525
- Change the F3 HeadLookback parameter to 4 ([filecoin-project/lotus#12648](https://github.com/filecoin-project/lotus/pull/12648)).
2626
- Upgrade go-f3 to 0.7.1 to resolve Tipset not found errors when trying to establish instance start time ([filecoin-project/lotus#12651](https://github.com/filecoin-project/lotus/pull/12651)).
27+
- The mining loop will now correctly "stick" to the same upstream Lotus node for all operations pertaining to mining a single block ([filecoin-project/lotus#12665](https://github.com/filecoin-project/lotus/pull/12665)).
2728

2829
## Deps
2930

cli/util/api.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os/signal"
1111
"reflect"
1212
"strings"
13+
"sync/atomic"
1314
"syscall"
1415
"time"
1516

@@ -229,9 +230,12 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err
229230

230231
type contextKey string
231232

232-
// OnSingleNode is not thread safe
233+
// OnSingleNode returns a modified context that, when passed to a method on a FullNodeProxy, will
234+
// cause all calls to be directed at the same node when possible.
235+
//
236+
// Think "sticky sessions".
233237
func OnSingleNode(ctx context.Context) context.Context {
234-
return context.WithValue(ctx, contextKey("retry-node"), new(*int))
238+
return context.WithValue(ctx, contextKey("retry-node"), new(atomic.Int32))
235239
}
236240

237241
func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
@@ -262,27 +266,29 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
262266

263267
ctx := args[0].Interface().(context.Context)
264268

265-
curr := -1
266-
267269
// for calls that need to be performed on the same node
268270
// primarily for miner when calling create block and submit block subsequently
269-
key := contextKey("retry-node")
270-
if ctx.Value(key) != nil {
271-
if (*ctx.Value(key).(**int)) == nil {
272-
*ctx.Value(key).(**int) = &curr
273-
} else {
274-
curr = **ctx.Value(key).(**int) - 1
275-
}
271+
var curr *atomic.Int32
272+
if v, ok := ctx.Value(contextKey("retry-node")).(*atomic.Int32); ok {
273+
curr = v
274+
} else {
275+
curr = new(atomic.Int32)
276276
}
277277

278-
total := len(rins)
278+
total := int32(len(rins))
279279
result, _ := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() ([]reflect.Value, error) {
280-
curr = (curr + 1) % total
281-
282-
result := fns[curr].Call(args)
280+
idx := curr.Load()
281+
result := fns[idx].Call(args)
283282
if result[len(result)-1].IsNil() {
284283
return result, nil
285284
}
285+
// On failure, switch to the next node.
286+
//
287+
// We CAS instead of incrementing because this might have
288+
// already been incremented by a concurrent call if we have
289+
// a shared `curr` (we're sticky to a single node).
290+
curr.CompareAndSwap(idx, (idx+1)%total)
291+
286292
e := result[len(result)-1].Interface().(error)
287293
return result, e
288294
})

itests/merged_api_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package itests
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/filecoin-project/go-state-types/big"
10+
11+
"github.com/filecoin-project/lotus/chain/types"
12+
cliutil "github.com/filecoin-project/lotus/cli/util"
13+
"github.com/filecoin-project/lotus/itests/kit"
14+
)
15+
16+
func TestAPIMergeProxy(t *testing.T) {
17+
ctx := context.Background()
18+
19+
// The default is too high for many nodes.
20+
initialBalance := types.MustParseFIL("100000FIL")
21+
22+
nopts := []kit.NodeOpt{
23+
kit.ThroughRPC(),
24+
kit.WithAllSubsystems(),
25+
kit.OwnerBalance(big.Int(initialBalance)),
26+
}
27+
ens := kit.NewEnsemble(t, kit.MockProofs())
28+
nodes := make([]*kit.TestFullNode, 10)
29+
for i := range nodes {
30+
var nd kit.TestFullNode
31+
ens.FullNode(&nd, nopts...)
32+
nodes[i] = &nd
33+
}
34+
merged := kit.MergeFullNodes(nodes)
35+
36+
var miner kit.TestMiner
37+
ens.Miner(&miner, merged, nopts...)
38+
39+
ens.Start()
40+
41+
nd1ID, err := nodes[0].ID(ctx)
42+
require.NoError(t, err)
43+
nd2ID, err := nodes[1].ID(ctx)
44+
require.NoError(t, err)
45+
46+
// Expect to start on node 1, and switch to node 2 on failure.
47+
mergedID, err := merged.ID(ctx)
48+
require.NoError(t, err)
49+
require.Equal(t, nd1ID, mergedID)
50+
require.NoError(t, nodes[0].Stop(ctx))
51+
mergedID, err = merged.ID(ctx)
52+
require.NoError(t, err)
53+
require.Equal(t, nd2ID, mergedID)
54+
55+
// Now see if sticky sessions work
56+
stickyCtx := cliutil.OnSingleNode(ctx)
57+
for i, nd := range nodes[1:] {
58+
// kill off the previous node.
59+
require.NoError(t, nodes[i].Stop(ctx))
60+
61+
got, err := merged.ID(stickyCtx)
62+
require.NoError(t, err)
63+
expected, err := nd.ID(ctx)
64+
require.NoError(t, err)
65+
require.Equal(t, expected, got)
66+
}
67+
68+
// This should fail because we'll run out of retries because it's _not_ sticky!
69+
_, err = merged.ID(ctx)
70+
require.Error(t, err)
71+
}

0 commit comments

Comments
 (0)