# Flow Controller Queue Plugins (`plugins/queue/`)

This directory contains concrete implementations of the `framework.SafeQueue` interface. This contract defines core,
self-contained queue data structures used by the `controller.FlowController`.

## Overview

The `controller.FlowController` manages requests by organizing them into queues. Each logical "flow" (e.g., a specific
model or workload) within a given priority band has its own `ports.ManagedQueue` instance, which wraps a
`framework.SafeQueue`. This design allows the `controller.FlowController` to apply policies at both the inter-flow
(across different flows) and intra-flow (within a single flow's queue) levels.

The `framework.SafeQueue` interface abstracts the underlying data structure and its ordering logic. This pluggable
design allows for:

- **Different Queuing Disciplines**: A basic FIFO queue ([`listqueue`](./listqueue/)) is provided, but other disciplines
  like priority queues ([`maxminheap`](./maxminheap/)) can be used for more complex ordering requirements.
- **Specialized Capabilities**: Policies can declare `RequiredQueueCapabilities()` (e.g., `framework.CapabilityFIFO` or
  `framework.CapabilityPriorityConfigurable`). The `ports.FlowRegistry` pairs the policy with a queue that provides the
  necessary capabilities.
- **Performance Optimization**: Different queue implementations offer varying performance characteristics, which can be
  compared using the centralized benchmark suite to select the best fit for a given workload.

## Contributing a New `SafeQueue` Implementation

To contribute a new queue implementation, follow these steps:

1. **Define Your Implementation**
   - Create a new Go package in a subdirectory (e.g., `mycustomqueue/`).
   - Implement the `framework.SafeQueue` and `types.QueueItemHandle` interfaces.
   - Ensure all methods of `framework.SafeQueue` are goroutine-safe, typically by using a `sync.Mutex` or
     `sync.RWMutex`.
   - If your queue declares `framework.CapabilityPriorityConfigurable`, it MUST use the `framework.ItemComparator`
     passed to its constructor for all internal ordering logic.

2. **Register Your Queue**
   - In an `init()` function within your queue's Go file, call `queue.MustRegisterQueue()` with a unique name and a
     constructor function that matches the `queue.QueueConstructor` signature.

3. **Add to the Conformance Test**
   - Add a blank import for your new package to [`conformance_test.go`](./conformance_test.go). Your queue will then be
     automatically included in the conformance suite, which validates the `SafeQueue` contract.

4. **Documentation**
   - Add GoDoc comments to your new queue type, explaining its behavior, capabilities, and any trade-offs.

5. **Benchmarking**
   - You do not need to write custom benchmarks. The centralized suite in [`benchmark_test.go`](./benchmark_test.go)
     automatically includes any new queue implementation after it is registered. This ensures all queues are compared
     fairly under the same conditions.

## Benchmarking Strategy and Results

A centralized benchmark suite runs against all registered `SafeQueue` implementations to provide a consistent
performance comparison. To run the benchmarks, use the following command:

```sh
go test -bench=. -benchmem ./pkg/epp/flowcontrol/framework/plugins/queue/...
```

### Benchmark Scenarios

The suite includes the following scenarios:

- **`AddRemove`**: Measures throughput of tightly coupled `Add` and `Remove` operations under high parallelism. This
  tests the raw overhead of the data structure and its locking mechanism for simple, transactional workloads.
- **`AddPeekRemove`**: Measures performance of a sequential `Add` -> `PeekHead` -> `Remove` loop. This simulates a
  common consumer pattern where a single worker inspects an item before processing it.
- **`AddPeekTailRemove`**: The same sequential pattern, but inspecting the tail of the queue rather than the head.
  Together with `AddPeekRemove`, this covers consumer access at both ends of the queue.
- **`BulkAddThenBulkRemove`**: Tests performance of adding a large batch of items and then removing them all. This can
  reveal how the data structure's performance changes as it grows and shrinks under load.
- **`HighContention`**: Simulates a realistic workload with multiple concurrent producers (adding items) and consumers
  (peeking and removing items) operating on the same queue.

### Latest Results

*Last Updated: 2025-07-10*
*(CPU: AMD EPYC 7B12)*

| Benchmark                 | Implementation | Iterations | ns/op   | B/op   | allocs/op |
| ------------------------- | -------------- | ---------- | ------- | ------ | --------- |
| **AddRemove**             | `ListQueue`    | 1,889,844  | 609.0   | 224    | 5         |
|                           | `MaxMinHeap`   | 1,660,987  | 696.7   | 184    | 4         |
| **AddPeekRemove**         | `ListQueue`    | 3,884,938  | 298.0   | 224    | 5         |
|                           | `MaxMinHeap`   | 1,857,448  | 615.9   | 184    | 4         |
| **AddPeekTailRemove**     | `ListQueue`    | 3,576,487  | 308.4   | 224    | 5         |
|                           | `MaxMinHeap`   | 2,113,134  | 535.3   | 184    | 4         |
| **BulkAddThenBulkRemove** | `ListQueue`    | 24,032     | 49,861  | 24,801 | 698       |
|                           | `MaxMinHeap`   | 10,000     | 108,868 | 20,787 | 597       |
| **HighContention**        | `ListQueue`    | 484,574    | 2,328   | 896    | 20        |
|                           | `MaxMinHeap`   | 84,806     | 18,679  | 783    | 16        |

### Interpretation of Results

The benchmark results highlight the trade-offs between the different queue implementations based on their underlying
data structures:

- **`ListQueue`**: As a linked list, it excels in scenarios involving frequent additions or removals from either end of
  the queue (`AddPeekRemove`, `AddPeekTailRemove`), which are O(1) operations. Its performance is less competitive in
  high-contention and bulk scenarios, which reflects the necessary per-item memory allocation and pointer manipulation
  overhead.
- **`MaxMinHeap`**: As a slice-based heap, it has a lower allocation overhead per operation, making it efficient for
  high-throughput `AddRemove` cycles. Peeking and removing items involves maintaining the heap property, which has an
  O(log n) cost, making individual peek operations slower than `ListQueue`.

**Choosing a Queue:**

The data suggests the following guidance:

- For simple **FIFO** workloads where the primary operations are consuming from the head, `ListQueue` is a strong and
  simple choice.
- For workloads requiring **priority-based ordering** or those that are sensitive to allocation overhead under high
  contention, `MaxMinHeap` is likely the more suitable option.

These benchmarks provide a baseline for performance. The best choice for a specific use case will depend on the expected
workload patterns.