Commit 6b39ff8

Merge pull request #698 from lightninglabs/fed-envoy-push
Federation envoy re-attempts sync push for issuance proofs
2 parents 87da1d3 + a34ec30

20 files changed: +1981 −199 lines

itest/tapd_harness.go

Lines changed: 8 additions & 0 deletions

@@ -100,6 +100,10 @@ type harnessOpts struct {
 	proofCourier proof.CourierHarness
 	custodianProofRetrievalDelay *time.Duration
 	addrAssetSyncerDisable bool
+
+	// fedSyncTickerInterval is the interval at which the federation envoy
+	// sync ticker will fire.
+	fedSyncTickerInterval *time.Duration
 }
 
 type harnessOption func(*harnessOpts)
@@ -242,6 +246,10 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig,
 		finalCfg.CustodianProofRetrievalDelay = *opts.custodianProofRetrievalDelay
 	}
 
+	if opts.fedSyncTickerInterval != nil {
+		finalCfg.Universe.SyncInterval = *opts.fedSyncTickerInterval
+	}
+
 	return &tapdHarness{
 		cfg:       &cfg,
 		clientCfg: finalCfg,
itest/test_harness.go

Lines changed: 5 additions & 0 deletions

@@ -366,6 +366,10 @@ type tapdHarnessParams struct {
 	// synced from the above node.
 	startupSyncNumAssets int
 
+	// fedSyncTickerInterval is the interval at which the federation envoy
+	// sync ticker will fire.
+	fedSyncTickerInterval *time.Duration
+
 	// noDefaultUniverseSync indicates whether the default universe server
 	// should be added as a federation server or not.
 	noDefaultUniverseSync bool
@@ -402,6 +406,7 @@ func setupTapdHarness(t *testing.T, ht *harnessTest,
 		ho.proofCourier = selectedProofCourier
 		ho.custodianProofRetrievalDelay = params.custodianProofRetrievalDelay
 		ho.addrAssetSyncerDisable = params.addrAssetSyncerDisable
+		ho.fedSyncTickerInterval = params.fedSyncTickerInterval
 	}
 
 	tapdHarness, err := newTapdHarness(t, ht, tapdConfig{
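Together, the two harness changes above expose the envoy's sync interval to individual itests. A minimal usage sketch, assuming the itest package context shown above (the helper name is illustrative, not part of the commit):

func mintingNodeWithFastFedSync(t *harnessTest) *tapdHarness {
	// Shorten the federation sync ticker so that a failed proof push is
	// retried within seconds rather than at the default interval.
	syncTickerInterval := 4 * time.Second

	return setupTapdHarness(
		t.t, t, t.lndHarness.Bob, nil,
		func(params *tapdHarnessParams) {
			params.fedSyncTickerInterval = &syncTickerInterval
		},
	)
}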

itest/test_list_on_test.go

Lines changed: 4 additions & 0 deletions

@@ -195,6 +195,10 @@ var testCases = []*testCase{
 		name: "universe pagination simple",
 		test: testUniversePaginationSimple,
 	},
+	{
+		name: "mint proof repeat fed sync attempt",
+		test: testMintProofRepeatFedSyncAttempt,
+	},
 }
 
 var optionalTestCases = []*testCase{

itest/universe_federation_test.go

Lines changed: 99 additions & 0 deletions

@@ -0,0 +1,99 @@
+package itest
+
+import (
+	"context"
+	"time"
+
+	"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
+	unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
+	"github.com/stretchr/testify/require"
+)
+
+// testMintProofRepeatFedSyncAttempt tests that the minting node will retry
+// pushing the minting proofs to the federation server peer node if the peer
+// node is offline at the time of the initial sync attempt.
+func testMintProofRepeatFedSyncAttempt(t *harnessTest) {
+	// Create a new minting node, without hooking it up to any existing
+	// Universe server. We will also set the sync ticker to 4 seconds, so
+	// that we can test that the proof push sync is retried and eventually
+	// succeeds after the fed server peer node reappears online.
+	syncTickerInterval := 4 * time.Second
+	mintingNode := setupTapdHarness(
+		t.t, t, t.lndHarness.Bob, nil,
+		func(params *tapdHarnessParams) {
+			params.fedSyncTickerInterval = &syncTickerInterval
+			params.noDefaultUniverseSync = true
+		},
+	)
+	defer func() {
+		require.NoError(t.t, mintingNode.stop(!*noDelete))
+	}()
+
+	// We'll use the main node as our federation universe server
+	// counterparty.
+	fedServerNode := t.tapd
+
+	// Keep a reference to the fed server node RPC host address, so that we
+	// can assert that it has not changed after the restart. This is
+	// important, because the minting node will be retrying the proof push
+	// to this address.
+	fedServerNodeRpcHost := fedServerNode.rpcHost()
+
+	// Register the fedServerNode as a federation universe server with the
+	// minting node.
+	ctxb := context.Background()
+	ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
+	defer cancel()
+
+	_, err := mintingNode.AddFederationServer(
+		ctxt, &unirpc.AddFederationServerRequest{
+			Servers: []*unirpc.UniverseFederationServer{
+				{
+					Host: fedServerNodeRpcHost,
+				},
+			},
+		},
+	)
+	require.NoError(t.t, err)
+
+	// Assert that the fed server node has not seen any asset proofs.
+	AssertUniverseStats(t.t, fedServerNode, 0, 0, 0)
+
+	// Stop the federation server peer node, so that it does not receive
+	// the newly minted asset proofs immediately upon minting.
+	t.Logf("Stopping fed server tapd node")
+	require.NoError(t.t, fedServerNode.stop(false))
+
+	// Now that the federation peer node is inactive, we'll mint assets.
+	t.Logf("Minting assets on minting node")
+	rpcAssets := MintAssetsConfirmBatch(
+		t.t, t.lndHarness.Miner.Client, mintingNode,
+		[]*mintrpc.MintAssetRequest{
+			simpleAssets[0], issuableAssets[0],
+		},
+	)
+	require.Len(t.t, rpcAssets, 2)
+
+	t.lndHarness.MineBlocks(7)
+
+	// Wait for the minting node to attempt (and fail) to push the minting
+	// proofs to the fed peer node. We wait some multiple of the sync ticker
+	// interval to ensure that the minting node has had time to retry the
+	// proof push sync.
+	time.Sleep(syncTickerInterval * 2)
+
+	// Start the federation server peer node. The federation envoy component
+	// of our minting node should currently be retrying the proof push sync
+	// with the federation peer at each tick.
+	t.Logf("Start (previously stopped) fed server tapd node")
+	err = fedServerNode.start(false)
+	require.NoError(t.t, err)
+
+	// Ensure that the federation server node RPC host address has not
+	// changed after the restart. If it has, then the minting node will be
+	// retrying the proof push to the wrong address.
+	require.Equal(t.t, fedServerNodeRpcHost, fedServerNode.rpcHost())
+
+	t.Logf("Assert that fed peer node has seen the asset minting proofs")
+	AssertUniverseStats(t.t, fedServerNode, 2, 2, 1)
+}
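The test above observes the envoy only from the outside. As a mental model of the behaviour under test, the envoy can be pictured as a ticker-driven loop that re-attempts pending proof pushes on every tick. The sketch below is illustrative only; the envoy type and pushPendingProofs callback are hypothetical stand-ins, not the actual universe package API:

package sketch

import (
	"context"
	"time"
)

// envoy is a hypothetical stand-in for the federation envoy component.
type envoy struct {
	// pushPendingProofs re-attempts every proof push whose sync log row
	// is still marked 'pending'; a successful push flips the row to
	// 'complete' so later ticks skip it.
	pushPendingProofs func(ctx context.Context)
}

// syncLoop fires at the configured interval (the itest above sets it to
// four seconds) and retries outstanding proof pushes on each tick.
func (e *envoy) syncLoop(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			e.pushPendingProofs(ctx)

		case <-ctx.Done():
			return
		}
	}
}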

rpcserver.go

Lines changed: 11 additions & 1 deletion

@@ -3997,7 +3997,17 @@ func (r *rpcServer) DeleteFederationServer(ctx context.Context,
 
 	serversToDel := fn.Map(req.Servers, unmarshalUniverseServer)
 
-	err := r.cfg.FederationDB.RemoveServers(ctx, serversToDel...)
+	// Remove the servers from the proofs sync log. This is necessary before
+	// we can remove the servers from the database because of a foreign
+	// key constraint.
+	err := r.cfg.FederationDB.DeleteProofsSyncLogEntries(
+		ctx, serversToDel...,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	err = r.cfg.FederationDB.RemoveServers(ctx, serversToDel...)
 	if err != nil {
 		return nil, err
 	}
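The new comment explains the ordering: the sync log table added by this commit holds a NOT NULL foreign key into universe_servers, so its rows must be deleted before the server rows they reference. A minimal sketch of that constraint at the SQL level, written as hand-rolled database/sql calls with Postgres-style placeholders rather than the FederationDB methods the server actually uses:

package sketch

import (
	"context"
	"database/sql"
)

// deleteServer mirrors the ordering in DeleteFederationServer: deleting the
// universe_servers row first would violate the foreign key held by any
// federation_proof_sync_log rows that still reference it.
func deleteServer(ctx context.Context, db *sql.DB, serverID int64) error {
	_, err := db.ExecContext(
		ctx,
		"DELETE FROM federation_proof_sync_log WHERE servers_id = $1",
		serverID,
	)
	if err != nil {
		return err
	}

	_, err = db.ExecContext(
		ctx, "DELETE FROM universe_servers WHERE id = $1", serverID,
	)
	return err
}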

tapdb/assets_store_test.go

Lines changed: 11 additions & 97 deletions

@@ -246,105 +246,17 @@ func assertAssetEqual(t *testing.T, a, b *asset.Asset) {
 func TestImportAssetProof(t *testing.T) {
 	t.Parallel()
 
-	// First, we'll create a new instance of the database.
-	_, assetStore, db := newAssetStore(t)
-
-	// Next, we'll make a new random asset that also has a few inputs with
-	// dummy witness information.
-	testAsset := randAsset(t)
-
-	assetRoot, err := commitment.NewAssetCommitment(testAsset)
-	require.NoError(t, err)
-
-	taprootAssetRoot, err := commitment.NewTapCommitment(assetRoot)
-	require.NoError(t, err)
-
-	// With our asset created, we can now create the AnnotatedProof we use
-	// to import assets into the database.
-	var blockHash chainhash.Hash
-	_, err = rand.Read(blockHash[:])
-	require.NoError(t, err)
+	var (
+		ctxb = context.Background()
 
-	anchorTx := wire.NewMsgTx(2)
-	anchorTx.AddTxIn(&wire.TxIn{})
-	anchorTx.AddTxOut(&wire.TxOut{
-		PkScript: bytes.Repeat([]byte{0x01}, 34),
-		Value:    10,
-	})
+		dbHandle   = NewDbHandle(t)
+		assetStore = dbHandle.AssetStore
+	)
 
+	// Add a random asset and corresponding proof into the database.
+	testAsset, testProof := dbHandle.AddRandomAssetProof(t)
 	assetID := testAsset.ID()
-	anchorPoint := wire.OutPoint{
-		Hash:  anchorTx.TxHash(),
-		Index: 0,
-	}
-	initialBlob := bytes.Repeat([]byte{0x0}, 100)
-	updatedBlob := bytes.Repeat([]byte{0x77}, 100)
-	testProof := &proof.AnnotatedProof{
-		Locator: proof.Locator{
-			AssetID:   &assetID,
-			ScriptKey: *testAsset.ScriptKey.PubKey,
-		},
-		Blob: initialBlob,
-		AssetSnapshot: &proof.AssetSnapshot{
-			Asset:             testAsset,
-			OutPoint:          anchorPoint,
-			AnchorBlockHash:   blockHash,
-			AnchorBlockHeight: test.RandInt[uint32](),
-			AnchorTxIndex:     test.RandInt[uint32](),
-			AnchorTx:          anchorTx,
-			OutputIndex:       0,
-			InternalKey:       test.RandPubKey(t),
-			ScriptRoot:        taprootAssetRoot,
-		},
-	}
-	if testAsset.GroupKey != nil {
-		testProof.GroupKey = &testAsset.GroupKey.GroupPubKey
-	}
-
-	// We'll now insert the internal key information as well as the script
-	// key ahead of time to reflect the address creation that happens
-	// elsewhere.
-	ctxb := context.Background()
-	_, err = db.UpsertInternalKey(ctxb, InternalKey{
-		RawKey:    testProof.InternalKey.SerializeCompressed(),
-		KeyFamily: test.RandInt[int32](),
-		KeyIndex:  test.RandInt[int32](),
-	})
-	require.NoError(t, err)
-	rawScriptKeyID, err := db.UpsertInternalKey(ctxb, InternalKey{
-		RawKey:    testAsset.ScriptKey.RawKey.PubKey.SerializeCompressed(),
-		KeyFamily: int32(testAsset.ScriptKey.RawKey.Family),
-		KeyIndex:  int32(testAsset.ScriptKey.RawKey.Index),
-	})
-	require.NoError(t, err)
-	_, err = db.UpsertScriptKey(ctxb, NewScriptKey{
-		InternalKeyID:    rawScriptKeyID,
-		TweakedScriptKey: testAsset.ScriptKey.PubKey.SerializeCompressed(),
-		Tweak:            nil,
-	})
-	require.NoError(t, err)
-
-	// We'll add the chain transaction of the proof now to simulate a
-	// batched transfer on a higher layer.
-	var anchorTxBuf bytes.Buffer
-	err = testProof.AnchorTx.Serialize(&anchorTxBuf)
-	require.NoError(t, err)
-	anchorTXID := testProof.AnchorTx.TxHash()
-	_, err = db.UpsertChainTx(ctxb, ChainTxParams{
-		Txid:        anchorTXID[:],
-		RawTx:       anchorTxBuf.Bytes(),
-		BlockHeight: sqlInt32(testProof.AnchorBlockHeight),
-		BlockHash:   testProof.AnchorBlockHash[:],
-		TxIndex:     sqlInt32(testProof.AnchorTxIndex),
-	})
-	require.NoError(t, err, "unable to insert chain tx: %w", err)
-
-	// With all our test data constructed, we'll now attempt to import the
-	// asset into the database.
-	require.NoError(t, assetStore.ImportProofs(
-		ctxb, proof.MockHeaderVerifier, proof.MockGroupVerifier, false,
-		testProof,
-	))
+	initialBlob := testProof.Blob
 
 	// We should now be able to retrieve the set of all assets inserted on
 	// disk.
@@ -371,7 +283,7 @@ func TestImportAssetProof(t *testing.T) {
 		ScriptKey: *testAsset.ScriptKey.PubKey,
 	})
 	require.NoError(t, err)
-	require.Equal(t, initialBlob, []byte(currentBlob))
+	require.Equal(t, initialBlob, currentBlob)
 
 	// We should also be able to fetch the created asset above based on
 	// either the asset ID, or key group via the main coin selection
@@ -391,6 +303,8 @@ func TestImportAssetProof(t *testing.T) {
 
 	// We'll now attempt to overwrite the proof with one that has different
 	// block information (simulating a re-org).
+	updatedBlob := bytes.Repeat([]byte{0x77}, 100)
+
 	testProof.AnchorBlockHash = chainhash.Hash{12, 34, 56}
 	testProof.AnchorBlockHeight = 1234
 	testProof.AnchorTxIndex = 5678
Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+DROP INDEX IF EXISTS federation_proof_sync_log_unique_index_proof_leaf_id_servers_id;
+DROP TABLE IF EXISTS federation_proof_sync_log;
Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+-- This table stores the log of federation universe proof sync attempts. Rows
+-- in this table are specific to a given proof leaf, server, and sync direction.
+CREATE TABLE IF NOT EXISTS federation_proof_sync_log (
+    id BIGINT PRIMARY KEY,
+
+    -- The status of the proof sync attempt.
+    status TEXT NOT NULL CHECK(status IN ('pending', 'complete')),
+
+    -- The timestamp of when the log entry for the associated proof was last
+    -- updated.
+    timestamp TIMESTAMP NOT NULL,
+
+    -- The number of attempts that have been made to sync the proof.
+    attempt_counter BIGINT NOT NULL DEFAULT 0,
+
+    -- The direction of the proof sync attempt.
+    sync_direction TEXT NOT NULL CHECK(sync_direction IN ('push', 'pull')),
+
+    -- The ID of the subject proof leaf.
+    proof_leaf_id BIGINT NOT NULL REFERENCES universe_leaves(id),
+
+    -- The ID of the universe that the proof leaf belongs to.
+    universe_root_id BIGINT NOT NULL REFERENCES universe_roots(id),
+
+    -- The ID of the server that the proof will be/was synced to.
+    servers_id BIGINT NOT NULL REFERENCES universe_servers(id)
+);
+
+-- Create a unique index on table federation_proof_sync_log
+CREATE UNIQUE INDEX federation_proof_sync_log_unique_index_proof_leaf_id_servers_id
+ON federation_proof_sync_log (
+    sync_direction,
+    proof_leaf_id,
+    universe_root_id,
+    servers_id
+);
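The unique index above is what makes the log upsert-friendly: at most one row exists per (sync_direction, proof_leaf_id, universe_root_id, servers_id) tuple. A sketch of how a push attempt might be recorded against this table, assuming hand-written SQL over database/sql with Postgres-style placeholders (the real code goes through sqlc-generated queries, and row id generation is simplified here by passing it in):

package sketch

import (
	"context"
	"database/sql"
	"time"
)

// logPushAttempt inserts a 'pending' row for the given (leaf, root, server)
// tuple, or bumps attempt_counter if a push row already exists for it.
func logPushAttempt(ctx context.Context, db *sql.DB,
	id, leafID, rootID, serverID int64) error {

	_, err := db.ExecContext(ctx, `
		INSERT INTO federation_proof_sync_log (
			id, status, timestamp, attempt_counter,
			sync_direction, proof_leaf_id, universe_root_id,
			servers_id
		) VALUES ($1, 'pending', $2, 0, 'push', $3, $4, $5)
		ON CONFLICT (sync_direction, proof_leaf_id, universe_root_id,
			servers_id)
		DO UPDATE SET
			attempt_counter =
				federation_proof_sync_log.attempt_counter + 1,
			timestamp = EXCLUDED.timestamp`,
		id, time.Now().UTC(), leafID, rootID, serverID,
	)
	return err
}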

tapdb/sqlc/models.go

Lines changed: 11 additions & 0 deletions
Generated file; diff not rendered by default.
