Skip to content

Commit 78b411f

Browse files
committed
Add partial mempool loading of requests
1 parent 728d014 commit 78b411f

File tree

5 files changed

+63
-32
lines changed

5 files changed

+63
-32
lines changed

clients/iscmove/iscmoveclient/client_request.go

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package iscmoveclient
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
7+
"sort"
68
"time"
79

810
"github.com/samber/lo"
11+
"golang.org/x/exp/maps"
912

1013
"github.com/iotaledger/wasp/clients/iota-go/iotaclient"
1114
"github.com/iotaledger/wasp/clients/iota-go/iotago"
@@ -212,10 +215,17 @@ func (c *Client) parseRequestAndFetchAssetsBag(obj *iotajsonrpc.IotaObjectData)
212215
}, nil
213216
}
214217

215-
func (c *Client) GetRequestsWithCB(ctx context.Context, packageID iotago.PackageID, anchorAddress *iotago.ObjectID, cb func(error, *iscmove.RefWithObject[iscmove.Request])) error {
218+
func (c *Client) GetRequestsSorted(ctx context.Context, packageID iotago.PackageID, anchorAddress *iotago.ObjectID, maxAmountOfRequests int, cb func(error, *iscmove.RefWithObject[iscmove.Request])) {
216219
var lastSeen *iotago.ObjectID
220+
221+
pulledRequests := map[iotago.ObjectID]*iotajsonrpc.IotaObjectData{}
222+
sortedRequestIDs := make([]iotago.ObjectID, 0)
223+
224+
_ = (pulledRequests)
225+
_ = sortedRequestIDs
226+
217227
for {
218-
res, err := c.GetOwnedObjects(ctx, iotaclient.GetOwnedObjectsRequest{
228+
objs, err := c.GetOwnedObjects(ctx, iotaclient.GetOwnedObjectsRequest{
219229
Address: anchorAddress,
220230
Query: &iotajsonrpc.IotaObjectResponseQuery{
221231
Filter: &iotajsonrpc.IotaObjectDataFilter{
@@ -229,29 +239,44 @@ func (c *Client) GetRequestsWithCB(ctx context.Context, packageID iotago.Package
229239
},
230240
Cursor: lastSeen,
231241
})
242+
232243
if ctx.Err() != nil {
233244
cb(fmt.Errorf("failed to fetch requests: %w", err), nil)
234245
continue
235246
}
236-
if len(res.Data) == 0 {
237-
return nil
247+
248+
if objs == nil || len(objs.Data) == 0 {
249+
break
238250
}
239251

240-
lastSeen = res.NextCursor
241-
for _, reqData := range res.Data {
242-
if ctx.Err() != nil {
243-
return ctx.Err()
244-
}
252+
lastSeen = objs.NextCursor
245253

246-
fmt.Printf("Polling request id:%s, digest: %s, now:%s\n", reqData.Data.ObjectID, reqData.Data.Digest, time.Now().String())
247-
req, err := c.parseRequestAndFetchAssetsBag(reqData.Data)
248-
if err != nil {
249-
cb(fmt.Errorf("failed to decode requests: %w", err), nil)
250-
} else {
251-
cb(nil, req)
252-
}
254+
for _, req := range objs.Data {
255+
pulledRequests[*req.Data.ObjectID] = req.Data
253256
}
254257
}
258+
259+
// Sort object IDs
260+
objectKeys := maps.Keys(pulledRequests)
261+
sort.Slice(objectKeys, func(i, j int) bool {
262+
return bytes.Compare(objectKeys[i][:], objectKeys[j][:]) < 0
263+
})
264+
265+
if len(objectKeys) >= maxAmountOfRequests {
266+
sortedRequestIDs = objectKeys[:maxAmountOfRequests]
267+
} else {
268+
sortedRequestIDs = objectKeys
269+
}
270+
271+
// We only pass the maxRequests amount of requests into the result, to not overload the mempool.
272+
// Periodically, this function will be called by the Chain Manager to pick up the next amount of transactions.
273+
// This ensures that we don't suddenly load 20k of Requests into the mempool which fills up at a lower limit.
274+
// It improves the startup time of the node and makes sure that the Consensus gets the same IDs across all nodes in a sorted manner.
275+
276+
for _, reqID := range sortedRequestIDs {
277+
ref, err := c.parseRequestAndFetchAssetsBag(pulledRequests[reqID])
278+
cb(err, ref)
279+
}
255280
}
256281

257282
func (c *Client) GetRequests(

clients/iscmove/iscmoveclient/feed.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@ func (f *ChainFeed) GetCurrentAnchor(ctx context.Context) (*iscmove.AnchorWithRe
5151

5252
// FetchCurrentState fetches the current Anchor and all Requests owned by the
5353
// anchor address.
54-
func (f *ChainFeed) FetchCurrentState(ctx context.Context, requestCb func(error, *iscmove.RefWithObject[iscmove.Request])) (*iscmove.AnchorWithRef, error) {
54+
func (f *ChainFeed) FetchCurrentState(ctx context.Context, maxAmountOfRequests int, requestCb func(error, *iscmove.RefWithObject[iscmove.Request])) (*iscmove.AnchorWithRef, error) {
5555
anchor, err := f.GetCurrentAnchor(ctx)
5656
if err != nil {
5757
return nil, err
5858
}
5959

60-
go f.wsClient.GetRequestsWithCB(ctx, f.iscPackageID, &f.anchorAddress, requestCb)
60+
f.wsClient.GetRequestsSorted(ctx, f.iscPackageID, &f.anchorAddress, maxAmountOfRequests, requestCb)
6161

62-
return anchor, nil
62+
return anchor, err
6363
}
6464

6565
// SubscribeToUpdates starts fetching updated versions of the Anchor and newly received requests in background.

components/nodeconn/component.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010

1111
"github.com/iotaledger/hive.go/app"
1212
"github.com/iotaledger/hive.go/app/shutdown"
13+
1314
"github.com/iotaledger/wasp/clients/iota-go/iotago"
15+
"github.com/iotaledger/wasp/components/chains"
1416
"github.com/iotaledger/wasp/packages/chain"
1517
"github.com/iotaledger/wasp/packages/daemon"
1618
"github.com/iotaledger/wasp/packages/nodeconn"
@@ -51,6 +53,7 @@ func provide(c *dig.Container) error {
5153
nodeConnection, err := nodeconn.New(
5254
Component.Daemon().ContextStopped(),
5355
*address,
56+
chains.ParamsChains.MempoolMaxOnledgerInPool,
5457
ParamsWS.WebsocketURL,
5558
Component.Logger().Named("nc"),
5659
deps.ShutdownHandler,

packages/nodeconn/chain.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,10 @@ func (ncc *ncChain) syncChainState(ctx context.Context) error {
201201
return err
202202
}
203203

204-
_, err = ncc.feed.FetchCurrentState(ctx, func(err error, req *iscmove.RefWithObject[iscmove.Request]) {
204+
anchor := isc.NewStateAnchor(moveAnchor, ncc.feed.GetISCPackageID())
205+
ncc.anchorHandler(&anchor)
206+
207+
go ncc.feed.FetchCurrentState(ctx, ncc.nodeConn.maxNumberOfRequests, func(err error, req *iscmove.RefWithObject[iscmove.Request]) {
205208
if err != nil {
206209
return
207210
}
@@ -210,14 +213,10 @@ func (ncc *ncChain) syncChainState(ctx context.Context) error {
210213
if err != nil {
211214
return
212215
}
216+
217+
ncc.LogInfof("Sending %s to request handler", req.ObjectID)
213218
ncc.requestHandler(onledgerReq)
214219
})
215-
if err != nil {
216-
return err
217-
}
218-
219-
anchor := isc.NewStateAnchor(moveAnchor, ncc.feed.GetISCPackageID())
220-
ncc.anchorHandler(&anchor)
221220

222221
ncc.LogInfof("Synchronizing chain state for %s... done", ncc.chainID)
223222
return nil

packages/nodeconn/nodeconn.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/iotaledger/hive.go/app/shutdown"
2121
"github.com/iotaledger/hive.go/ds/shrinkingmap"
2222
"github.com/iotaledger/hive.go/logger"
23+
2324
"github.com/iotaledger/wasp/clients/iota-go/iotago"
2425
"github.com/iotaledger/wasp/clients/iota-go/iotasigner"
2526
"github.com/iotaledger/wasp/clients/iscmove/iscmoveclient"
@@ -60,9 +61,10 @@ type nodeConnection struct {
6061
iscPackageID iotago.PackageID
6162
wsClient *iscmoveclient.Client
6263

63-
synced sync.WaitGroup
64-
chainsLock sync.RWMutex
65-
chainsMap *shrinkingmap.ShrinkingMap[isc.ChainID, *ncChain]
64+
maxNumberOfRequests int
65+
synced sync.WaitGroup
66+
chainsLock sync.RWMutex
67+
chainsMap *shrinkingmap.ShrinkingMap[isc.ChainID, *ncChain]
6668

6769
shutdownHandler *shutdown.ShutdownHandler
6870
}
@@ -72,6 +74,7 @@ var _ chain.NodeConnection = &nodeConnection{}
7274
func New(
7375
ctx context.Context,
7476
iscPackageID iotago.PackageID,
77+
maxNumberOfRequests int,
7578
wsURL string,
7679
log *logger.Logger,
7780
shutdownHandler *shutdown.ShutdownHandler,
@@ -81,9 +84,10 @@ func New(
8184
return nil, err
8285
}
8386
return &nodeConnection{
84-
WrappedLogger: logger.NewWrappedLogger(log),
85-
iscPackageID: iscPackageID,
86-
wsClient: wsClient,
87+
WrappedLogger: logger.NewWrappedLogger(log),
88+
iscPackageID: iscPackageID,
89+
wsClient: wsClient,
90+
maxNumberOfRequests: maxNumberOfRequests,
8791
chainsMap: shrinkingmap.New[isc.ChainID, *ncChain](
8892
shrinkingmap.WithShrinkingThresholdRatio(chainsCleanupThresholdRatio),
8993
shrinkingmap.WithShrinkingThresholdCount(chainsCleanupThresholdCount),

0 commit comments

Comments
 (0)