Skip to content

Commit c3718c0

Browse files
authored
Merge pull request #626 from lightninglabs/universe-cache
multi: add new caches for universe proofs, roots, keys, and config
2 parents 1e24b99 + 86e95aa commit c3718c0

26 files changed

+1635
-335
lines changed

config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/lightningnetwork/lnd"
1818
"github.com/lightningnetwork/lnd/build"
1919
"github.com/lightningnetwork/lnd/signal"
20+
"golang.org/x/time/rate"
2021
"google.golang.org/grpc"
2122
)
2223

@@ -129,6 +130,15 @@ type Config struct {
129130
// This applies to federation syncing as well as RPC insert and query.
130131
UniversePublicAccess bool
131132

133+
// UniverseQueriesPerSecond is the maximum number of queries per
134+
// second across the set of active universe queries that is permitted.
135+
// Anything above this starts to get rate limited.
136+
UniverseQueriesPerSecond rate.Limit
137+
138+
// UniverseQueriesBurst is the burst budget for the universe query rate
139+
// limiting.
140+
UniverseQueriesBurst int
141+
132142
Prometheus monitoring.PrometheusConfig
133143

134144
// LogWriter is the root logger that all of the daemon's subloggers are

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ require (
3939
golang.org/x/net v0.17.0
4040
golang.org/x/sync v0.2.0
4141
golang.org/x/term v0.13.0
42+
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
4243
google.golang.org/grpc v1.56.3
4344
google.golang.org/protobuf v1.30.0
4445
gopkg.in/macaroon-bakery.v2 v2.1.0
@@ -187,7 +188,6 @@ require (
187188
golang.org/x/mod v0.10.0 // indirect
188189
golang.org/x/sys v0.13.0 // indirect
189190
golang.org/x/text v0.13.0 // indirect
190-
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
191191
golang.org/x/tools v0.9.1 // indirect
192192
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
193193
gopkg.in/errgo.v1 v1.0.1 // indirect

itest/addrs_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func testAddressAssetSyncer(t *harnessTest) {
239239
require.NoError(t.t, err)
240240

241241
// Bob's Universe stats should show that he now has two assets.
242-
AssertUniverseStats(t.t, bob, 2, 0, 2, 1)
242+
AssertUniverseStats(t.t, bob, 2, 2, 1)
243243

244244
// Bob should not be able to make an address for a random asset ID
245245
// that he nor Alice are aware of.
@@ -344,7 +344,7 @@ func testAddressAssetSyncer(t *harnessTest) {
344344
},
345345
)
346346
require.NoError(t.t, err)
347-
AssertUniverseStats(t.t, bob, 0, 0, 0, 0)
347+
AssertUniverseStats(t.t, bob, 0, 0, 0)
348348
}
349349

350350
// If we restart Bob with the syncer disabled and no automatic sync
@@ -401,11 +401,11 @@ func testAddressAssetSyncer(t *harnessTest) {
401401

402402
// Bob's Universe stats should show that he has now synced both assets
403403
// from the second mint and the single asset group from that mint.
404-
AssertUniverseStats(t.t, bob, 2, 0, 2, 1)
404+
AssertUniverseStats(t.t, bob, 2, 2, 1)
405405

406406
// Alice's Universe stats should reflect the extra syncs from the asset
407407
// group lookups by Bob.
408-
AssertUniverseStats(t.t, t.tapd, 4, 4, 4, 2)
408+
AssertUniverseStats(t.t, t.tapd, 4, 4, 2)
409409

410410
// If Alice now mints a reissuance for the second asset group, Bob
411411
// should successfully sync that new asset.

itest/assertions.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,7 +1226,7 @@ func AssertUniverseKeysEqual(t *testing.T, uniIDs []*unirpc.ID,
12261226
}
12271227

12281228
func AssertUniverseStats(t *testing.T, client unirpc.UniverseClient,
1229-
numProofs, numSyncs, numAssets, numGroups int) {
1229+
numProofs, numAssets, numGroups int) {
12301230

12311231
err := wait.NoError(func() error {
12321232
uniStats, err := client.UniverseStats(
@@ -1240,10 +1240,6 @@ func AssertUniverseStats(t *testing.T, client unirpc.UniverseClient,
12401240
return fmt.Errorf("expected %v proofs, got %v",
12411241
numProofs, uniStats.NumTotalProofs)
12421242
}
1243-
if numSyncs != int(uniStats.NumTotalSyncs) {
1244-
return fmt.Errorf("expected %v syncs, got %v",
1245-
numSyncs, uniStats.NumTotalSyncs)
1246-
}
12471243
if numAssets != int(uniStats.NumTotalAssets) {
12481244
return fmt.Errorf("expected %v assets, got %v",
12491245
numAssets, uniStats.NumTotalAssets)

itest/universe_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ func testUniverseFederation(t *harnessTest) {
453453

454454
// Bob's Universe stats should show that he now has a single asset. We
455455
// should also be able to query for stats specifically for the asset.
456-
AssertUniverseStats(t.t, bob, 1, 0, 1, 0)
456+
AssertUniverseStats(t.t, bob, 1, 1, 0)
457457

458458
// Test the content of the universe info call.
459459
info, err := bob.Info(ctxt, &unirpc.InfoRequest{})
@@ -545,7 +545,7 @@ func testUniverseFederation(t *harnessTest) {
545545

546546
// Bob's stats should also now show that there're three total asset as
547547
// well as three proofs.
548-
AssertUniverseStats(t.t, bob, 3, 0, 3, 1)
548+
AssertUniverseStats(t.t, bob, 3, 3, 1)
549549

550550
// We should be able to find both the new assets in the set of universe
551551
// stats for an asset.

proof/proof.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,3 +446,29 @@ func (p *Proof) IsUnknownVersion() bool {
446446
return true
447447
}
448448
}
449+
450+
// SparseDecode can be used to decode a proof from a reader without decoding
451+
// and parsing the entire thing. This handles ignoring the magic bytes, and
452+
// will decode directly into the target records.
453+
func SparseDecode(r io.Reader, records ...tlv.Record) error {
454+
// The very first byte of the serialized proof is a magic byte, so
455+
// we'll read one byte to skip it.
456+
var magicBytes [PrefixMagicBytesLength]byte
457+
_, err := r.Read(magicBytes[:])
458+
if err != nil {
459+
return err
460+
}
461+
462+
if magicBytes != PrefixMagicBytes {
463+
return fmt.Errorf("invalid prefix magic bytes, expected %s, "+
464+
"got %s", string(PrefixMagicBytes[:]),
465+
string(magicBytes[:]))
466+
}
467+
468+
proofStream, err := tlv.NewStream(records...)
469+
if err != nil {
470+
return err
471+
}
472+
473+
return proofStream.Decode(r)
474+
}

rpcserver.go

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/lightningnetwork/lnd/lnrpc"
4747
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
4848
"github.com/lightningnetwork/lnd/signal"
49+
"golang.org/x/time/rate"
4950
"google.golang.org/grpc"
5051
)
5152

@@ -116,6 +117,8 @@ type rpcServer struct {
116117

117118
blockTimestampCache *lru.Cache[uint32, cacheableTimestamp]
118119

120+
proofQueryRateLimiter *rate.Limiter
121+
119122
quit chan struct{}
120123
wg sync.WaitGroup
121124
}
@@ -132,7 +135,10 @@ func newRPCServer(interceptor signal.Interceptor,
132135
maxNumBlocksInCache,
133136
),
134137
quit: make(chan struct{}),
135-
cfg: cfg,
138+
proofQueryRateLimiter: rate.NewLimiter(
139+
cfg.UniverseQueriesPerSecond, cfg.UniverseQueriesBurst,
140+
),
141+
cfg: cfg,
136142
}, nil
137143
}
138144

@@ -2897,6 +2903,12 @@ func marshalUniverseRoot(node universe.Root) (*unirpc.UniverseRoot, error) {
28972903
func (r *rpcServer) AssetRoots(ctx context.Context,
28982904
req *unirpc.AssetRootRequest) (*unirpc.AssetRootResponse, error) {
28992905

2906+
// Check the rate limiter to see if we need to wait at all. If not then
2907+
// this'll be a noop.
2908+
if err := r.proofQueryRateLimiter.Wait(ctx); err != nil {
2909+
return nil, err
2910+
}
2911+
29002912
// First, we'll retrieve the full set of known asset Universe roots.
29012913
assetRoots, err := r.cfg.UniverseArchive.RootNodes(
29022914
ctx, universe.RootNodesQuery{
@@ -3083,6 +3095,12 @@ func (r *rpcServer) QueryAssetRoots(ctx context.Context,
30833095
"given universe")
30843096
}
30853097

3098+
// Check the rate limiter to see if we need to wait at all. If not then
3099+
// this'll be a noop.
3100+
if err := r.proofQueryRateLimiter.Wait(ctx); err != nil {
3101+
return nil, err
3102+
}
3103+
30863104
// Query for both a issaunce and transfer universe root.
30873105
assetRoots, err := r.queryAssetProofRoots(ctx, universeID)
30883106
if err != nil {
@@ -3266,13 +3284,22 @@ func (r *rpcServer) AssetLeafKeys(ctx context.Context,
32663284
return nil, err
32673285
}
32683286

3269-
// TODO(roasbeef): tell above if was tring or not, then would set
3270-
// below diff
3287+
// If the proof type wasn't speciifed, then we'll return an error as we
3288+
// don't know which keys to actually fetch.
3289+
if universeID.ProofType == universe.ProofTypeUnspecified {
3290+
return nil, fmt.Errorf("proof type must be specified")
3291+
}
32713292

32723293
if req.Limit > universe.MaxPageSize || req.Limit < 0 {
32733294
return nil, fmt.Errorf("invalid request limit")
32743295
}
32753296

3297+
// Check the rate limiter to see if we need to wait at all. If not then
3298+
// this'll be a noop.
3299+
if err := r.proofQueryRateLimiter.Wait(ctx); err != nil {
3300+
return nil, err
3301+
}
3302+
32763303
leafKeys, err := r.cfg.UniverseArchive.UniverseLeafKeys(
32773304
ctx, universe.UniverseLeafKeysQuery{
32783305
Id: universeID,
@@ -3299,23 +3326,16 @@ func (r *rpcServer) AssetLeafKeys(ctx context.Context,
32993326
func marshalAssetLeaf(ctx context.Context, keys taprpc.KeyLookup,
33003327
assetLeaf *universe.Leaf) (*unirpc.AssetLeaf, error) {
33013328

3302-
// In order to display the full asset, we'll also encode the genesis
3303-
// proof.
3304-
var buf bytes.Buffer
3305-
if err := assetLeaf.Proof.Encode(&buf); err != nil {
3306-
return nil, err
3307-
}
3308-
33093329
rpcAsset, err := taprpc.MarshalAsset(
3310-
ctx, &assetLeaf.Proof.Asset, false, true, keys,
3330+
ctx, assetLeaf.Asset, false, true, keys,
33113331
)
33123332
if err != nil {
33133333
return nil, err
33143334
}
33153335

33163336
return &unirpc.AssetLeaf{
33173337
Asset: rpcAsset,
3318-
Proof: buf.Bytes(),
3338+
Proof: assetLeaf.RawProof,
33193339
}, nil
33203340
}
33213341

@@ -3339,6 +3359,12 @@ func (r *rpcServer) AssetLeaves(ctx context.Context,
33393359
return nil, err
33403360
}
33413361

3362+
// Check the rate limiter to see if we need to wait at all. If not then
3363+
// this'll be a noop.
3364+
if err := r.proofQueryRateLimiter.Wait(ctx); err != nil {
3365+
return nil, err
3366+
}
3367+
33423368
assetLeaves, err := r.cfg.UniverseArchive.MintingLeaves(ctx, universeID)
33433369
if err != nil {
33443370
return nil, err
@@ -3533,7 +3559,7 @@ func (r *rpcServer) QueryProof(ctx context.Context,
35333559
return nil, err
35343560
}
35353561

3536-
rpcsLog.Debugf("[QueryProof]: fetching proof at (universeID=%v, "+
3562+
rpcsLog.Tracef("[QueryProof]: fetching proof at (universeID=%v, "+
35373563
"leafKey=%x)", universeID, leafKey.UniverseKey())
35383564

35393565
// Retrieve proof export config for the given universe.
@@ -3576,6 +3602,12 @@ func (r *rpcServer) QueryProof(ctx context.Context,
35763602
"given universe")
35773603
}
35783604

3605+
// Check the rate limiter to see if we need to wait at all. If not then
3606+
// this'll be a noop.
3607+
if err := r.proofQueryRateLimiter.Wait(ctx); err != nil {
3608+
return nil, err
3609+
}
3610+
35793611
// Attempt to retrieve the proof given the candidate set of universe
35803612
// IDs.
35813613
var proofs []*universe.Proof
@@ -3610,7 +3642,7 @@ func (r *rpcServer) QueryProof(ctx context.Context,
36103642
// not be fully specified
36113643
proof := proofs[0]
36123644

3613-
rpcsLog.Debugf("[QueryProof]: found proof at (universeID=%v, "+
3645+
rpcsLog.Tracef("[QueryProof]: found proof at (universeID=%v, "+
36143646
"leafKey=%x)", universeID, leafKey.UniverseKey())
36153647

36163648
return r.marshalUniverseProofLeaf(ctx, req, proof)
@@ -3635,8 +3667,9 @@ func unmarshalAssetLeaf(leaf *unirpc.AssetLeaf) (*universe.Leaf, error) {
36353667
Genesis: assetProof.Asset.Genesis,
36363668
GroupKey: assetProof.Asset.GroupKey,
36373669
},
3638-
Proof: &assetProof,
3639-
Amt: assetProof.Asset.Amount,
3670+
RawProof: leaf.Proof,
3671+
Asset: &assetProof.Asset,
3672+
Amt: assetProof.Asset.Amount,
36403673
}, nil
36413674
}
36423675

@@ -3668,8 +3701,8 @@ func (r *rpcServer) InsertProof(ctx context.Context,
36683701
// If universe proof type unspecified, set based on the provided asset
36693702
// proof.
36703703
if universeID.ProofType == universe.ProofTypeUnspecified {
3671-
universeID.ProofType, err = universe.NewProofTypeFromAssetProof(
3672-
assetLeaf.Proof,
3704+
universeID.ProofType, err = universe.NewProofTypeFromAsset(
3705+
assetLeaf.Asset,
36733706
)
36743707
if err != nil {
36753708
return nil, err
@@ -3678,7 +3711,7 @@ func (r *rpcServer) InsertProof(ctx context.Context,
36783711

36793712
// Ensure that the new proof is of the correct type for the target
36803713
// universe.
3681-
err = universe.ValidateProofUniverseType(assetLeaf.Proof, universeID)
3714+
err = universe.ValidateProofUniverseType(assetLeaf.Asset, universeID)
36823715
if err != nil {
36833716
return nil, err
36843717
}
@@ -3694,6 +3727,12 @@ func (r *rpcServer) InsertProof(ctx context.Context,
36943727
"given universe")
36953728
}
36963729

3730+
// Check the rate limiter to see if we need to wait at all. If not then
3731+
// this'll be a noop.
3732+
if err := r.proofQueryRateLimiter.Wait(ctx); err != nil {
3733+
return nil, err
3734+
}
3735+
36973736
rpcsLog.Debugf("[InsertProof]: inserting proof at "+
36983737
"(universeID=%v, leafKey=%x)", universeID,
36993738
leafKey.UniverseKey())

tapcfg/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/lightningnetwork/lnd/signal"
3232
"github.com/lightningnetwork/lnd/tor"
3333
"golang.org/x/net/http2"
34+
"golang.org/x/time/rate"
3435
"google.golang.org/grpc"
3536
"google.golang.org/grpc/credentials"
3637
)
@@ -113,6 +114,16 @@ const (
113114
// defaultReOrgSafeDepth is the default number of confirmations we'll
114115
// wait for before considering a transaction safely buried in the chain.
115116
defaultReOrgSafeDepth = 6
117+
118+
// defaultUniverseMaxQps is the default maximum number of queries per
119+
// second for the universe server. This permis 100 queries per second
120+
// by default.
121+
defaultUniverseMaxQps = 100
122+
123+
// defaultUniverseQueriesBurst is the default burst budget for the
124+
// universe queries. By default we'll allow 100 qps, with a max burst
125+
// of 10 queries.
126+
defaultUniverseQueriesBurst = 10
116127
)
117128

118129
var (
@@ -251,6 +262,12 @@ type UniverseConfig struct {
251262
FederationServers []string `long:"federationserver" description:"The host:port of a Universe server peer with. These servers will be added as the default set of federation servers. Can be specified multiple times."`
252263

253264
PublicAccess bool `long:"public-access" description:"If true, and the Universe server is on a public interface, valid proof from remote parties will be accepted, and proofs will be queryable by remote parties. This applies to federation syncing as well as RPC insert and query."`
265+
266+
StatsCacheDuration time.Duration `long:"stats-cache-duration" description:"The amount of time to cache stats for before refreshing them."`
267+
268+
UniverseQueriesPerSecond rate.Limit `long:"max-qps" description:"The maximum number of queries per second across the set of active universe queries that is permitted. Anything above this starts to get rate limited."`
269+
270+
UniverseQueriesBurst int `long:"req-burst-budget" description:"The burst budget for the universe query rate limiting."`
254271
}
255272

256273
// AddressConfig is the config that houses any address Book related config
@@ -369,6 +386,10 @@ func DefaultConfig() Config {
369386
},
370387
Universe: &UniverseConfig{
371388
SyncInterval: defaultUniverseSyncInterval,
389+
UniverseQueriesPerSecond: rate.Limit(
390+
defaultUniverseMaxQps,
391+
),
392+
UniverseQueriesBurst: defaultUniverseQueriesBurst,
372393
},
373394
AddrBook: &AddrBookConfig{
374395
DisableSyncer: false,

0 commit comments

Comments
 (0)