diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md index bb02b9c276..8f5e4cb69f 100644 --- a/docs/release-notes/release-notes-0.21.0.md +++ b/docs/release-notes/release-notes-0.21.0.md @@ -54,6 +54,8 @@ refacotor the payment related LND code to make it more modular. * Implement the SQL backend for the [payments database](https://github.com/lightningnetwork/lnd/pull/9147) + * Implement QueryPayments for the [payments db + SQL Backend](https://github.com/lightningnetwork/lnd/pull/10287) ## Code Health diff --git a/payments/db/errors.go b/payments/db/errors.go index 40e37d95c6..424b63c7bd 100644 --- a/payments/db/errors.go +++ b/payments/db/errors.go @@ -130,4 +130,8 @@ var ( // NOTE: Only used for the kv backend. ErrNoSequenceNrIndex = errors.New("payment sequence number index " + "does not exist") + + // errMaxPaymentsReached is used internally to signal that the maximum + // number of payments has been reached during a paginated query. + errMaxPaymentsReached = errors.New("max payments reached") ) diff --git a/payments/db/sql_converters.go b/payments/db/sql_converters.go new file mode 100644 index 0000000000..4343dfd268 --- /dev/null +++ b/payments/db/sql_converters.go @@ -0,0 +1,274 @@ +package paymentsdb + +import ( + "bytes" + "fmt" + "sort" + "strconv" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/record" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/sqldb/sqlc" + "github.com/lightningnetwork/lnd/tlv" +) + +// dbPaymentToCreationInfo converts database payment data to +// PaymentCreationInfo. +func dbPaymentToCreationInfo(paymentIdentifier []byte, amountMsat int64, + createdAt time.Time, intentPayload []byte, + firstHopCustomRecords lnwire.CustomRecords) *PaymentCreationInfo { + + // This is the payment hash for non-AMP payments and the SetID for AMP + // payments. + var identifier lntypes.Hash + copy(identifier[:], paymentIdentifier) + + return &PaymentCreationInfo{ + PaymentIdentifier: identifier, + Value: lnwire.MilliSatoshi(amountMsat), + CreationTime: createdAt.Local(), + PaymentRequest: intentPayload, + FirstHopCustomRecords: firstHopCustomRecords, + } +} + +// dbAttemptToHTLCAttempt converts a database HTLC attempt to an HTLCAttempt. +func dbAttemptToHTLCAttempt( + dbAttempt sqlc.FetchHtlcAttemptsForPaymentRow, + hops []sqlc.FetchHopsForAttemptsRow, + hopCustomRecords map[int64][]sqlc.PaymentHopCustomRecord, + routeCustomRecords []sqlc.PaymentAttemptFirstHopCustomRecord) ( + *HTLCAttempt, error) { + + // Convert route-level first hop custom records to CustomRecords map. + var firstHopWireCustomRecords lnwire.CustomRecords + if len(routeCustomRecords) > 0 { + firstHopWireCustomRecords = make(lnwire.CustomRecords) + for _, record := range routeCustomRecords { + firstHopWireCustomRecords[uint64(record.Key)] = + record.Value + } + } + + // Build the route from the database data. + route, err := dbDataToRoute( + hops, hopCustomRecords, dbAttempt.FirstHopAmountMsat, + dbAttempt.RouteTotalTimeLock, dbAttempt.RouteTotalAmount, + dbAttempt.RouteSourceKey, firstHopWireCustomRecords, + ) + if err != nil { + return nil, fmt.Errorf("failed to convert to route: %w", + err) + } + + hash, err := lntypes.MakeHash(dbAttempt.PaymentHash) + if err != nil { + return nil, fmt.Errorf("failed to parse payment "+ + "hash: %w", err) + } + + // Create the attempt info. + var sessionKey [32]byte + copy(sessionKey[:], dbAttempt.SessionKey) + + info := HTLCAttemptInfo{ + AttemptID: uint64(dbAttempt.AttemptIndex), + sessionKey: sessionKey, + Route: *route, + AttemptTime: dbAttempt.AttemptTime, + Hash: &hash, + } + + attempt := &HTLCAttempt{ + HTLCAttemptInfo: info, + } + + // Add settlement info if present. + if dbAttempt.ResolutionType.Valid && + HTLCAttemptResolutionType(dbAttempt.ResolutionType.Int32) == + HTLCAttemptResolutionSettled { + + var preimage lntypes.Preimage + copy(preimage[:], dbAttempt.SettlePreimage) + + attempt.Settle = &HTLCSettleInfo{ + Preimage: preimage, + SettleTime: dbAttempt.ResolutionTime.Time, + } + } + + // Add failure info if present. + if dbAttempt.ResolutionType.Valid && + HTLCAttemptResolutionType(dbAttempt.ResolutionType.Int32) == + HTLCAttemptResolutionFailed { + + failure := &HTLCFailInfo{ + FailTime: dbAttempt.ResolutionTime.Time, + } + + if dbAttempt.HtlcFailReason.Valid { + failure.Reason = HTLCFailReason( + dbAttempt.HtlcFailReason.Int32, + ) + } + + if dbAttempt.FailureSourceIndex.Valid { + failure.FailureSourceIndex = uint32( + dbAttempt.FailureSourceIndex.Int32, + ) + } + + // Decode the failure message if present. + if len(dbAttempt.FailureMsg) > 0 { + msg, err := lnwire.DecodeFailureMessage( + bytes.NewReader(dbAttempt.FailureMsg), 0, + ) + if err != nil { + return nil, fmt.Errorf("failed to decode "+ + "failure message: %w", err) + } + failure.Message = msg + } + + attempt.Failure = failure + } + + return attempt, nil +} + +// dbDataToRoute converts database route data to a route.Route. +func dbDataToRoute(hops []sqlc.FetchHopsForAttemptsRow, + hopCustomRecords map[int64][]sqlc.PaymentHopCustomRecord, + firstHopAmountMsat int64, totalTimeLock int32, totalAmount int64, + sourceKey []byte, firstHopWireCustomRecords lnwire.CustomRecords) ( + *route.Route, error) { + + if len(hops) == 0 { + return nil, fmt.Errorf("no hops provided") + } + + // Sort hops by hop index. + sort.Slice(hops, func(i, j int) bool { + return hops[i].HopIndex < hops[j].HopIndex + }) + + routeHops := make([]*route.Hop, len(hops)) + + for i, hop := range hops { + pubKey, err := route.NewVertexFromBytes(hop.PubKey) + if err != nil { + return nil, fmt.Errorf("failed to parse pub key: %w", + err) + } + + var channelID uint64 + if hop.Scid != "" { + // The SCID is stored as a string representation + // of the uint64. + var err error + channelID, err = strconv.ParseUint(hop.Scid, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse "+ + "scid: %w", err) + } + } + + routeHop := &route.Hop{ + PubKeyBytes: pubKey, + ChannelID: channelID, + OutgoingTimeLock: uint32(hop.OutgoingTimeLock), + AmtToForward: lnwire.MilliSatoshi(hop.AmtToForward), + } + + // Add MPP record if present. + if len(hop.MppPaymentAddr) > 0 { + var paymentAddr [32]byte + copy(paymentAddr[:], hop.MppPaymentAddr) + routeHop.MPP = record.NewMPP( + lnwire.MilliSatoshi(hop.MppTotalMsat.Int64), + paymentAddr, + ) + } + + // Add AMP record if present. + if len(hop.AmpRootShare) > 0 { + var rootShare [32]byte + copy(rootShare[:], hop.AmpRootShare) + var setID [32]byte + copy(setID[:], hop.AmpSetID) + + routeHop.AMP = record.NewAMP( + rootShare, setID, + uint32(hop.AmpChildIndex.Int32), + ) + } + + // Add blinding point if present (only for introduction node). + if len(hop.BlindingPoint) > 0 { + pubKey, err := btcec.ParsePubKey(hop.BlindingPoint) + if err != nil { + return nil, fmt.Errorf("failed to parse "+ + "blinding point: %w", err) + } + routeHop.BlindingPoint = pubKey + } + + // Add encrypted data if present (for all blinded hops). + if len(hop.EncryptedData) > 0 { + routeHop.EncryptedData = hop.EncryptedData + } + + // Add total amount if present (only for final hop in blinded + // route). + if hop.BlindedPathTotalAmt.Valid { + routeHop.TotalAmtMsat = lnwire.MilliSatoshi( + hop.BlindedPathTotalAmt.Int64, + ) + } + + // Add hop-level custom records. + if records, ok := hopCustomRecords[hop.ID]; ok { + routeHop.CustomRecords = make( + record.CustomSet, + ) + for _, rec := range records { + routeHop.CustomRecords[uint64(rec.Key)] = + rec.Value + } + } + + // Add metadata if present. + if len(hop.MetaData) > 0 { + routeHop.Metadata = hop.MetaData + } + + routeHops[i] = routeHop + } + + // Parse the source node public key. + var sourceNode route.Vertex + copy(sourceNode[:], sourceKey) + + route := &route.Route{ + TotalTimeLock: uint32(totalTimeLock), + TotalAmount: lnwire.MilliSatoshi(totalAmount), + SourcePubKey: sourceNode, + Hops: routeHops, + FirstHopWireCustomRecords: firstHopWireCustomRecords, + } + + // Set the first hop amount if it is set. + if firstHopAmountMsat != 0 { + route.FirstHopAmount = tlv.NewRecordT[tlv.TlvType0]( + tlv.NewBigSizeT(lnwire.MilliSatoshi( + firstHopAmountMsat, + )), + ) + } + + return route, nil +} diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index 12585caf64..0b406d2031 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1,14 +1,61 @@ package paymentsdb import ( + "context" + "database/sql" + "errors" "fmt" + "math" + "time" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/sqldb" + "github.com/lightningnetwork/lnd/sqldb/sqlc" +) + +// PaymentIntentType represents the type of payment intent. +type PaymentIntentType int16 + +const ( + // PaymentIntentTypeBolt11 indicates a BOLT11 invoice payment. + PaymentIntentTypeBolt11 PaymentIntentType = 0 +) + +// HTLCAttemptResolutionType represents the type of HTLC attempt resolution. +type HTLCAttemptResolutionType int32 + +const ( + // HTLCAttemptResolutionSettled indicates the HTLC attempt was settled + // successfully with a preimage. + HTLCAttemptResolutionSettled HTLCAttemptResolutionType = 1 + + // HTLCAttemptResolutionFailed indicates the HTLC attempt failed. + HTLCAttemptResolutionFailed HTLCAttemptResolutionType = 2 ) // SQLQueries is a subset of the sqlc.Querier interface that can be used to // execute queries against the SQL payments tables. +// +//nolint:ll,interfacebloat type SQLQueries interface { + /* + Payment DB read operations. + */ + FilterPayments(ctx context.Context, query sqlc.FilterPaymentsParams) ([]sqlc.FilterPaymentsRow, error) + FetchPayment(ctx context.Context, paymentIdentifier []byte) (sqlc.FetchPaymentRow, error) + FetchPaymentsByIDs(ctx context.Context, paymentIDs []int64) ([]sqlc.FetchPaymentsByIDsRow, error) + + CountPayments(ctx context.Context) (int64, error) + + FetchHtlcAttemptsForPayment(ctx context.Context, paymentID int64) ([]sqlc.FetchHtlcAttemptsForPaymentRow, error) + FetchAllInflightAttempts(ctx context.Context) ([]sqlc.PaymentHtlcAttempt, error) + FetchHopsForAttempt(ctx context.Context, htlcAttemptIndex int64) ([]sqlc.FetchHopsForAttemptRow, error) + FetchHopsForAttempts(ctx context.Context, htlcAttemptIndices []int64) ([]sqlc.FetchHopsForAttemptsRow, error) + + FetchPaymentLevelFirstHopCustomRecords(ctx context.Context, paymentID int64) ([]sqlc.PaymentFirstHopCustomRecord, error) + FetchRouteLevelFirstHopCustomRecords(ctx context.Context, htlcAttemptIndices []int64) ([]sqlc.PaymentAttemptFirstHopCustomRecord, error) + FetchHopLevelCustomRecords(ctx context.Context, hopIDs []int64) ([]sqlc.PaymentHopCustomRecord, error) } // BatchedSQLQueries is a version of the SQLQueries that's capable @@ -65,3 +112,404 @@ func NewSQLStore(cfg *SQLStoreConfig, db BatchedSQLQueries, // A compile-time constraint to ensure SQLStore implements DB. var _ DB = (*SQLStore)(nil) + +// fetchPaymentWithCompleteData fetches a payment with all its related data +// including attempts, hops, and custom records from the database. +func (s *SQLStore) fetchPaymentWithCompleteData(ctx context.Context, + db SQLQueries, dbPayment sqlc.PaymentAndIntent) (*MPPayment, error) { + + // The query will only return BOLT 11 payment intents or intents with + // no intent type set. + paymentIntent := dbPayment.GetPaymentIntent() + paymentRequest := paymentIntent.IntentPayload + + // Fetch payment-level first hop custom records. + payment := dbPayment.GetPayment() + customRecords, err := db.FetchPaymentLevelFirstHopCustomRecords( + ctx, payment.ID, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch payment level custom "+ + "records: %w", err) + } + + // Convert to the FirstHopCustomRecords map. + var firstHopCustomRecords lnwire.CustomRecords + if len(customRecords) > 0 { + firstHopCustomRecords = make(lnwire.CustomRecords) + for _, record := range customRecords { + firstHopCustomRecords[uint64(record.Key)] = record.Value + } + } + + // Convert the basic payment info. + info := dbPaymentToCreationInfo( + payment.PaymentIdentifier, payment.AmountMsat, + payment.CreatedAt, paymentRequest, firstHopCustomRecords, + ) + + // Fetch all HTLC attempts for this payment. + attempts, err := s.fetchHTLCAttemptsForPayment( + ctx, db, payment.ID, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch HTLC attempts: %w", + err) + } + + // Set the failure reason if present. + var failureReason *FailureReason + if payment.FailReason.Valid { + reason := FailureReason(payment.FailReason.Int32) + failureReason = &reason + } + + mpPayment := &MPPayment{ + SequenceNum: uint64(payment.ID), + Info: info, + HTLCs: attempts, + FailureReason: failureReason, + } + + // The status and state will be determined by calling + // SetState after construction. + if err := mpPayment.SetState(); err != nil { + return nil, fmt.Errorf("failed to set payment state: %w", err) + } + + return mpPayment, nil +} + +// fetchHTLCAttemptsForPayment fetches all HTLC attempts for a payment and +// uses ExecuteBatchQuery to efficiently fetch hops and custom records. +func (s *SQLStore) fetchHTLCAttemptsForPayment(ctx context.Context, + db SQLQueries, paymentID int64) ([]HTLCAttempt, error) { + + // Fetch all HTLC attempts for this payment. + dbAttempts, err := db.FetchHtlcAttemptsForPayment( + ctx, paymentID, + ) + + if err != nil { + return nil, fmt.Errorf("failed to fetch HTLC attempts: %w", + err) + } + + if len(dbAttempts) == 0 { + return nil, nil + } + + // Collect all attempt indices for batch fetching. + attemptIndices := make([]int64, len(dbAttempts)) + for i, attempt := range dbAttempts { + attemptIndices[i] = attempt.AttemptIndex + } + + // Fetch all hops for all attempts using ExecuteBatchQuery. + hopsByAttempt := make(map[int64][]sqlc.FetchHopsForAttemptsRow) + err = sqldb.ExecuteBatchQuery( + ctx, s.cfg.QueryCfg, attemptIndices, + func(idx int64) int64 { return idx }, + func(ctx context.Context, indices []int64) ( + []sqlc.FetchHopsForAttemptsRow, error) { + + return db.FetchHopsForAttempts(ctx, indices) + }, + func(ctx context.Context, + hop sqlc.FetchHopsForAttemptsRow) error { + + hopsByAttempt[hop.HtlcAttemptIndex] = append( + hopsByAttempt[hop.HtlcAttemptIndex], hop, + ) + + return nil + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch hops for attempts: %w", + err) + } + + // Collect all hop IDs for fetching hop-level custom records. + var hopIDs []int64 + for _, hops := range hopsByAttempt { + for _, hop := range hops { + hopIDs = append(hopIDs, hop.ID) + } + } + + // Fetch all hop-level custom records using ExecuteBatchQuery. + hopCustomRecords := make(map[int64][]sqlc.PaymentHopCustomRecord) + if len(hopIDs) > 0 { + err = sqldb.ExecuteBatchQuery( + ctx, s.cfg.QueryCfg, hopIDs, + func(id int64) int64 { return id }, + func(ctx context.Context, ids []int64) ( + []sqlc.PaymentHopCustomRecord, error) { + + return db.FetchHopLevelCustomRecords(ctx, ids) + }, + func(ctx context.Context, + record sqlc.PaymentHopCustomRecord) error { + + // TODO(ziggie): Can we get rid of this? + // This has to be in place otherwise the + // comparison will not match. + if record.Value == nil { + record.Value = []byte{} + } + + hopCustomRecords[record.HopID] = append( + hopCustomRecords[record.HopID], record, + ) + + return nil + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch hop custom "+ + "records: %w", err) + } + } + + // Fetch route-level first hop custom records using ExecuteBatchQuery. + routeCustomRecords := make( + map[int64][]sqlc.PaymentAttemptFirstHopCustomRecord, + ) + err = sqldb.ExecuteBatchQuery( + ctx, s.cfg.QueryCfg, attemptIndices, + func(idx int64) int64 { return idx }, + func(ctx context.Context, indices []int64) ( + []sqlc.PaymentAttemptFirstHopCustomRecord, error) { + + return db.FetchRouteLevelFirstHopCustomRecords( + ctx, indices, + ) + }, + func(ctx context.Context, + record sqlc.PaymentAttemptFirstHopCustomRecord) error { + + routeCustomRecords[record.HtlcAttemptIndex] = append( + routeCustomRecords[record.HtlcAttemptIndex], + record, + ) + + return nil + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch route custom "+ + "records: %w", err) + } + + // Now convert all attempts to HTLCAttempt structs. + attempts := make([]HTLCAttempt, 0, len(dbAttempts)) + for _, dbAttempt := range dbAttempts { + attemptIndex := dbAttempt.AttemptIndex + attempt, err := dbAttemptToHTLCAttempt( + dbAttempt, hopsByAttempt[attemptIndex], + hopCustomRecords, + routeCustomRecords[attemptIndex], + ) + if err != nil { + return nil, fmt.Errorf("failed to convert attempt "+ + "%d: %w", attemptIndex, err) + } + attempts = append(attempts, *attempt) + } + + return attempts, nil +} + +// QueryPayments queries the payments from the database. +// +// This is part of the DB interface. +func (s *SQLStore) QueryPayments(ctx context.Context, + query Query) (Response, error) { + + if query.MaxPayments == 0 { + return Response{}, fmt.Errorf("max payments must be non-zero") + } + + var ( + allPayments []*MPPayment + totalCount int64 + initialCursor int64 + ) + + extractCursor := func( + row sqlc.FilterPaymentsRow) int64 { + + return row.Payment.ID + } + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + // We first count all payments to determine the total count + // if requested. + if query.CountTotal { + totalPayments, err := db.CountPayments(ctx) + if err != nil { + return fmt.Errorf("failed to count "+ + "payments: %w", err) + } + totalCount = totalPayments + } + + processPayment := func(ctx context.Context, + dbPayment sqlc.FilterPaymentsRow) error { + + // Fetch all the additional data for the payment. + mpPayment, err := s.fetchPaymentWithCompleteData( + ctx, db, dbPayment, + ) + if err != nil { + return fmt.Errorf("failed to fetch payment "+ + "with complete data: %w", err) + } + + // To keep compatibility with the old API, we only + // return non-succeeded payments if requested. + if mpPayment.Status != StatusSucceeded && + !query.IncludeIncomplete { + + return nil + } + + if uint64(len(allPayments)) >= query.MaxPayments { + return errMaxPaymentsReached + } + + allPayments = append(allPayments, mpPayment) + + return nil + } + + queryFunc := func(ctx context.Context, lastID int64, + limit int32) ([]sqlc.FilterPaymentsRow, error) { + + filterParams := sqlc.FilterPaymentsParams{ + NumLimit: limit, + Reverse: query.Reversed, + // For now there only BOLT 11 payment intents + // exist. + IntentType: sqldb.SQLInt16( + PaymentIntentTypeBolt11, + ), + } + + if query.Reversed { + filterParams.IndexOffsetLet = sqldb.SQLInt64( + lastID, + ) + } else { + filterParams.IndexOffsetGet = sqldb.SQLInt64( + lastID, + ) + } + + // Add potential date filters if specified. + if query.CreationDateStart != 0 { + filterParams.CreatedAfter = sqldb.SQLTime( + time.Unix(query.CreationDateStart, 0). + UTC(), + ) + } + if query.CreationDateEnd != 0 { + filterParams.CreatedBefore = sqldb.SQLTime( + time.Unix(query.CreationDateEnd, 0). + UTC(), + ) + } + + return db.FilterPayments(ctx, filterParams) + } + + if query.Reversed { + if query.IndexOffset == 0 { + initialCursor = int64(math.MaxInt64) + } else { + initialCursor = int64(query.IndexOffset) + } + } else { + initialCursor = int64(query.IndexOffset) + } + + return sqldb.ExecutePaginatedQuery( + ctx, s.cfg.QueryCfg, initialCursor, queryFunc, + extractCursor, processPayment, + ) + }, func() { + allPayments = nil + }) + + // We make sure we don't return an error if we reached the maximum + // number of payments. Which is the pagination limit for the query + // itself. + if err != nil && !errors.Is(err, errMaxPaymentsReached) { + return Response{}, fmt.Errorf("failed to query payments: %w", + err) + } + + // Handle case where no payments were found + if len(allPayments) == 0 { + return Response{ + Payments: allPayments, + FirstIndexOffset: 0, + LastIndexOffset: 0, + TotalCount: uint64(totalCount), + }, nil + } + + // If the query was reversed, we need to reverse the payment list + // to match the kvstore behavior and return payments in forward order. + if query.Reversed { + for i, j := 0, len(allPayments)-1; i < j; i, j = i+1, j-1 { + allPayments[i], allPayments[j] = allPayments[j], + allPayments[i] + } + } + + return Response{ + Payments: allPayments, + FirstIndexOffset: allPayments[0].SequenceNum, + LastIndexOffset: allPayments[len(allPayments)-1].SequenceNum, + TotalCount: uint64(totalCount), + }, nil +} + +// FetchPayment fetches the payment corresponding to the given payment +// hash. +// +// This is part of the DB interface. +func (s *SQLStore) FetchPayment(paymentHash lntypes.Hash) (*MPPayment, error) { + ctx := context.TODO() + + var mpPayment *MPPayment + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + dbPayment, err := db.FetchPayment(ctx, paymentHash[:]) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to fetch payment: %w", err) + } + + if errors.Is(err, sql.ErrNoRows) { + return ErrPaymentNotInitiated + } + + mpPayment, err = s.fetchPaymentWithCompleteData( + ctx, db, dbPayment, + ) + if err != nil { + return fmt.Errorf("failed to fetch payment with "+ + "complete data: %w", err) + } + + return nil + }, func() { + }) + if err != nil { + return nil, fmt.Errorf("failed to fetch payment: %w", err) + } + + return mpPayment, nil +} diff --git a/sqldb/sqlc/db_custom.go b/sqldb/sqlc/db_custom.go index f7bc499185..db0a03a421 100644 --- a/sqldb/sqlc/db_custom.go +++ b/sqldb/sqlc/db_custom.go @@ -161,3 +161,79 @@ func (r GetChannelsBySCIDRangeRow) Node1Pub() []byte { func (r GetChannelsBySCIDRangeRow) Node2Pub() []byte { return r.Node2PubKey } + +// PaymentAndIntent is an interface that provides access to a payment and its +// associated payment intent. +type PaymentAndIntent interface { + // GetPayment returns the Payment associated with this interface. + GetPayment() Payment + + // GetPaymentIntent returns the PaymentIntent associated with this payment. + GetPaymentIntent() PaymentIntent +} + +// GetPayment returns the Payment associated with this interface. +// +// NOTE: This method is part of the PaymentAndIntent interface. +func (r FilterPaymentsRow) GetPayment() Payment { + return r.Payment +} + +// GetPaymentIntent returns the PaymentIntent associated with this payment. +// If the payment has no intent (IntentType is NULL), this returns a zero-value +// PaymentIntent. +// +// NOTE: This method is part of the PaymentAndIntent interface. +func (r FilterPaymentsRow) GetPaymentIntent() PaymentIntent { + if !r.IntentType.Valid { + return PaymentIntent{} + } + return PaymentIntent{ + IntentType: r.IntentType.Int16, + IntentPayload: r.IntentPayload, + } +} + +// GetPayment returns the Payment associated with this interface. +// +// NOTE: This method is part of the PaymentAndIntent interface. +func (r FetchPaymentRow) GetPayment() Payment { + return r.Payment +} + +// GetPaymentIntent returns the PaymentIntent associated with this payment. +// If the payment has no intent (IntentType is NULL), this returns a zero-value +// PaymentIntent. +// +// NOTE: This method is part of the PaymentAndIntent interface. +func (r FetchPaymentRow) GetPaymentIntent() PaymentIntent { + if !r.IntentType.Valid { + return PaymentIntent{} + } + return PaymentIntent{ + IntentType: r.IntentType.Int16, + IntentPayload: r.IntentPayload, + } +} + +// GetPayment returns the Payment associated with this interface. +// +// NOTE: This method is part of the PaymentAndIntent interface. +func (r FetchPaymentsByIDsRow) GetPayment() Payment { + return r.Payment +} + +// GetPaymentIntent returns the PaymentIntent associated with this payment. +// If the payment has no intent (IntentType is NULL), this returns a zero-value +// PaymentIntent. +// +// NOTE: This method is part of the PaymentAndIntent interface. +func (r FetchPaymentsByIDsRow) GetPaymentIntent() PaymentIntent { + if !r.IntentType.Valid { + return PaymentIntent{} + } + return PaymentIntent{ + IntentType: r.IntentType.Int16, + IntentPayload: r.IntentPayload, + } +} diff --git a/sqldb/sqlc/migrations/000009_payments.up.sql b/sqldb/sqlc/migrations/000009_payments.up.sql index c856db8f44..0d85b497b0 100644 --- a/sqldb/sqlc/migrations/000009_payments.up.sql +++ b/sqldb/sqlc/migrations/000009_payments.up.sql @@ -32,9 +32,13 @@ CREATE TABLE IF NOT EXISTS payment_intents ( ); -- Index for efficient querying by intent type -CREATE INDEX IF NOT EXISTS idx_payment_intents_type +CREATE INDEX IF NOT EXISTS idx_payment_intents_type ON payment_intents(intent_type); +-- Unique constraint for deduplication of payment intents +CREATE UNIQUE INDEX IF NOT EXISTS idx_payment_intents_unique +ON payment_intents(intent_type, intent_payload); + -- ───────────────────────────────────────────── -- Payments Table -- ───────────────────────────────────────────── @@ -187,7 +191,8 @@ CREATE TABLE IF NOT EXISTS payment_htlc_attempt_resolutions ( -- HTLC failure reason code htlc_fail_reason INTEGER, - -- Failure message from the failing node + -- Failure message from the failing node, this message is binary encoded + -- using the lightning wire protocol, see also lnwire/onion_error.go failure_msg BLOB, -- Ensure data integrity: settled attempts must have preimage, diff --git a/sqldb/sqlc/payments.sql.go b/sqldb/sqlc/payments.sql.go new file mode 100644 index 0000000000..b925c78358 --- /dev/null +++ b/sqldb/sqlc/payments.sql.go @@ -0,0 +1,655 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: payments.sql + +package sqlc + +import ( + "context" + "database/sql" + "strings" + "time" +) + +const countPayments = `-- name: CountPayments :one +SELECT COUNT(*) FROM payments +` + +func (q *Queries) CountPayments(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countPayments) + var count int64 + err := row.Scan(&count) + return count, err +} + +const fetchAllInflightAttempts = `-- name: FetchAllInflightAttempts :many +SELECT + ha.id, + ha.attempt_index, + ha.payment_id, + ha.session_key, + ha.attempt_time, + ha.payment_hash, + ha.first_hop_amount_msat, + ha.route_total_time_lock, + ha.route_total_amount, + ha.route_source_key +FROM payment_htlc_attempts ha +WHERE NOT EXISTS ( + SELECT 1 FROM payment_htlc_attempt_resolutions hr + WHERE hr.attempt_index = ha.attempt_index +) +` + +// Fetch all inflight attempts across all payments +func (q *Queries) FetchAllInflightAttempts(ctx context.Context) ([]PaymentHtlcAttempt, error) { + rows, err := q.db.QueryContext(ctx, fetchAllInflightAttempts) + if err != nil { + return nil, err + } + defer rows.Close() + var items []PaymentHtlcAttempt + for rows.Next() { + var i PaymentHtlcAttempt + if err := rows.Scan( + &i.ID, + &i.AttemptIndex, + &i.PaymentID, + &i.SessionKey, + &i.AttemptTime, + &i.PaymentHash, + &i.FirstHopAmountMsat, + &i.RouteTotalTimeLock, + &i.RouteTotalAmount, + &i.RouteSourceKey, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchHopLevelCustomRecords = `-- name: FetchHopLevelCustomRecords :many +SELECT + l.id, + l.hop_id, + l.key, + l.value +FROM payment_hop_custom_records l +WHERE l.hop_id IN (/*SLICE:hop_ids*/?) +` + +func (q *Queries) FetchHopLevelCustomRecords(ctx context.Context, hopIds []int64) ([]PaymentHopCustomRecord, error) { + query := fetchHopLevelCustomRecords + var queryParams []interface{} + if len(hopIds) > 0 { + for _, v := range hopIds { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:hop_ids*/?", makeQueryParams(len(queryParams), len(hopIds)), 1) + } else { + query = strings.Replace(query, "/*SLICE:hop_ids*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []PaymentHopCustomRecord + for rows.Next() { + var i PaymentHopCustomRecord + if err := rows.Scan( + &i.ID, + &i.HopID, + &i.Key, + &i.Value, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchHopsForAttempt = `-- name: FetchHopsForAttempt :many +SELECT + h.id, + h.htlc_attempt_index, + h.hop_index, + h.pub_key, + h.scid, + h.outgoing_time_lock, + h.amt_to_forward, + h.meta_data, + m.payment_addr AS mpp_payment_addr, + m.total_msat AS mpp_total_msat, + a.root_share AS amp_root_share, + a.set_id AS amp_set_id, + a.child_index AS amp_child_index, + b.encrypted_data, + b.blinding_point, + b.blinded_path_total_amt +FROM payment_route_hops h +LEFT JOIN payment_route_hop_mpp m ON m.hop_id = h.id +LEFT JOIN payment_route_hop_amp a ON a.hop_id = h.id +LEFT JOIN payment_route_hop_blinded b ON b.hop_id = h.id +WHERE h.htlc_attempt_index = $1 +ORDER BY h.hop_index ASC +` + +type FetchHopsForAttemptRow struct { + ID int64 + HtlcAttemptIndex int64 + HopIndex int32 + PubKey []byte + Scid string + OutgoingTimeLock int32 + AmtToForward int64 + MetaData []byte + MppPaymentAddr []byte + MppTotalMsat sql.NullInt64 + AmpRootShare []byte + AmpSetID []byte + AmpChildIndex sql.NullInt32 + EncryptedData []byte + BlindingPoint []byte + BlindedPathTotalAmt sql.NullInt64 +} + +func (q *Queries) FetchHopsForAttempt(ctx context.Context, htlcAttemptIndex int64) ([]FetchHopsForAttemptRow, error) { + rows, err := q.db.QueryContext(ctx, fetchHopsForAttempt, htlcAttemptIndex) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchHopsForAttemptRow + for rows.Next() { + var i FetchHopsForAttemptRow + if err := rows.Scan( + &i.ID, + &i.HtlcAttemptIndex, + &i.HopIndex, + &i.PubKey, + &i.Scid, + &i.OutgoingTimeLock, + &i.AmtToForward, + &i.MetaData, + &i.MppPaymentAddr, + &i.MppTotalMsat, + &i.AmpRootShare, + &i.AmpSetID, + &i.AmpChildIndex, + &i.EncryptedData, + &i.BlindingPoint, + &i.BlindedPathTotalAmt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchHopsForAttempts = `-- name: FetchHopsForAttempts :many +SELECT + h.id, + h.htlc_attempt_index, + h.hop_index, + h.pub_key, + h.scid, + h.outgoing_time_lock, + h.amt_to_forward, + h.meta_data, + m.payment_addr AS mpp_payment_addr, + m.total_msat AS mpp_total_msat, + a.root_share AS amp_root_share, + a.set_id AS amp_set_id, + a.child_index AS amp_child_index, + b.encrypted_data, + b.blinding_point, + b.blinded_path_total_amt +FROM payment_route_hops h +LEFT JOIN payment_route_hop_mpp m ON m.hop_id = h.id +LEFT JOIN payment_route_hop_amp a ON a.hop_id = h.id +LEFT JOIN payment_route_hop_blinded b ON b.hop_id = h.id +WHERE h.htlc_attempt_index IN (/*SLICE:htlc_attempt_indices*/?) +` + +type FetchHopsForAttemptsRow struct { + ID int64 + HtlcAttemptIndex int64 + HopIndex int32 + PubKey []byte + Scid string + OutgoingTimeLock int32 + AmtToForward int64 + MetaData []byte + MppPaymentAddr []byte + MppTotalMsat sql.NullInt64 + AmpRootShare []byte + AmpSetID []byte + AmpChildIndex sql.NullInt32 + EncryptedData []byte + BlindingPoint []byte + BlindedPathTotalAmt sql.NullInt64 +} + +func (q *Queries) FetchHopsForAttempts(ctx context.Context, htlcAttemptIndices []int64) ([]FetchHopsForAttemptsRow, error) { + query := fetchHopsForAttempts + var queryParams []interface{} + if len(htlcAttemptIndices) > 0 { + for _, v := range htlcAttemptIndices { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:htlc_attempt_indices*/?", makeQueryParams(len(queryParams), len(htlcAttemptIndices)), 1) + } else { + query = strings.Replace(query, "/*SLICE:htlc_attempt_indices*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchHopsForAttemptsRow + for rows.Next() { + var i FetchHopsForAttemptsRow + if err := rows.Scan( + &i.ID, + &i.HtlcAttemptIndex, + &i.HopIndex, + &i.PubKey, + &i.Scid, + &i.OutgoingTimeLock, + &i.AmtToForward, + &i.MetaData, + &i.MppPaymentAddr, + &i.MppTotalMsat, + &i.AmpRootShare, + &i.AmpSetID, + &i.AmpChildIndex, + &i.EncryptedData, + &i.BlindingPoint, + &i.BlindedPathTotalAmt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchHtlcAttemptsForPayment = `-- name: FetchHtlcAttemptsForPayment :many +SELECT + ha.id, + ha.attempt_index, + ha.payment_id, + ha.session_key, + ha.attempt_time, + ha.payment_hash, + ha.first_hop_amount_msat, + ha.route_total_time_lock, + ha.route_total_amount, + ha.route_source_key, + hr.resolution_type, + hr.resolution_time, + hr.failure_source_index, + hr.htlc_fail_reason, + hr.failure_msg, + hr.settle_preimage +FROM payment_htlc_attempts ha +LEFT JOIN payment_htlc_attempt_resolutions hr ON hr.attempt_index = ha.attempt_index +WHERE ha.payment_id = $1 +ORDER BY ha.attempt_time ASC +` + +type FetchHtlcAttemptsForPaymentRow struct { + ID int64 + AttemptIndex int64 + PaymentID int64 + SessionKey []byte + AttemptTime time.Time + PaymentHash []byte + FirstHopAmountMsat int64 + RouteTotalTimeLock int32 + RouteTotalAmount int64 + RouteSourceKey []byte + ResolutionType sql.NullInt32 + ResolutionTime sql.NullTime + FailureSourceIndex sql.NullInt32 + HtlcFailReason sql.NullInt32 + FailureMsg []byte + SettlePreimage []byte +} + +// This fetches all htlc attempts for a payment. +func (q *Queries) FetchHtlcAttemptsForPayment(ctx context.Context, paymentID int64) ([]FetchHtlcAttemptsForPaymentRow, error) { + rows, err := q.db.QueryContext(ctx, fetchHtlcAttemptsForPayment, paymentID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchHtlcAttemptsForPaymentRow + for rows.Next() { + var i FetchHtlcAttemptsForPaymentRow + if err := rows.Scan( + &i.ID, + &i.AttemptIndex, + &i.PaymentID, + &i.SessionKey, + &i.AttemptTime, + &i.PaymentHash, + &i.FirstHopAmountMsat, + &i.RouteTotalTimeLock, + &i.RouteTotalAmount, + &i.RouteSourceKey, + &i.ResolutionType, + &i.ResolutionTime, + &i.FailureSourceIndex, + &i.HtlcFailReason, + &i.FailureMsg, + &i.SettlePreimage, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchPayment = `-- name: FetchPayment :one +SELECT + p.id, p.intent_id, p.amount_msat, p.created_at, p.payment_identifier, p.fail_reason, + i.intent_type AS "intent_type", + i.intent_payload AS "intent_payload" +FROM payments p +LEFT JOIN payment_intents i ON i.id = p.intent_id +WHERE p.payment_identifier = $1 +` + +type FetchPaymentRow struct { + Payment Payment + IntentType sql.NullInt16 + IntentPayload []byte +} + +func (q *Queries) FetchPayment(ctx context.Context, paymentIdentifier []byte) (FetchPaymentRow, error) { + row := q.db.QueryRowContext(ctx, fetchPayment, paymentIdentifier) + var i FetchPaymentRow + err := row.Scan( + &i.Payment.ID, + &i.Payment.IntentID, + &i.Payment.AmountMsat, + &i.Payment.CreatedAt, + &i.Payment.PaymentIdentifier, + &i.Payment.FailReason, + &i.IntentType, + &i.IntentPayload, + ) + return i, err +} + +const fetchPaymentLevelFirstHopCustomRecords = `-- name: FetchPaymentLevelFirstHopCustomRecords :many +SELECT + l.id, + l.payment_id, + l.key, + l.value +FROM payment_first_hop_custom_records l +WHERE l.payment_id = $1 +` + +func (q *Queries) FetchPaymentLevelFirstHopCustomRecords(ctx context.Context, paymentID int64) ([]PaymentFirstHopCustomRecord, error) { + rows, err := q.db.QueryContext(ctx, fetchPaymentLevelFirstHopCustomRecords, paymentID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []PaymentFirstHopCustomRecord + for rows.Next() { + var i PaymentFirstHopCustomRecord + if err := rows.Scan( + &i.ID, + &i.PaymentID, + &i.Key, + &i.Value, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchPaymentsByIDs = `-- name: FetchPaymentsByIDs :many +SELECT + p.id, p.intent_id, p.amount_msat, p.created_at, p.payment_identifier, p.fail_reason, + i.intent_type AS "intent_type", + i.intent_payload AS "intent_payload" +FROM payments p +LEFT JOIN payment_intents i ON i.id = p.intent_id +WHERE p.id IN (/*SLICE:payment_ids*/?) +` + +type FetchPaymentsByIDsRow struct { + Payment Payment + IntentType sql.NullInt16 + IntentPayload []byte +} + +func (q *Queries) FetchPaymentsByIDs(ctx context.Context, paymentIds []int64) ([]FetchPaymentsByIDsRow, error) { + query := fetchPaymentsByIDs + var queryParams []interface{} + if len(paymentIds) > 0 { + for _, v := range paymentIds { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:payment_ids*/?", makeQueryParams(len(queryParams), len(paymentIds)), 1) + } else { + query = strings.Replace(query, "/*SLICE:payment_ids*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchPaymentsByIDsRow + for rows.Next() { + var i FetchPaymentsByIDsRow + if err := rows.Scan( + &i.Payment.ID, + &i.Payment.IntentID, + &i.Payment.AmountMsat, + &i.Payment.CreatedAt, + &i.Payment.PaymentIdentifier, + &i.Payment.FailReason, + &i.IntentType, + &i.IntentPayload, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchRouteLevelFirstHopCustomRecords = `-- name: FetchRouteLevelFirstHopCustomRecords :many +SELECT + l.id, + l.htlc_attempt_index, + l.key, + l.value +FROM payment_attempt_first_hop_custom_records l +WHERE l.htlc_attempt_index IN (/*SLICE:htlc_attempt_indices*/?) +` + +func (q *Queries) FetchRouteLevelFirstHopCustomRecords(ctx context.Context, htlcAttemptIndices []int64) ([]PaymentAttemptFirstHopCustomRecord, error) { + query := fetchRouteLevelFirstHopCustomRecords + var queryParams []interface{} + if len(htlcAttemptIndices) > 0 { + for _, v := range htlcAttemptIndices { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:htlc_attempt_indices*/?", makeQueryParams(len(queryParams), len(htlcAttemptIndices)), 1) + } else { + query = strings.Replace(query, "/*SLICE:htlc_attempt_indices*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []PaymentAttemptFirstHopCustomRecord + for rows.Next() { + var i PaymentAttemptFirstHopCustomRecord + if err := rows.Scan( + &i.ID, + &i.HtlcAttemptIndex, + &i.Key, + &i.Value, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const filterPayments = `-- name: FilterPayments :many +/* ───────────────────────────────────────────── + fetch queries + ───────────────────────────────────────────── +*/ + +SELECT + p.id, p.intent_id, p.amount_msat, p.created_at, p.payment_identifier, p.fail_reason, + i.intent_type AS "intent_type", + i.intent_payload AS "intent_payload" +FROM payments p +LEFT JOIN payment_intents i ON i.id = p.intent_id +WHERE ( + p.id > $1 OR + $1 IS NULL +) AND ( + p.id < $2 OR + $2 IS NULL +) AND ( + p.created_at >= $3 OR + $3 IS NULL +) AND ( + p.created_at <= $4 OR + $4 IS NULL +) AND ( + i.intent_type = $5 OR + $5 IS NULL OR i.intent_type IS NULL +) +ORDER BY + CASE WHEN $6 = false OR $6 IS NULL THEN p.id END ASC, + CASE WHEN $6 = true THEN p.id END DESC +LIMIT $7 +` + +type FilterPaymentsParams struct { + IndexOffsetGet sql.NullInt64 + IndexOffsetLet sql.NullInt64 + CreatedAfter sql.NullTime + CreatedBefore sql.NullTime + IntentType sql.NullInt16 + Reverse interface{} + NumLimit int32 +} + +type FilterPaymentsRow struct { + Payment Payment + IntentType sql.NullInt16 + IntentPayload []byte +} + +func (q *Queries) FilterPayments(ctx context.Context, arg FilterPaymentsParams) ([]FilterPaymentsRow, error) { + rows, err := q.db.QueryContext(ctx, filterPayments, + arg.IndexOffsetGet, + arg.IndexOffsetLet, + arg.CreatedAfter, + arg.CreatedBefore, + arg.IntentType, + arg.Reverse, + arg.NumLimit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FilterPaymentsRow + for rows.Next() { + var i FilterPaymentsRow + if err := rows.Scan( + &i.Payment.ID, + &i.Payment.IntentID, + &i.Payment.AmountMsat, + &i.Payment.CreatedAt, + &i.Payment.PaymentIdentifier, + &i.Payment.FailReason, + &i.IntentType, + &i.IntentPayload, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 0087559be8..5024e32b63 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -14,6 +14,7 @@ type Querier interface { AddSourceNode(ctx context.Context, nodeID int64) error AddV1ChannelProof(ctx context.Context, arg AddV1ChannelProofParams) (sql.Result, error) ClearKVInvoiceHashIndex(ctx context.Context) error + CountPayments(ctx context.Context) (int64, error) CountZombieChannels(ctx context.Context, version int16) (int64, error) CreateChannel(ctx context.Context, arg CreateChannelParams) (int64, error) DeleteCanceledInvoices(ctx context.Context) (sql.Result, error) @@ -30,8 +31,20 @@ type Querier interface { DeleteZombieChannel(ctx context.Context, arg DeleteZombieChannelParams) (sql.Result, error) FetchAMPSubInvoiceHTLCs(ctx context.Context, arg FetchAMPSubInvoiceHTLCsParams) ([]FetchAMPSubInvoiceHTLCsRow, error) FetchAMPSubInvoices(ctx context.Context, arg FetchAMPSubInvoicesParams) ([]AmpSubInvoice, error) + // Fetch all inflight attempts across all payments + FetchAllInflightAttempts(ctx context.Context) ([]PaymentHtlcAttempt, error) + FetchHopLevelCustomRecords(ctx context.Context, hopIds []int64) ([]PaymentHopCustomRecord, error) + FetchHopsForAttempt(ctx context.Context, htlcAttemptIndex int64) ([]FetchHopsForAttemptRow, error) + FetchHopsForAttempts(ctx context.Context, htlcAttemptIndices []int64) ([]FetchHopsForAttemptsRow, error) + // This fetches all htlc attempts for a payment. + FetchHtlcAttemptsForPayment(ctx context.Context, paymentID int64) ([]FetchHtlcAttemptsForPaymentRow, error) + FetchPayment(ctx context.Context, paymentIdentifier []byte) (FetchPaymentRow, error) + FetchPaymentLevelFirstHopCustomRecords(ctx context.Context, paymentID int64) ([]PaymentFirstHopCustomRecord, error) + FetchPaymentsByIDs(ctx context.Context, paymentIds []int64) ([]FetchPaymentsByIDsRow, error) + FetchRouteLevelFirstHopCustomRecords(ctx context.Context, htlcAttemptIndices []int64) ([]PaymentAttemptFirstHopCustomRecord, error) FetchSettledAMPSubInvoices(ctx context.Context, arg FetchSettledAMPSubInvoicesParams) ([]FetchSettledAMPSubInvoicesRow, error) FilterInvoices(ctx context.Context, arg FilterInvoicesParams) ([]Invoice, error) + FilterPayments(ctx context.Context, arg FilterPaymentsParams) ([]FilterPaymentsRow, error) GetAMPInvoiceID(ctx context.Context, setID []byte) (int64, error) GetChannelAndNodesBySCID(ctx context.Context, arg GetChannelAndNodesBySCIDParams) (GetChannelAndNodesBySCIDRow, error) GetChannelByOutpointWithPolicies(ctx context.Context, arg GetChannelByOutpointWithPoliciesParams) (GetChannelByOutpointWithPoliciesRow, error) diff --git a/sqldb/sqlc/queries/payments.sql b/sqldb/sqlc/queries/payments.sql new file mode 100644 index 0000000000..034864e09a --- /dev/null +++ b/sqldb/sqlc/queries/payments.sql @@ -0,0 +1,174 @@ +/* ───────────────────────────────────────────── + fetch queries + ───────────────────────────────────────────── +*/ + +-- name: FilterPayments :many +SELECT + sqlc.embed(p), + i.intent_type AS "intent_type", + i.intent_payload AS "intent_payload" +FROM payments p +LEFT JOIN payment_intents i ON i.id = p.intent_id +WHERE ( + p.id > sqlc.narg('index_offset_get') OR + sqlc.narg('index_offset_get') IS NULL +) AND ( + p.id < sqlc.narg('index_offset_let') OR + sqlc.narg('index_offset_let') IS NULL +) AND ( + p.created_at >= sqlc.narg('created_after') OR + sqlc.narg('created_after') IS NULL +) AND ( + p.created_at <= sqlc.narg('created_before') OR + sqlc.narg('created_before') IS NULL +) AND ( + i.intent_type = sqlc.narg('intent_type') OR + sqlc.narg('intent_type') IS NULL OR i.intent_type IS NULL +) +ORDER BY + CASE WHEN sqlc.narg('reverse') = false OR sqlc.narg('reverse') IS NULL THEN p.id END ASC, + CASE WHEN sqlc.narg('reverse') = true THEN p.id END DESC +LIMIT @num_limit; + +-- name: FetchPayment :one +SELECT + sqlc.embed(p), + i.intent_type AS "intent_type", + i.intent_payload AS "intent_payload" +FROM payments p +LEFT JOIN payment_intents i ON i.id = p.intent_id +WHERE p.payment_identifier = $1; + +-- name: FetchPaymentsByIDs :many +SELECT + sqlc.embed(p), + i.intent_type AS "intent_type", + i.intent_payload AS "intent_payload" +FROM payments p +LEFT JOIN payment_intents i ON i.id = p.intent_id +WHERE p.id IN (sqlc.slice('payment_ids')/*SLICE:payment_ids*/); + +-- name: CountPayments :one +SELECT COUNT(*) FROM payments; + + +-- This fetches all htlc attempts for a payment. +-- name: FetchHtlcAttemptsForPayment :many +SELECT + ha.id, + ha.attempt_index, + ha.payment_id, + ha.session_key, + ha.attempt_time, + ha.payment_hash, + ha.first_hop_amount_msat, + ha.route_total_time_lock, + ha.route_total_amount, + ha.route_source_key, + hr.resolution_type, + hr.resolution_time, + hr.failure_source_index, + hr.htlc_fail_reason, + hr.failure_msg, + hr.settle_preimage +FROM payment_htlc_attempts ha +LEFT JOIN payment_htlc_attempt_resolutions hr ON hr.attempt_index = ha.attempt_index +WHERE ha.payment_id = $1 +ORDER BY ha.attempt_time ASC; + +-- name: FetchAllInflightAttempts :many +-- Fetch all inflight attempts across all payments +SELECT + ha.id, + ha.attempt_index, + ha.payment_id, + ha.session_key, + ha.attempt_time, + ha.payment_hash, + ha.first_hop_amount_msat, + ha.route_total_time_lock, + ha.route_total_amount, + ha.route_source_key +FROM payment_htlc_attempts ha +WHERE NOT EXISTS ( + SELECT 1 FROM payment_htlc_attempt_resolutions hr + WHERE hr.attempt_index = ha.attempt_index +); + +-- name: FetchHopsForAttempt :many +SELECT + h.id, + h.htlc_attempt_index, + h.hop_index, + h.pub_key, + h.scid, + h.outgoing_time_lock, + h.amt_to_forward, + h.meta_data, + m.payment_addr AS mpp_payment_addr, + m.total_msat AS mpp_total_msat, + a.root_share AS amp_root_share, + a.set_id AS amp_set_id, + a.child_index AS amp_child_index, + b.encrypted_data, + b.blinding_point, + b.blinded_path_total_amt +FROM payment_route_hops h +LEFT JOIN payment_route_hop_mpp m ON m.hop_id = h.id +LEFT JOIN payment_route_hop_amp a ON a.hop_id = h.id +LEFT JOIN payment_route_hop_blinded b ON b.hop_id = h.id +WHERE h.htlc_attempt_index = $1 +ORDER BY h.hop_index ASC; + +-- name: FetchHopsForAttempts :many +SELECT + h.id, + h.htlc_attempt_index, + h.hop_index, + h.pub_key, + h.scid, + h.outgoing_time_lock, + h.amt_to_forward, + h.meta_data, + m.payment_addr AS mpp_payment_addr, + m.total_msat AS mpp_total_msat, + a.root_share AS amp_root_share, + a.set_id AS amp_set_id, + a.child_index AS amp_child_index, + b.encrypted_data, + b.blinding_point, + b.blinded_path_total_amt +FROM payment_route_hops h +LEFT JOIN payment_route_hop_mpp m ON m.hop_id = h.id +LEFT JOIN payment_route_hop_amp a ON a.hop_id = h.id +LEFT JOIN payment_route_hop_blinded b ON b.hop_id = h.id +WHERE h.htlc_attempt_index IN (sqlc.slice('htlc_attempt_indices')/*SLICE:htlc_attempt_indices*/); + +-- name: FetchPaymentLevelFirstHopCustomRecords :many +SELECT + l.id, + l.payment_id, + l.key, + l.value +FROM payment_first_hop_custom_records l +WHERE l.payment_id = $1; + +-- name: FetchRouteLevelFirstHopCustomRecords :many +SELECT + l.id, + l.htlc_attempt_index, + l.key, + l.value +FROM payment_attempt_first_hop_custom_records l +WHERE l.htlc_attempt_index IN (sqlc.slice('htlc_attempt_indices')/*SLICE:htlc_attempt_indices*/); + +-- name: FetchHopLevelCustomRecords :many +SELECT + l.id, + l.hop_id, + l.key, + l.value +FROM payment_hop_custom_records l +WHERE l.hop_id IN (sqlc.slice('hop_ids')/*SLICE:hop_ids*/); +