Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cmd/loop/swaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ var listSwapsCommand = cli.Command{
labelFlag,
channelFlag,
lastHopFlag,
cli.Uint64Flag{
Name: "max_swaps",
Usage: "Max number of swaps to return after filtering",
},
cli.Int64Flag{
Name: "start_time_ns",
Usage: "Unix timestamp in nanoseconds to select swaps initiated " +
"after this time",
},
},
}

Expand Down Expand Up @@ -99,9 +108,19 @@ func listSwaps(ctx *cli.Context) error {
filter.Label = ctx.String(labelFlag.Name)
}

// Parse start timestamp if set.
if ctx.IsSet("start_time_ns") {
startTimestamp, err := strconv.ParseInt(ctx.String("start_time_ns"), 10, 64)
if err != nil {
return fmt.Errorf("error parsing start timestamp: %w", err)
}
filter.StartTimestampNs = startTimestamp
}

resp, err := client.ListSwaps(
context.Background(), &looprpc.ListSwapsRequest{
ListSwapFilter: filter,
MaxSwaps: ctx.Uint64("max_swaps"),
},
)
if err != nil {
Expand Down
51 changes: 46 additions & 5 deletions loopd/swapclient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package loopd

import (
"bytes"
"cmp"
"context"
"encoding/hex"
"errors"
"fmt"
"reflect"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -563,8 +565,11 @@ func (s *swapClientServer) ListSwaps(ctx context.Context,
req *looprpc.ListSwapsRequest) (*looprpc.ListSwapsResponse, error) {

var (
rpcSwaps = []*looprpc.SwapStatus{}
idx = 0
rpcSwaps = []*looprpc.SwapStatus{}
swapInfos = []*loop.SwapInfo{}
maxSwaps = int(req.MaxSwaps)
nextStartTime = int64(0)
canPage = false
)

s.swapsLock.Lock()
Expand All @@ -580,14 +585,43 @@ func (s *swapClientServer) ListSwaps(ctx context.Context,
continue
}

rpcSwap, err := s.marshallSwap(ctx, &swp)
swapInfos = append(swapInfos, &swp)
}

// Sort the swaps by initiation time in ascending order (oldest first).
slices.SortFunc(swapInfos, func(a, b *loop.SwapInfo) int {
return cmp.Compare(
a.InitiationTime.UnixNano(),
b.InitiationTime.UnixNano(),
)
})

// Apply the maxSwaps limit if specified.
if maxSwaps > 0 && len(swapInfos) > maxSwaps {
canPage = true
swapInfos = swapInfos[:maxSwaps]
}

// Marshal the filtered and limited swaps.
for _, swp := range swapInfos {
rpcSwap, err := s.marshallSwap(ctx, swp)
if err != nil {
return nil, err
}
rpcSwaps = append(rpcSwaps, rpcSwap)
idx++
}
return &looprpc.ListSwapsResponse{Swaps: rpcSwaps}, nil

// Set the next start time for pagination if needed.
if canPage && len(rpcSwaps) > 0 {
// Use the initiation time of the last swap plus 1 nanosecond.
nextStartTime = rpcSwaps[len(rpcSwaps)-1].InitiationTime + 1
}

response := looprpc.ListSwapsResponse{
Swaps: rpcSwaps,
NextStartTime: nextStartTime,
}
return &response, nil
}

// filterSwap filters the given swap based on the provided filter.
Expand Down Expand Up @@ -617,6 +651,13 @@ func filterSwap(swapInfo *loop.SwapInfo, filter *looprpc.ListSwapsFilter) bool {
return false
}

// If timestamp filters are set, only return swaps within the specified time range.
if filter.StartTimestampNs > 0 &&
swapInfo.InitiationTime.UnixNano() < filter.StartTimestampNs {

return false
}

// If the swap is of type loop out and the outgoing channel filter is
// set, we only return swaps that match the filter.
if swapInfo.SwapType == swap.TypeOut && filter.OutgoingChanSet != nil {
Expand Down
Loading