Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions channeldb/forwarding_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,150 @@
return resp, nil
}

// DeleteStats contains statistics about a forwarding history deletion
// operation.
type DeleteStats struct {
// NumEventsDeleted is the total number of forwarding events that were
// deleted from the database.
NumEventsDeleted uint64

// TotalFeeMsat is the sum of all fees (AmtIn - AmtOut) from the
// deleted events, expressed in millisatoshis.
TotalFeeMsat int64
}

// DeleteForwardingEvents deletes all forwarding events older than the specified
// endTime from the database. The deletion is performed in batches to avoid
Comment on lines +431 to +432

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function comment states that it deletes events "older than" endTime. However, the implementation and tests show that it deletes events with a timestamp less than or equal to endTime. To avoid confusion, it would be clearer to state that it deletes events "at or before" the specified endTime.

Suggested change
// DeleteForwardingEvents deletes all forwarding events older than the specified
// endTime from the database. The deletion is performed in batches to avoid
// DeleteForwardingEvents deletes all forwarding events with a timestamp at or
// before the specified endTime from the database. The deletion is performed

// holding large database transactions. This method returns statistics about the
// deletion including the number of events deleted and the total fees earned
// from those events.
//
// The batchSize parameter controls how many events are deleted per database
// transaction. If batchSize is 0, a default of 10000 is used. The maximum
// allowed batch size is MaxResponseEvents (50000) to prevent resource exhaustion.

Check failure on line 439 in channeldb/forwarding_log.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 82 characters long, which exceeds the maximum of 80 characters. (ll)
func (f *ForwardingLog) DeleteForwardingEvents(endTime time.Time,
batchSize int) (DeleteStats, error) {

// Set default batch size if not specified, and enforce maximum.
if batchSize <= 0 {
batchSize = 10000
}
if batchSize > MaxResponseEvents {
batchSize = MaxResponseEvents
}

var stats DeleteStats

// We'll continue deleting batches until there are no more events to
// delete.
for {
var (
batchDeleted int
batchFees int64
)

err := kvdb.Update(f.db, func(tx kvdb.RwTx) error {
// Fetch the forwarding log bucket. If it doesn't exist,
// there's nothing to delete.
logBucket := tx.ReadWriteBucket(forwardingLogBucket)
if logBucket == nil {
return ErrNoForwardingEvents
}

// We'll use a cursor to iterate through events in time
// order.
cursor := logBucket.ReadWriteCursor()

// Next, encode the end time as our upper bound for
// deletion.
var endTimeBytes [8]byte
byteOrder.PutUint64(
endTimeBytes[:], uint64(endTime.UnixNano()),
)

// Collect keys to delete in this batch. We can't delete
// while iterating as it may corrupt the cursor.
keysToDelete := make([][]byte, 0, batchSize)

// Seek to the beginning and iterate through events
// until we reach the end time or batch limit.
//
//nolint:lll
for timestamp, eventBytes := cursor.First(); timestamp != nil; timestamp, eventBytes = cursor.Next() {

Check failure on line 488 in channeldb/forwarding_log.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 126 characters long, which exceeds the maximum of 80 characters. (ll)
// Stop if we've reached or passed the end time.
if bytes.Compare(
timestamp, endTimeBytes[:],
) > 0 {
break
}

// Stop if we've reached the batch size limit.
if len(keysToDelete) >= batchSize {
break
}

// Decode the event, as we need to parse it to
// obtain the fee.
readBuf := bytes.NewReader(eventBytes)
if readBuf.Len() > 0 {
var event ForwardingEvent
err := decodeForwardingEvent(
readBuf, &event,
)
if err != nil {
return err
}

// Calculate the fee for this event. Fee
// is the difference between incoming
// and outgoing amounts.
fee := int64(event.AmtIn - event.AmtOut)
batchFees += fee
}

// Make a copy of the key to delete later.
keyCopy := make([]byte, len(timestamp))
copy(keyCopy, timestamp)
keysToDelete = append(keysToDelete, keyCopy)
}

// Now delete all the collected keys.
for _, key := range keysToDelete {
if err := logBucket.Delete(key); err != nil {
return err
}
}

batchDeleted = len(keysToDelete)
return nil
}, func() {
batchDeleted = 0
batchFees = 0
})

if err != nil {
// If the bucket doesn't exist, we're done.
if err == ErrNoForwardingEvents {

Check failure on line 542 in channeldb/forwarding_log.go

View workflow job for this annotation

GitHub Actions / Lint code

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
break
}
return stats, err
}

// Update our running statistics.
stats.NumEventsDeleted += uint64(batchDeleted)
stats.TotalFeeMsat += batchFees

// If we deleted fewer events than the batch size, we're done.
if batchDeleted < batchSize {
break
}

// Otherwise, continue with the next batch.
}

return stats, nil
}

// makeUniqueTimestamps takes a slice of forwarding events, sorts it by the
// event timestamps and then makes sure there are no duplicates in the
// timestamps. If duplicates are found, some of the timestamps are increased on
Expand Down
Loading
Loading