@@ -95,7 +95,9 @@ type BoundedAckCache[T AckCacheItem] struct {
9595 lastAck int64
9696 currSize uint64
9797
98- logger log.Logger
98+ logger log.Logger
99+ budgetManager Manager
100+ cacheID string
99101}
100102
101103// NewBoundedAckCache creates a new bounded ack cache with the specified capacity limits.
@@ -104,20 +106,28 @@ type BoundedAckCache[T AckCacheItem] struct {
104106// - maxCount: maximum number of items (dynamic property)
105107// - maxSize: maximum total byte size (dynamic property)
106108// - logger: optional logger for diagnostics (can be nil)
109+ // - budgetManager: optional budget manager for host-level capacity tracking (can be nil)
110+ // - cacheID: cache identifier for budget manager (required if budgetManager is provided)
107111//
108112// The cache will reject new items when either limit would be exceeded.
113+ // If a budget manager is provided, Put and Ack operations will automatically
114+ // reserve and release capacity through the budget manager.
109115func NewBoundedAckCache [T AckCacheItem ](
110116 maxCount dynamicproperties.IntPropertyFn ,
111117 maxSize dynamicproperties.IntPropertyFn ,
112118 logger log.Logger ,
119+ budgetManager Manager ,
120+ cacheID string ,
113121) AckCache [T ] {
114122 initialCount := maxCount ()
115123 return & BoundedAckCache [T ]{
116- maxCount : maxCount ,
117- maxSize : maxSize ,
118- order : make (sequenceHeap [T ], 0 , initialCount ),
119- cache : make (map [int64 ]T , initialCount ),
120- logger : logger ,
124+ maxCount : maxCount ,
125+ maxSize : maxSize ,
126+ order : make (sequenceHeap [T ], 0 , initialCount ),
127+ cache : make (map [int64 ]T , initialCount ),
128+ logger : logger ,
129+ budgetManager : budgetManager ,
130+ cacheID : cacheID ,
121131 }
122132}
123133
@@ -145,12 +155,13 @@ func (c *BoundedAckCache[T]) Put(item T, size uint64) error {
145155 return ErrAckCacheFull
146156 }
147157
148- // Add to both heap and map
149- c .cache [sequenceID ] = item
150- heap .Push (& c .order , heapItem [T ]{sequenceID : sequenceID , size : size })
151- c .currSize += size
158+ if c .budgetManager != nil {
159+ return c .budgetManager .ReserveWithCallback (c .cacheID , size , 1 , func () error {
160+ return c .putInternal (item , sequenceID , size )
161+ })
162+ }
152163
153- return nil
164+ return c . putInternal ( item , sequenceID , size )
154165}
155166
156167// Get retrieves an item by sequence ID.
@@ -166,20 +177,21 @@ func (c *BoundedAckCache[T]) Ack(level int64) (uint64, int) {
166177 c .mu .Lock ()
167178 defer c .mu .Unlock ()
168179
169- var freedSize uint64
170- var removedCount int
171-
172- // Remove all items from heap with sequence ID <= level
173- for c .order .Len () > 0 && c .order .Peek ().sequenceID <= level {
174- item := heap .Pop (& c .order ).(heapItem [T ])
175- delete (c .cache , item .sequenceID )
176- c .currSize -= item .size
177- freedSize += item .size
178- removedCount ++
180+ if c .budgetManager != nil {
181+ var freedSize uint64
182+ var removedCount int64
183+ err := c .budgetManager .ReleaseWithCallback (c .cacheID , func () (uint64 , int64 , error ) {
184+ freedSize , removedCount = c .ackInternal (level )
185+ return freedSize , removedCount , nil
186+ })
187+ if err != nil {
188+ return 0 , 0
189+ }
190+ return freedSize , int (removedCount )
179191 }
180192
181- c . lastAck = level
182- return freedSize , removedCount
193+ freedSize , removedCount := c . ackInternal ( level )
194+ return freedSize , int ( removedCount )
183195}
184196
185197// Size returns current total byte size.
@@ -198,6 +210,31 @@ func (c *BoundedAckCache[T]) Count() int {
198210 return len (c .order )
199211}
200212
213+ // putInternal adds an item to the cache. Caller must hold the lock.
214+ func (c * BoundedAckCache [T ]) putInternal (item T , sequenceID int64 , size uint64 ) error {
215+ c .cache [sequenceID ] = item
216+ heap .Push (& c .order , heapItem [T ]{sequenceID : sequenceID , size : size })
217+ c .currSize += size
218+ return nil
219+ }
220+
221+ // ackInternal removes all items with sequence ID <= level. Caller must hold the lock.
222+ func (c * BoundedAckCache [T ]) ackInternal (level int64 ) (uint64 , int64 ) {
223+ var freedSize uint64
224+ var removedCount int64
225+
226+ for c .order .Len () > 0 && c .order .Peek ().sequenceID <= level {
227+ item := heap .Pop (& c .order ).(heapItem [T ])
228+ delete (c .cache , item .sequenceID )
229+ c .currSize -= item .size
230+ freedSize += item .size
231+ removedCount ++
232+ }
233+
234+ c .lastAck = level
235+ return freedSize , removedCount
236+ }
237+
201238// heapItem represents an item in the sequence heap
202239type heapItem [T AckCacheItem ] struct {
203240 sequenceID int64
0 commit comments