diff --git a/payments/db/migration1/migration_validation.go b/payments/db/migration1/migration_validation.go index f20c9278361..c81303e5432 100644 --- a/payments/db/migration1/migration_validation.go +++ b/payments/db/migration1/migration_validation.go @@ -79,6 +79,20 @@ func validateMigratedPaymentBatch(ctx context.Context, hash[:8], err) } + if kvPayment.Status == StatusInFlight { + // Mirror the migration's legacy terminalization + // before comparing KV with SQL. + // + //nolint:ll + _, err = terminalizeUnresolvedLegacyZeroAttempts( + kvPayment, + ) + if err != nil { + return fmt.Errorf("normalize KV "+ + "payment %x: %w", hash[:8], err) + } + } + err = structuralCompare(kvPayment, row) if err != nil { // On structural mismatch, perform a deep @@ -191,6 +205,16 @@ func deepComparePayment(ctx context.Context, cfg *SQLStoreConfig, paymentHash[:8], err) } + if kvPayment.Status == StatusInFlight { + // Mirror the migration's legacy terminalization before + // comparing KV with SQL. + _, err = terminalizeUnresolvedLegacyZeroAttempts(kvPayment) + if err != nil { + return fmt.Errorf("normalize KV payment %x: %w", + paymentHash[:8], err) + } + } + normalizeLegacyZeroAttemptIDsForCompare(kvPayment, sqlPayment) normalizePaymentForCompare(kvPayment) normalizePaymentForCompare(sqlPayment) @@ -221,6 +245,60 @@ func deepComparePayment(ctx context.Context, cfg *SQLStoreConfig, return nil } +// normalizeLegacyZeroAttemptIDsForCompare aligns expected legacy attempt ID +// remaps before deep comparison. +// +// Legacy KV payments can use attempt ID zero to represent an unknown ID. During +// migration those attempts are assigned synthetic SQL attempt indexes, while +// the source KV payment is left unchanged. Match by session key and copy the +// SQL attempt ID into the KV object so fallback deep comparison still reports +// real data mismatches instead of this expected migration repair. +func normalizeLegacyZeroAttemptIDsForCompare(kvPayment, + sqlPayment *MPPayment) { + + if kvPayment == nil || sqlPayment == nil { + return + } + + sqlAttemptsBySessionKey := make(map[string]uint64) + for i := range sqlPayment.HTLCs { + htlc := &sqlPayment.HTLCs[i] + sessionKey := htlc.SessionKey() + if sessionKey == nil { + // Leave normalization incomplete so the deep comparison + // reports the malformed attempt instead of panicking. + continue + } + + sessionKeyBytes := sessionKey.Serialize() + sessionKeyStr := string(sessionKeyBytes) + sqlAttemptsBySessionKey[sessionKeyStr] = htlc.AttemptID + } + + for i := range kvPayment.HTLCs { + htlc := &kvPayment.HTLCs[i] + if htlc.AttemptID != 0 { + continue + } + + sessionKey := htlc.SessionKey() + if sessionKey == nil { + // Leave normalization incomplete so the deep comparison + // reports the malformed attempt instead of panicking. + continue + } + + sessionKeyBytes := sessionKey.Serialize() + sessionKeyStr := string(sessionKeyBytes) + attemptID, ok := sqlAttemptsBySessionKey[sessionKeyStr] + if !ok { + continue + } + + htlc.AttemptID = attemptID + } +} + // normalizePaymentForCompare normalizes fields that are expected to differ // between KV and SQL representations before deep comparison. func normalizePaymentForCompare(payment *MPPayment) { diff --git a/payments/db/migration1/sql_migration.go b/payments/db/migration1/sql_migration.go index 4a751d439be..a353766506f 100644 --- a/payments/db/migration1/sql_migration.go +++ b/payments/db/migration1/sql_migration.go @@ -5,6 +5,7 @@ import ( "context" "database/sql" "fmt" + "math" "strconv" "time" @@ -21,6 +22,13 @@ const ( defaultRateWindowDuration = 300 * time.Second ) +var ( + // switchNextPaymentIDKey is the switch sequencer bucket key. This is + // intentionally kept in sync with htlcswitch.nextPaymentIDKey without + // importing htlcswitch into the migration package. + switchNextPaymentIDKey = []byte("next-payment-id-key") +) + // MigrationStats tracks migration progress. type MigrationStats struct { TotalPayments int64 @@ -130,33 +138,18 @@ func MigratePaymentsKVToSQL(ctx context.Context, kvBackend kvdb.Backend, var ( validationBatch []migratedPaymentRef - // indexedPayments is the total number of entries in the index - // bucket. This is an upper bound on the number of payments to - // migrate since the index also includes duplicate payment - // entries (which are migrated as sub-entries of the primary - // payment rather than as top-level payments). - indexedPayments int64 - reportInterval = rate.Sometimes{Interval: 5 * time.Second} ) - // Count entries in the index bucket upfront for approximate progress - // reporting. This is a cheap read-only scan with no deserialization. - err := kvBackend.View(func(kvTx kvdb.RTx) error { - indexes := kvTx.ReadBucket(paymentsIndexBucket) - if indexes == nil { - return nil - } - - return indexes.ForEach(func(_, _ []byte) error { - indexedPayments++ - return nil - }) - }, func() { indexedPayments = 0 }) + indexedPayments, nextSwitchPaymentID, err := collectMigrationState( + kvBackend, + ) if err != nil { - return fmt.Errorf("count payments: %w", err) + return fmt.Errorf("collect payment migration state: %w", err) } + attemptIDAllocator := newAttemptIDAllocator(nextSwitchPaymentID) + log.Infof("Found ~%d index entries to migrate (includes duplicates)", indexedPayments) @@ -196,9 +189,9 @@ func MigratePaymentsKVToSQL(ctx context.Context, kvBackend kvdb.Backend, reportInterval.Do(reporter.report) return migrateIndexEntry( - ctx, seqKey, indexVal, - paymentsBucket, kvBackend, - sqlDB, cfg, stats, &validationBatch, + ctx, seqKey, indexVal, paymentsBucket, + kvBackend, sqlDB, cfg, stats, &validationBatch, + attemptIDAllocator, ) }) }, func() {}) @@ -223,6 +216,12 @@ func MigratePaymentsKVToSQL(ctx context.Context, kvBackend kvdb.Backend, return err } + if err := advanceSwitchPaymentIDSequence( + kvBackend, attemptIDAllocator.nextID, + ); err != nil { + return fmt.Errorf("advance switch payment ID sequence: %w", err) + } + stats.MigrationDuration = time.Since(startTime) printMigrationSummary(stats) @@ -244,13 +243,161 @@ func normalizeTimeForSQL(t time.Time) time.Time { return time.Unix(0, t.UnixNano()).UTC() } +// collectMigrationState scans the payment index once to gather progress +// information and reads the switch sequencer horizon used for legacy attempt +// ID allocation. +func collectMigrationState(kvBackend kvdb.Backend) (int64, uint64, error) { + var ( + indexedPayments int64 + nextSwitchPaymentID uint64 + ) + err := kvBackend.View(func(kvTx kvdb.RTx) error { + // Read the switch sequencer horizon that legacy zero-ID + // attempts will allocate from if needed. + seqBucket := kvTx.ReadBucket(switchNextPaymentIDKey) + if seqBucket != nil { + nextSwitchPaymentID = seqBucket.Sequence() + } + + // If there are no payments, there is nothing to count or + // migrate. + paymentsBucket := kvTx.ReadBucket(paymentsRootBucket) + if paymentsBucket == nil { + log.Infof("No payments bucket found - database is " + + "empty") + + return nil + } + + // Count index entries for approximate progress reporting. The + // main migration still streams over this index in order. + indexes := kvTx.ReadBucket(paymentsIndexBucket) + if indexes == nil { + return fmt.Errorf("index bucket does not exist") + } + + return indexes.ForEach(func(_, _ []byte) error { + indexedPayments++ + return nil + }) + }, func() { + indexedPayments = 0 + nextSwitchPaymentID = 0 + }) + if err != nil { + return 0, 0, err + } + + return indexedPayments, nextSwitchPaymentID, nil +} + +// attemptIDAllocator tracks the next switch payment ID that is safe to hand +// out after migration. +type attemptIDAllocator struct { + // nextID is the in-memory counter used for the next synthetic attempt + // ID and the final switch sequencer horizon to persist. + nextID uint64 +} + +// newAttemptIDAllocator creates a new attempt ID allocator. +// +// The SQL schema requires payment_htlc_attempts.attempt_index to be globally +// unique because attempt-related rows use it as their stable identifier. Very +// old KV payments can contain attempt ID zero, which represented an unknown +// legacy value and cannot be preserved in SQL without colliding with every +// other such legacy attempt. +// +// Non-zero attempt IDs were allocated from the switch sequencer. The sequencer +// persists a horizon: it reads Sequence() as the next ID to hand out, then +// writes a higher value before returning that ID. This means the value stored +// in switchNextPaymentIDKey is already beyond all IDs it handed out. We +// therefore allocate replacement IDs for legacy zero attempts from that horizon +// and persist the final next-unused value once migration succeeds. These old +// attempts may receive high attempt_index values, which means that within a +// payment that mixes remapped and non-remapped attempts the remapped ones will +// sort after the originals when SQL queries order by attempt_index. This is +// acceptable because it only affects very old payments whose attempt IDs were +// already unknown, and intra-payment attempt ordering is not a load-bearing +// user-visible invariant; uniqueness and future non-collision with the switch +// sequencer are the actual invariants we need to preserve. +func newAttemptIDAllocator(nextSwitchPaymentID uint64) *attemptIDAllocator { + return &attemptIDAllocator{ + nextID: nextSwitchPaymentID, + } +} + +// allocateLegacyAttemptID returns a new unique attempt ID for a legacy payment. +// It uses the in-memory counter initialized from the switch payment ID +// sequencer horizon. +func (a *attemptIDAllocator) allocateLegacyAttemptID() (uint64, error) { + // The runtime switch sequencer never hands out ID zero: when its + // persisted sequence is zero, it starts by issuing ID one. Mirror + // that behavior for legacy attempts on otherwise idle nodes whose + // switch sequencer bucket has not allocated a batch yet. + if a.nextID == 0 { + a.nextID = 1 + } + + if a.nextID == ^uint64(0) { + return 0, fmt.Errorf("cannot allocate legacy attempt ID: "+ + "switch payment ID sequence is %d", a.nextID) + } + + attemptID := a.nextID + a.nextID++ + + return attemptID, nil +} + +// advanceSwitchPaymentIDSequence makes sure the switch sequencer cannot later +// hand out an ID that was already present in a migrated payment attempt. +func advanceSwitchPaymentIDSequence(kvBackend kvdb.Backend, + nextID uint64) error { + + return kvdb.Update(kvBackend, func(tx kvdb.RwTx) error { + seqBucket := tx.ReadWriteBucket(switchNextPaymentIDKey) + if seqBucket == nil { + if nextID <= 1 { + return nil + } + + var err error + seqBucket, err = tx.CreateTopLevelBucket( + switchNextPaymentIDKey, + ) + if err != nil { + return err + } + } + + currentSeq := seqBucket.Sequence() + if currentSeq == nextID { + // No synthetic IDs were allocated, so the sequencer is + // already at the migration cursor. + return nil + } + if currentSeq > nextID { + // Migration runs exclusively, so the sequencer should + // not move beyond the cursor computed by migration. + return fmt.Errorf("switch payment ID sequence above "+ + "migration horizon: current=%d, expected=%d", + currentSeq, nextID) + } + + // Synthetic IDs were allocated, so advance the sequencer to + // the final next-unused ID. + return seqBucket.SetSequence(nextID) + }, func() {}) +} + // migrateIndexEntry processes a single entry from the payments index bucket, // migrating the corresponding payment to SQL and appending it to the // validation batch. func migrateIndexEntry(ctx context.Context, seqKey, indexVal []byte, paymentsBucket kvdb.RBucket, kvBackend kvdb.Backend, - sqlDB SQLMigrationQueries, cfg *SQLStoreConfig, - stats *MigrationStats, validationBatch *[]migratedPaymentRef) error { + sqlDB SQLMigrationQueries, cfg *SQLStoreConfig, stats *MigrationStats, + validationBatch *[]migratedPaymentRef, + attemptIDAllocator *attemptIDAllocator) error { r := bytes.NewReader(indexVal) paymentHash, err := deserializePaymentIndex(r) @@ -290,7 +437,7 @@ func migrateIndexEntry(ctx context.Context, seqKey, indexVal []byte, // Migrate the payment to the SQL database. paymentID, err := migratePayment( - ctx, payment, paymentHash, sqlDB, stats, + ctx, payment, paymentHash, sqlDB, stats, attemptIDAllocator, ) if err != nil { return fmt.Errorf("migrate payment %x: %w", paymentHash[:8], @@ -330,7 +477,24 @@ func migrateIndexEntry(ctx context.Context, seqKey, indexVal []byte, // migratePayment migrates a single payment from KV to SQL. func migratePayment(ctx context.Context, payment *MPPayment, hash lntypes.Hash, - sqlDB SQLMigrationQueries, stats *MigrationStats) (int64, error) { + sqlDB SQLMigrationQueries, stats *MigrationStats, + attemptIDAllocator *attemptIDAllocator) (int64, error) { + + if payment.Status == StatusInFlight { + terminalizedLegacyAttempts, err := + terminalizeUnresolvedLegacyZeroAttempts(payment) + if err != nil { + return 0, err + } + if terminalizedLegacyAttempts > 0 { + log.Warnf("Terminalized %d unresolved legacy HTLC "+ + "attempt(s) with unknown attempt ID zero for "+ + "payment %x; the parent payment was failed if "+ + "no other settled or in-flight HTLC kept "+ + "it active", terminalizedLegacyAttempts, + hash[:8]) + } + } // Update migration stats based on payment status. switch payment.Status { @@ -405,6 +569,7 @@ func migratePayment(ctx context.Context, payment *MPPayment, hash lntypes.Hash, for _, htlc := range payment.HTLCs { err = migrateHTLCAttempt( ctx, paymentID, hash, &htlc, sqlDB, stats, + attemptIDAllocator, ) if err != nil { return 0, fmt.Errorf("migrate attempt %d: %w", @@ -417,10 +582,64 @@ func migratePayment(ctx context.Context, payment *MPPayment, hash lntypes.Hash, return paymentID, nil } +// terminalizeUnresolvedLegacyZeroAttempts marks unresolved legacy zero-ID HTLC +// attempts failed and fails the parent payment if no other resolved or +// recoverable in-flight HTLC keeps it active. +// +// Attempt ID zero was written by an old KV migration as an unknown legacy +// value. If such an attempt has no settle/fail resolution, it cannot be safely +// resumed after SQL migration because the live switch state would not know the +// synthetic attempt ID assigned below. +// +// Callers only need this for in-flight payments: any unresolved HTLC makes the +// parent payment in-flight, while terminal historical payments can use the +// regular zero-ID remap path. +func terminalizeUnresolvedLegacyZeroAttempts(payment *MPPayment) (int, error) { + var ( + terminalizedLegacyAttempts int + hasSettled bool + hasNonZeroInFlight bool + ) + + for i := range payment.HTLCs { + htlc := &payment.HTLCs[i] + switch { + case htlc.Settle != nil: + hasSettled = true + + case htlc.Failure != nil: + + case htlc.AttemptID == 0: + htlc.Failure = &HTLCFailInfo{ + Reason: HTLCFailUnknown, + } + terminalizedLegacyAttempts++ + + default: + hasNonZeroInFlight = true + } + } + + if terminalizedLegacyAttempts == 0 { + return 0, nil + } + + if !hasSettled && !hasNonZeroInFlight && payment.FailureReason == nil { + reason := FailureReasonError + payment.FailureReason = &reason + } + + if err := payment.setState(); err != nil { + return 0, err + } + + return terminalizedLegacyAttempts, nil +} + // migrateHTLCAttempt migrates a single HTLC attempt. func migrateHTLCAttempt(ctx context.Context, paymentID int64, parentPaymentHash lntypes.Hash, htlc *HTLCAttempt, sqlDB SQLQueries, - stats *MigrationStats) error { + stats *MigrationStats, attemptIDAllocator *attemptIDAllocator) error { // Determine the payment hash for this HTLC attempt. // @@ -443,13 +662,41 @@ func migrateHTLCAttempt(ctx context.Context, paymentID int64, firstHopAmountMsat := int64(htlc.Route.FirstHopAmount.Val.Int()) - // Get the session key bytes. - sessionKeyBytes := htlc.SessionKey().Serialize() + sessionKey := htlc.SessionKey() + if sessionKey == nil { + return fmt.Errorf("HTLC attempt %d for payment %x is "+ + "missing session key", htlc.AttemptID, + parentPaymentHash[:8]) + } + + sessionKeyBytes := sessionKey.Serialize() + + attemptID := htlc.AttemptID + if attemptID == 0 { + var err error + attemptID, err = attemptIDAllocator.allocateLegacyAttemptID() + if err != nil { + return fmt.Errorf("allocate legacy attempt ID: %w", err) + } + + log.Warnf("Allocated HTLC attempt index %d from switch "+ + "sequencer for legacy payment %x with unknown "+ + "attempt ID", attemptID, + parentPaymentHash[:8]) + } + + if attemptID > math.MaxInt64 { + return fmt.Errorf("unable to convert HTLC attempt ID to "+ + "SQL attempt index: attempt_id=%d payment=%x max=%d", + attemptID, parentPaymentHash[:8], uint64(math.MaxInt64)) + } + + attemptIndex := int64(attemptID) // Insert HTLC attempt. _, err := sqlDB.InsertHtlcAttempt(ctx, sqlc.InsertHtlcAttemptParams{ PaymentID: paymentID, - AttemptIndex: int64(htlc.AttemptID), + AttemptIndex: attemptIndex, SessionKey: sessionKeyBytes, AttemptTime: normalizeTimeForSQL(htlc.AttemptTime), PaymentHash: paymentHash, @@ -459,7 +706,13 @@ func migrateHTLCAttempt(ctx context.Context, paymentID int64, RouteSourceKey: htlc.Route.SourcePubKey[:], }) if err != nil { - return fmt.Errorf("insert HTLC attempt: %w", err) + // SQL unique constraint errors do not include the conflicting + // value. Include the attempted index so failures point directly + // at the problematic legacy attempt. + return fmt.Errorf("unable to insert HTLC attempt: "+ + "index=%d payment=%x original_attempt_id=%d: %w", + attemptIndex, parentPaymentHash[:8], + htlc.AttemptID, err) } // Insert the route-level first hop custom records. @@ -467,7 +720,7 @@ func migrateHTLCAttempt(ctx context.Context, paymentID int64, err = sqlDB.InsertPaymentAttemptFirstHopCustomRecord( ctx, sqlc.InsertPaymentAttemptFirstHopCustomRecordParams{ - HtlcAttemptIndex: int64(htlc.AttemptID), + HtlcAttemptIndex: attemptIndex, Key: int64(key), Value: value, }, @@ -482,7 +735,7 @@ func migrateHTLCAttempt(ctx context.Context, paymentID int64, for hopIndex := range htlc.Route.Hops { hop := htlc.Route.Hops[hopIndex] err = migrateRouteHop( - ctx, int64(htlc.AttemptID), hopIndex, hop, + ctx, attemptIndex, hopIndex, hop, sqlDB, stats, ) if err != nil { @@ -495,7 +748,7 @@ func migrateHTLCAttempt(ctx context.Context, paymentID int64, case htlc.Settle != nil: // Settled err = sqlDB.SettleAttempt(ctx, sqlc.SettleAttemptParams{ - AttemptIndex: int64(htlc.AttemptID), + AttemptIndex: attemptIndex, ResolutionTime: normalizeTimeForSQL( htlc.Settle.SettleTime, ), @@ -521,7 +774,7 @@ func migrateHTLCAttempt(ctx context.Context, paymentID int64, } err = sqlDB.FailAttempt(ctx, sqlc.FailAttemptParams{ - AttemptIndex: int64(htlc.AttemptID), + AttemptIndex: attemptIndex, ResolutionTime: normalizeTimeForSQL( htlc.Failure.FailTime, ), diff --git a/payments/db/migration1/sql_migration_test.go b/payments/db/migration1/sql_migration_test.go index ab0ee6cc1bf..05b4d12f4a4 100644 --- a/payments/db/migration1/sql_migration_test.go +++ b/payments/db/migration1/sql_migration_test.go @@ -454,6 +454,183 @@ func TestMigrationWithDuplicates(t *testing.T) { require.Empty(t, duplicates[1].SettlePreimage) } +// TestMigrationWithLegacyZeroAttemptIDs verifies that very old payments whose +// HTLC attempt ID was migrated as the legacy "unknown" value zero do not +// collide in the SQL attempt index. +func TestMigrationWithLegacyZeroAttemptIDs(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + switchSequence uint64 + expectedAttempts []uint64 + expectedSequence uint64 + }{ + { + name: "zero switch sequence", + expectedAttempts: []uint64{1, 2}, + expectedSequence: 3, + }, + { + name: "advanced switch sequence", + switchSequence: 10, + expectedAttempts: []uint64{10, 11}, + expectedSequence: 12, + }, + } + + for _, testCase := range testCases { + testCase := testCase + t.Run(testCase.name, func(t *testing.T) { + ctx := context.Background() + kvDB := setupTestKVDB(t) + + hash1 := createTestPaymentHash(t, 10) + hash2 := createTestPaymentHash(t, 11) + + err := kvdb.Update(kvDB, func(tx kvdb.RwTx) error { + paymentsBucket, err := tx.CreateTopLevelBucket( + paymentsRootBucket, + ) + if err != nil { + return err + } + + indexBucket, err := tx.CreateTopLevelBucket( + paymentsIndexBucket, + ) + if err != nil { + return err + } + + seqBucket, err := tx.CreateTopLevelBucket( + switchNextPaymentIDKey, + ) + if err != nil { + return err + } + if testCase.switchSequence > 0 { + err := seqBucket.SetSequence( + testCase.switchSequence, + ) + if err != nil { + return err + } + } + + if err := createTestPaymentInKVWithAttemptID( + t, paymentsBucket, indexBucket, 3, hash1, 0, + ); err != nil { + return err + } + + return createTestPaymentInKVWithAttemptID( + t, paymentsBucket, indexBucket, 6, hash2, 0, + ) + }, func() {}) + require.NoError(t, err) + + sqlStore := setupTestSQLDB(t) + err = runPaymentsMigration(ctx, kvDB, sqlStore) + require.NoError(t, err) + + payment1, err := sqlStore.FetchPayment(ctx, hash1) + require.NoError(t, err) + require.Len(t, payment1.HTLCs, 1) + + payment2, err := sqlStore.FetchPayment(ctx, hash2) + require.NoError(t, err) + require.Len(t, payment2.HTLCs, 1) + + attempt1 := payment1.HTLCs[0].AttemptID + attempt2 := payment2.HTLCs[0].AttemptID + require.NotZero(t, attempt1) + require.NotZero(t, attempt2) + require.Equal(t, testCase.expectedAttempts[0], attempt1) + require.Equal(t, testCase.expectedAttempts[1], attempt2) + require.NotEqual(t, attempt1, attempt2) + + var seq uint64 + err = kvdb.View(kvDB, func(tx kvdb.RTx) error { + seqBucket := tx.ReadBucket(switchNextPaymentIDKey) + require.NotNil(t, seqBucket) + seq = seqBucket.Sequence() + + return nil + }, func() {}) + require.NoError(t, err) + + maxAttemptID := attempt1 + if attempt2 > maxAttemptID { + maxAttemptID = attempt2 + } + require.Greater(t, seq, maxAttemptID) + require.Equal(t, testCase.expectedSequence, seq) + + for _, kvPayment := range fetchAllPaymentsFromKV(t, kvDB) { + comparePaymentData(t, ctx, sqlStore, kvPayment) + } + }) + } +} + +// TestMigrationWithUnresolvedLegacyZeroAttemptID verifies that the migration +// fails unresolved legacy zero-ID HTLC attempts instead of remapping them into +// resumable synthetic attempts. +func TestMigrationWithUnresolvedLegacyZeroAttemptID(t *testing.T) { + t.Parallel() + + ctx := context.Background() + kvDB := setupTestKVDB(t) + + hash := createTestPaymentHash(t, 13) + + err := kvdb.Update(kvDB, func(tx kvdb.RwTx) error { + paymentsBucket, err := tx.CreateTopLevelBucket( + paymentsRootBucket, + ) + if err != nil { + return err + } + + indexBucket, err := tx.CreateTopLevelBucket(paymentsIndexBucket) + if err != nil { + return err + } + + _, err = tx.CreateTopLevelBucket( + switchNextPaymentIDKey, + ) + if err != nil { + return err + } + + return createInFlightPaymentWithAttemptID( + t, paymentsBucket, indexBucket, hash, 0, + ) + }, func() {}) + require.NoError(t, err) + + sqlStore := setupTestSQLDB(t) + err = runPaymentsMigration(ctx, kvDB, sqlStore) + require.NoError(t, err) + + payment, err := sqlStore.FetchPayment(ctx, hash) + require.NoError(t, err) + require.Equal(t, StatusFailed, payment.Status) + require.NotNil(t, payment.FailureReason) + require.Equal(t, FailureReasonError, *payment.FailureReason) + require.Len(t, payment.HTLCs, 1) + require.NotZero(t, payment.HTLCs[0].AttemptID) + require.Nil(t, payment.HTLCs[0].Settle) + require.NotNil(t, payment.HTLCs[0].Failure) + require.Equal(t, HTLCFailUnknown, payment.HTLCs[0].Failure.Reason) + + inFlight, err := sqlStore.FetchInFlightPayments(ctx) + require.NoError(t, err) + require.Empty(t, inFlight) +} + // TestDuplicatePaymentsWithoutAttemptInfo verifies duplicate payments without // attempt info are migrated with terminal failure reasons. func TestDuplicatePaymentsWithoutAttemptInfo(t *testing.T) { @@ -1214,6 +1391,23 @@ func createTestPaymentInKV(t *testing.T, paymentsBucket, t.Helper() + // Increment global attempt ID and create HTLC attempt. So we have a + // globally unique attempt ID for the HTLC attempt. + *globalAttemptID++ + + return createTestPaymentInKVWithAttemptID( + t, paymentsBucket, indexBucket, seqNum, hash, *globalAttemptID, + ) +} + +// createTestPaymentInKVWithAttemptID creates a single payment in the KV store +// with a specific HTLC attempt ID. +func createTestPaymentInKVWithAttemptID(t *testing.T, paymentsBucket, + indexBucket kvdb.RwBucket, seqNum uint64, hash [32]byte, + attemptID uint64) error { + + t.Helper() + // Create payment bucket. paymentBucket, err := paymentsBucket.CreateBucketIfNotExists(hash[:]) if err != nil { @@ -1258,11 +1452,8 @@ func createTestPaymentInKV(t *testing.T, paymentsBucket, return err } - // Increment global attempt ID and create HTLC attempt. So we have a - // globally unique attempt ID for the HTLC attempt. - *globalAttemptID++ err = createTestHTLCAttempt( - t, htlcBucket, hash, *globalAttemptID, seqNum%3 == 0, + t, htlcBucket, hash, attemptID, seqNum%3 == 0, ) if err != nil { return err @@ -1635,6 +1826,10 @@ func fetchAllPaymentsFromKV(t *testing.T, kvDB kvdb.Backend) []*MPPayment { // normalizePaymentData makes sure that the payment data is normalized for // comparison using the same logic as in-migration validation. func normalizePaymentData(payment *MPPayment) { + if payment.Status == StatusInFlight { + _, _ = terminalizeUnresolvedLegacyZeroAttempts(payment) + } + normalizePaymentForCompare(payment) } @@ -1653,7 +1848,16 @@ func comparePaymentData(t *testing.T, ctx context.Context, sqlStore *SQLStore, require.NoError(t, err, "SQL payment should exist for %x", paymentHash[:8]) - // Normalize time precision to microseconds. + if kvPayment.Status == StatusInFlight { + // Normalize legacy payment state before remapped zero attempt + // IDs are aligned, because unresolved legacy zero-ID attempts + // are intentionally failed during migration. + _, _ = terminalizeUnresolvedLegacyZeroAttempts(kvPayment) + } + + // Normalize expected KV/SQL representation differences before the + // deep equality check. + normalizeLegacyZeroAttemptIDsForCompare(kvPayment, sqlPayment) normalizePaymentData(kvPayment) normalizePaymentData(sqlPayment) @@ -2649,8 +2853,8 @@ func createPaymentWithFeatureSet(t *testing.T, paymentsBucket, } // TestMigrateInFlightPayment tests that a payment with an active (in-flight) -// HTLC attempt — i.e. a node that upgrades while a payment is still pending on -// the network — is migrated correctly. The HTLC attempt must appear in SQL +// HTLC attempt, such as a node that upgrades while a payment is still pending +// on the network, is migrated correctly. The HTLC attempt must appear in SQL // without a settlement or failure resolution, and FetchInFlightPayments must // return the payment after migration. func TestMigrateInFlightPayment(t *testing.T) { @@ -2713,6 +2917,14 @@ func TestMigrateInFlightPayment(t *testing.T) { func createInFlightPayment(t *testing.T, paymentsBucket, indexBucket kvdb.RwBucket, hash [32]byte) error { + return createInFlightPaymentWithAttemptID( + t, paymentsBucket, indexBucket, hash, 1, + ) +} + +func createInFlightPaymentWithAttemptID(t *testing.T, paymentsBucket, + indexBucket kvdb.RwBucket, hash [32]byte, attemptID uint64) error { + t.Helper() paymentBucket, err := paymentsBucket.CreateBucketIfNotExists(hash[:]) @@ -2763,7 +2975,6 @@ func createInFlightPayment(t *testing.T, paymentsBucket, return err } - const attemptID = 1 attemptInfo := &HTLCAttemptInfo{ AttemptID: attemptID, sessionKey: sessionKeyBytes,