Skip to content

Commit d09b3bf

Browse files
author
Robyn Ffrancon
authored
Merge pull request #687 from lightninglabs/balance-bug-import-proofs
Check for existing asset in db before attempting to insert a new asset.
2 parents c0c3ab7 + a51368f commit d09b3bf

File tree

10 files changed

+268
-17
lines changed

10 files changed

+268
-17
lines changed

itest/send_test.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,196 @@ func testBasicSendUnidirectional(t *harnessTest) {
135135
wg.Wait()
136136
}
137137

138+
// testRestartReceiver tests that the receiver node's asset balance after a
139+
// single asset transfer does not change if the receiver node restarts.
140+
// Before the addition of this test, after restarting the receiver node
141+
// the asset balance would be erroneously incremented. This is because the
142+
// receiver node was not storing asset transfer in its database with the
143+
// appropriate field uniqueness constraints.
144+
func testRestartReceiverCheckBalance(t *harnessTest) {
145+
var (
146+
ctxb = context.Background()
147+
wg sync.WaitGroup
148+
)
149+
150+
const (
151+
// Number of units to send.
152+
numUnits = 10
153+
)
154+
155+
// Subscribe to receive assent send events from primary tapd node.
156+
eventNtfns, err := t.tapd.SubscribeSendAssetEventNtfns(
157+
ctxb, &taprpc.SubscribeSendAssetEventNtfnsRequest{},
158+
)
159+
require.NoError(t.t, err)
160+
161+
// Test to ensure that we execute the transaction broadcast state.
162+
// This test is executed in a goroutine to ensure that we can receive
163+
// the event notification from the tapd node as the rest of the test
164+
// proceeds.
165+
wg.Add(1)
166+
go func() {
167+
defer wg.Done()
168+
169+
broadcastState := tapfreighter.SendStateBroadcast.String()
170+
targetEventSelector := func(event *taprpc.SendAssetEvent) bool {
171+
switch eventTyped := event.Event.(type) {
172+
case *taprpc.SendAssetEvent_ExecuteSendStateEvent:
173+
ev := eventTyped.ExecuteSendStateEvent
174+
175+
// Log send state execution.
176+
timestamp := time.UnixMicro(ev.Timestamp)
177+
t.Logf("Executing send state (%v): %v",
178+
timestamp.Format(time.RFC3339Nano),
179+
ev.SendState)
180+
181+
return ev.SendState == broadcastState
182+
}
183+
184+
return false
185+
}
186+
187+
timeout := 2 * defaultProofTransferReceiverAckTimeout
188+
ctx, cancel := context.WithTimeout(ctxb, timeout)
189+
defer cancel()
190+
assertAssetSendNtfsEvent(
191+
t, ctx, eventNtfns, targetEventSelector, 1,
192+
)
193+
}()
194+
195+
// First, we'll make a normal assets with enough units to allow us to
196+
// send it around a few times.
197+
rpcAssets := MintAssetsConfirmBatch(
198+
t.t, t.lndHarness.Miner.Client, t.tapd,
199+
[]*mintrpc.MintAssetRequest{issuableAssets[0]},
200+
)
201+
202+
genInfo := rpcAssets[0].AssetGenesis
203+
204+
// Now that we have the asset created, we'll make a new node that'll
205+
// serve as the node which'll receive the assets. The existing tapd
206+
// node will be used to synchronize universe state.
207+
//
208+
// We will stipulate that the receiver node's custodian service should
209+
// not delay commencing the proof retrieval procedure once a suitable
210+
// on-chain asset transfer is detected. This will ensure that on restart
211+
// the receiver node will attempt to immediately retrieve the asset
212+
// proof even if the proof and asset are present.
213+
custodianProofRetrievalDelay := 0 * time.Second
214+
215+
recvTapd := setupTapdHarness(
216+
t.t, t, t.lndHarness.Bob, t.universeServer,
217+
func(params *tapdHarnessParams) {
218+
params.startupSyncNode = t.tapd
219+
params.startupSyncNumAssets = len(rpcAssets)
220+
params.custodianProofRetrievalDelay = &custodianProofRetrievalDelay
221+
},
222+
)
223+
defer func() {
224+
require.NoError(t.t, recvTapd.stop(!*noDelete))
225+
}()
226+
227+
// Next, we'll attempt to complete two transfers with distinct
228+
// addresses from our main node to Bob.
229+
currentUnits := issuableAssets[0].Asset.Amount
230+
231+
// Issue a single address which will be reused for each send.
232+
bobAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
233+
AssetId: genInfo.AssetId,
234+
Amt: numUnits,
235+
AssetVersion: rpcAssets[0].Version,
236+
})
237+
require.NoError(t.t, err)
238+
239+
t.t.Logf("Performing send procedure")
240+
241+
// Deduct what we sent from the expected current number of
242+
// units.
243+
currentUnits -= numUnits
244+
245+
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], bobAddr)
246+
247+
sendResp := sendAssetsToAddr(t, t.tapd, bobAddr)
248+
249+
ConfirmAndAssertOutboundTransfer(
250+
t.t, t.lndHarness.Miner.Client, t.tapd, sendResp,
251+
genInfo.AssetId,
252+
[]uint64{currentUnits, numUnits}, 0, 1,
253+
)
254+
AssertNonInteractiveRecvComplete(t.t, recvTapd, 1)
255+
256+
// Close event stream.
257+
err = eventNtfns.CloseSend()
258+
require.NoError(t.t, err)
259+
260+
wg.Wait()
261+
262+
assertRecvBalance := func() {
263+
// Get asset balance by group from the receiver node.
264+
respGroup, err := recvTapd.ListBalances(
265+
ctxb, &taprpc.ListBalancesRequest{
266+
GroupBy: &taprpc.ListBalancesRequest_GroupKey{
267+
GroupKey: true,
268+
},
269+
},
270+
)
271+
require.NoError(t.t, err)
272+
273+
// We expect to see a single asset group balance. The receiver
274+
// node received one asset only.
275+
require.Len(t.t, respGroup.AssetGroupBalances, 1)
276+
277+
var assetGroupBalance *taprpc.AssetGroupBalance
278+
279+
for _, value := range respGroup.AssetGroupBalances {
280+
assetGroupBalance = value
281+
break
282+
}
283+
284+
require.Equal(t.t, int(10), int(assetGroupBalance.Balance))
285+
286+
// Get asset balance by asset ID from the receiver node.
287+
respAsset, err := recvTapd.ListBalances(
288+
ctxb, &taprpc.ListBalancesRequest{
289+
GroupBy: &taprpc.ListBalancesRequest_AssetId{
290+
AssetId: true,
291+
},
292+
},
293+
)
294+
require.NoError(t.t, err)
295+
296+
// We expect to see a single asset group balance. The receiver
297+
// node received one asset only.
298+
require.Len(t.t, respAsset.AssetBalances, 1)
299+
300+
var assetBalance *taprpc.AssetBalance
301+
302+
for _, value := range respAsset.AssetBalances {
303+
assetBalance = value
304+
break
305+
}
306+
307+
require.Equal(t.t, assetBalance.Balance, uint64(10))
308+
}
309+
310+
// Initial balance check.
311+
assertRecvBalance()
312+
313+
// Restart the receiver node and then check the balance again.
314+
require.NoError(t.t, recvTapd.stop(false))
315+
require.NoError(t.t, recvTapd.start(false))
316+
317+
assertRecvBalance()
318+
319+
// Restart the receiver node, mine some blocks, and then check the
320+
// balance again.
321+
require.NoError(t.t, recvTapd.stop(false))
322+
t.lndHarness.MineBlocks(7)
323+
require.NoError(t.t, recvTapd.start(false))
324+
325+
assertRecvBalance()
326+
}
327+
138328
// testResumePendingPackageSend tests that we can properly resume a pending
139329
// package send after a restart.
140330
func testResumePendingPackageSend(t *harnessTest) {

itest/tapd_harness.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,11 @@ type tapdConfig struct {
8181
}
8282

8383
type harnessOpts struct {
84-
proofSendBackoffCfg *proof.BackoffCfg
85-
proofReceiverAckTimeout *time.Duration
86-
proofCourier proof.CourierHarness
87-
addrAssetSyncerDisable bool
84+
proofSendBackoffCfg *proof.BackoffCfg
85+
proofReceiverAckTimeout *time.Duration
86+
proofCourier proof.CourierHarness
87+
custodianProofRetrievalDelay *time.Duration
88+
addrAssetSyncerDisable bool
8889
}
8990

9091
type harnessOption func(*harnessOpts)
@@ -223,6 +224,11 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig,
223224
finalCfg.HashMailCourier = nil
224225
}
225226

227+
// Set the custodian proof retrieval delay if it was specified.
228+
if opts.custodianProofRetrievalDelay != nil {
229+
finalCfg.CustodianProofRetrievalDelay = *opts.custodianProofRetrievalDelay
230+
}
231+
226232
return &tapdHarness{
227233
cfg: &cfg,
228234
clientCfg: finalCfg,

itest/test_harness.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,11 @@ type tapdHarnessParams struct {
325325
// an ack from the proof receiver.
326326
proofReceiverAckTimeout *time.Duration
327327

328+
// custodianProofRetrievalDelay is the time duration the custodian waits
329+
// having identified an asset transfer on-chain and before retrieving
330+
// the corresponding proof via the proof courier service.
331+
custodianProofRetrievalDelay *time.Duration
332+
328333
// addrAssetSyncerDisable is a flag that determines if the address book
329334
// will try and bootstrap unknown assets on address creation.
330335
addrAssetSyncerDisable bool
@@ -371,6 +376,7 @@ func setupTapdHarness(t *testing.T, ht *harnessTest,
371376
ho.proofSendBackoffCfg = params.proofSendBackoffCfg
372377
ho.proofReceiverAckTimeout = params.proofReceiverAckTimeout
373378
ho.proofCourier = selectedProofCourier
379+
ho.custodianProofRetrievalDelay = params.custodianProofRetrievalDelay
374380
ho.addrAssetSyncerDisable = params.addrAssetSyncerDisable
375381
}
376382

itest/test_list_on_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ var testCases = []*testCase{
5252
test: testBasicSendUnidirectional,
5353
proofCourierType: proof.UniverseRpcCourierType,
5454
},
55+
{
56+
name: "restart receiver check balance",
57+
test: testRestartReceiverCheckBalance,
58+
proofCourierType: proof.UniverseRpcCourierType,
59+
},
5560
{
5661
name: "resume pending package send",
5762
test: testResumePendingPackageSend,

tapcfg/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ const (
124124
// universe queries. By default we'll allow 100 qps, with a max burst
125125
// of 10 queries.
126126
defaultUniverseQueriesBurst = 10
127+
128+
// defaultProofRetrievalDelay is the default time duration the custodian
129+
// waits having identified an asset transfer on-chain and before
130+
// retrieving the corresponding proof via the proof courier service.
131+
defaultProofRetrievalDelay = 5 * time.Second
127132
)
128133

129134
var (
@@ -301,6 +306,8 @@ type Config struct {
301306
DefaultProofCourierAddr string `long:"proofcourieraddr" description:"Default proof courier service address."`
302307
HashMailCourier *proof.HashMailCourierCfg `group:"proofcourier" namespace:"hashmailcourier"`
303308

309+
CustodianProofRetrievalDelay time.Duration `long:"custodianproofretrievaldelay" description:"The number of seconds the custodian waits after identifying an asset transfer on-chain and before retrieving the corresponding proof."`
310+
304311
ChainConf *ChainConfig
305312
RpcConf *RpcConfig
306313

@@ -384,6 +391,7 @@ func DefaultConfig() Config {
384391
MaxBackoff: defaultProofTransferMaxBackoff,
385392
},
386393
},
394+
CustodianProofRetrievalDelay: defaultProofRetrievalDelay,
387395
Universe: &UniverseConfig{
388396
SyncInterval: defaultUniverseSyncInterval,
389397
UniverseQueriesPerSecond: rate.Limit(

tapcfg/server.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,12 +362,13 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
362362
GroupVerifier: tapgarden.GenGroupVerifier(
363363
context.Background(), assetMintingStore,
364364
),
365-
AddrBook: addrBook,
366-
ProofArchive: proofArchive,
367-
ProofNotifier: assetStore,
368-
ErrChan: mainErrChan,
369-
ProofCourierCfg: proofCourierCfg,
370-
ProofWatcher: reOrgWatcher,
365+
AddrBook: addrBook,
366+
ProofArchive: proofArchive,
367+
ProofNotifier: assetStore,
368+
ErrChan: mainErrChan,
369+
ProofCourierCfg: proofCourierCfg,
370+
ProofRetrievalDelay: cfg.CustodianProofRetrievalDelay,
371+
ProofWatcher: reOrgWatcher,
371372
},
372373
),
373374
ChainBridge: chainBridge,

tapdb/assets_common.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ type UpsertAssetStore interface {
6666
UpsertAssetGroupKey(ctx context.Context, arg AssetGroupKey) (int64,
6767
error)
6868

69+
// QueryAssets fetches a filtered set of fully confirmed assets.
70+
QueryAssets(context.Context, QueryAssetFilters) ([]ConfirmedAsset,
71+
error)
72+
6973
// InsertNewAsset inserts a new asset on disk.
7074
InsertNewAsset(ctx context.Context,
7175
arg sqlc.InsertNewAssetParams) (int64, error)
@@ -200,6 +204,24 @@ func upsertAssetsWithGenesis(ctx context.Context, q UpsertAssetStore,
200204
anchorUtxoID = anchorUtxoIDs[idx]
201205
}
202206

207+
// Check for matching assets in the database. If we find one,
208+
// we'll just use its primary key ID, and we won't attempt to
209+
// insert the asset again.
210+
existingAssets, err := q.QueryAssets(ctx, QueryAssetFilters{
211+
AnchorUtxoID: anchorUtxoID,
212+
GenesisID: sqlInt64(genAssetID),
213+
ScriptKeyID: sqlInt64(scriptKeyID),
214+
})
215+
if err != nil {
216+
return 0, nil, fmt.Errorf("unable to query assets: %w",
217+
err)
218+
}
219+
220+
if len(existingAssets) > 0 {
221+
assetIDs[idx] = existingAssets[0].AssetPrimaryKey
222+
continue
223+
}
224+
203225
// With all the dependent data inserted, we can now insert the
204226
// base asset information itself.
205227
assetIDs[idx], err = q.InsertNewAsset(

tapdb/sqlc/assets.sql.go

Lines changed: 10 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tapdb/sqlc/queries/assets.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,10 @@ WHERE (
430430
assets.amount >= COALESCE(sqlc.narg('min_amt'), assets.amount) AND
431431
assets.spent = COALESCE(sqlc.narg('spent'), assets.spent) AND
432432
(key_group_info_view.tweaked_group_key = sqlc.narg('key_group_filter') OR
433-
sqlc.narg('key_group_filter') IS NULL)
433+
sqlc.narg('key_group_filter') IS NULL) AND
434+
assets.anchor_utxo_id = COALESCE(sqlc.narg('anchor_utxo_id'), assets.anchor_utxo_id) AND
435+
assets.genesis_id = COALESCE(sqlc.narg('genesis_id'), assets.genesis_id) AND
436+
assets.script_key_id = COALESCE(sqlc.narg('script_key_id'), assets.script_key_id)
434437
);
435438

436439
-- name: AllAssets :many

0 commit comments

Comments
 (0)