|
7 | 7 | "errors" |
8 | 8 | "fmt" |
9 | 9 | "reflect" |
| 10 | + "slices" |
10 | 11 | "sort" |
11 | 12 | "strings" |
12 | 13 | "sync" |
@@ -563,31 +564,67 @@ func (s *swapClientServer) ListSwaps(ctx context.Context, |
563 | 564 | req *looprpc.ListSwapsRequest) (*looprpc.ListSwapsResponse, error) { |
564 | 565 |
|
565 | 566 | var ( |
566 | | - rpcSwaps = []*looprpc.SwapStatus{} |
567 | | - idx = 0 |
| 567 | + rpcSwaps = []*looprpc.SwapStatus{} |
| 568 | + maxSwaps = int(req.MaxSwaps) |
| 569 | + nextStartTime = int64(0) |
| 570 | + canPage = false |
568 | 571 | ) |
569 | 572 |
|
570 | 573 | s.swapsLock.Lock() |
571 | 574 | defer s.swapsLock.Unlock() |
572 | 575 |
|
| 576 | + // Convert the swaps map into a slice which can be ordered. |
| 577 | + var swapList []loop.SwapInfo |
| 578 | + for _, swp := range s.swaps { |
| 579 | + swapList = append(swapList, swp) |
| 580 | + } |
| 581 | + |
| 582 | + // Sort all swaps by a consistent field, InitiationTime. |
| 583 | + slices.SortFunc(swapList, func(a, b loop.SwapInfo) int { |
| 584 | + // For descending order (newest first) |
| 585 | + if a.InitiationTime.UnixNano() > b.InitiationTime.UnixNano() { |
| 586 | + return 1 |
| 587 | + } |
| 588 | + if a.InitiationTime.UnixNano() < b.InitiationTime.UnixNano() { |
| 589 | + return -1 |
| 590 | + } |
| 591 | + return 0 |
| 592 | + }) |
573 | 593 | // We can just use the server's in-memory cache as that contains the |
574 | 594 | // most up-to-date state including temporary failures which aren't |
575 | 595 | // persisted to disk. The swaps field is a map, that's why we need an |
576 | 596 | // additional index. |
577 | | - for _, swp := range s.swaps { |
| 597 | + for _, swp := range swapList { |
578 | 598 | // Filter the swap based on the provided filter. |
579 | 599 | if !filterSwap(&swp, req.ListSwapFilter) { |
580 | 600 | continue |
581 | 601 | } |
582 | 602 |
|
583 | | - rpcSwap, err := s.marshallSwap(ctx, &swp) |
584 | | - if err != nil { |
585 | | - return nil, err |
| 603 | + // Check if this swap is within our pagination window. |
| 604 | + // If maxSwaps is 0, we return all swaps. |
| 605 | + // Otherwise, check if we've reached our limit. |
| 606 | + if maxSwaps == 0 || len(rpcSwaps) < maxSwaps { |
| 607 | + rpcSwap, err := s.marshallSwap(ctx, &swp) |
| 608 | + if err != nil { |
| 609 | + return nil, err |
| 610 | + } |
| 611 | + rpcSwaps = append(rpcSwaps, rpcSwap) |
| 612 | + } else { |
| 613 | + // Indicate that paging is enabled, and exit the loop. |
| 614 | + canPage = true |
| 615 | + break |
586 | 616 | } |
587 | | - rpcSwaps = append(rpcSwaps, rpcSwap) |
588 | | - idx++ |
589 | 617 | } |
590 | | - return &looprpc.ListSwapsResponse{Swaps: rpcSwaps}, nil |
| 618 | + |
| 619 | + if canPage { |
| 620 | + nextStartTime = rpcSwaps[len(rpcSwaps)-1].InitiationTime + 1 |
| 621 | + } |
| 622 | + |
| 623 | + response := looprpc.ListSwapsResponse{ |
| 624 | + Swaps: rpcSwaps, |
| 625 | + NextStartTime: nextStartTime, |
| 626 | + } |
| 627 | + return &response, nil |
591 | 628 | } |
592 | 629 |
|
593 | 630 | // filterSwap filters the given swap based on the provided filter. |
@@ -617,6 +654,11 @@ func filterSwap(swapInfo *loop.SwapInfo, filter *looprpc.ListSwapsFilter) bool { |
617 | 654 | return false |
618 | 655 | } |
619 | 656 |
|
| 657 | + // If timestamp filters are set, only return swaps within the specified time range. |
| 658 | + if filter.StartTimestampNs > 0 && swapInfo.InitiationTime.UnixNano() < filter.StartTimestampNs { |
| 659 | + return false |
| 660 | + } |
| 661 | + |
620 | 662 | // If the swap is of type loop out and the outgoing channel filter is |
621 | 663 | // set, we only return swaps that match the filter. |
622 | 664 | if swapInfo.SwapType == swap.TypeOut && filter.OutgoingChanSet != nil { |
|
0 commit comments