2222import io .opentelemetry .api .OpenTelemetry ;
2323import io .opentelemetry .api .metrics .ObservableLongCounter ;
2424import io .prometheus .client .Gauge ;
25- import lombok .AllArgsConstructor ;
26- import lombok .ToString ;
25+ import java .util .Optional ;
26+ import java .util .Queue ;
27+ import java .util .concurrent .ScheduledExecutorService ;
28+ import java .util .concurrent .TimeUnit ;
29+ import java .util .function .Consumer ;
2730import lombok .extern .slf4j .Slf4j ;
2831import org .apache .pulsar .opentelemetry .Constants ;
2932import org .apache .pulsar .opentelemetry .OpenTelemetryAttributes .InflightReadLimiterUtilization ;
3033import org .apache .pulsar .opentelemetry .annotations .PulsarDeprecatedMetric ;
34+ import org .jctools .queues .SpscArrayQueue ;
3135
3236@ Slf4j
3337public class InflightReadsLimiter implements AutoCloseable {
@@ -58,16 +62,36 @@ public class InflightReadsLimiter implements AutoCloseable {
5862
5963 private final long maxReadsInFlightSize ;
6064 private long remainingBytes ;
65+ private final long acquireTimeoutMillis ;
66+ private final ScheduledExecutorService timeOutExecutor ;
67+ private final boolean enabled ;
6168
62- public InflightReadsLimiter (long maxReadsInFlightSize , OpenTelemetry openTelemetry ) {
63- if (maxReadsInFlightSize <= 0 ) {
69+ record Handle (long permits , long creationTime , boolean success ) {
70+ }
71+
72+ record QueuedHandle (Handle handle , Consumer <Handle > callback ) {
73+ }
74+
75+ private final Queue <QueuedHandle > queuedHandles ;
76+ private boolean timeoutCheckRunning = false ;
77+
78+ public InflightReadsLimiter (long maxReadsInFlightSize , int maxReadsInFlightAcquireQueueSize ,
79+ long acquireTimeoutMillis , ScheduledExecutorService timeOutExecutor ,
80+ OpenTelemetry openTelemetry ) {
81+ this .maxReadsInFlightSize = maxReadsInFlightSize ;
82+ this .remainingBytes = maxReadsInFlightSize ;
83+ this .acquireTimeoutMillis = acquireTimeoutMillis ;
84+ this .timeOutExecutor = timeOutExecutor ;
85+ if (maxReadsInFlightSize > 0 ) {
86+ enabled = true ;
87+ this .queuedHandles = new SpscArrayQueue <>(maxReadsInFlightAcquireQueueSize );
88+ } else {
89+ enabled = false ;
90+ this .queuedHandles = null ;
6491 // set it to -1 in order to show in the metrics that the metric is not available
6592 PULSAR_ML_READS_BUFFER_SIZE .set (-1 );
6693 PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE .set (-1 );
6794 }
68- this .maxReadsInFlightSize = maxReadsInFlightSize ;
69- this .remainingBytes = maxReadsInFlightSize ;
70-
7195 var meter = openTelemetry .getMeter (Constants .BROKER_INSTRUMENTATION_SCOPE_NAME );
7296 inflightReadsLimitCounter = meter .counterBuilder (INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME )
7397 .setDescription ("Maximum number of bytes that can be retained by managed ledger data read from storage "
@@ -102,70 +126,178 @@ public void close() {
102126 inflightReadsUsageCounter .close ();
103127 }
104128
105- @ AllArgsConstructor
106- @ ToString
107- static class Handle {
108- final long acquiredPermits ;
109- final boolean success ;
110- final int trials ;
129+ private static final Handle DISABLED = new Handle (0 , 0 , true );
130+ private static final Optional <Handle > DISABLED_OPTIONAL = Optional .of (DISABLED );
111131
112- final long creationTime ;
132+ /**
133+ * Acquires permits from the limiter. If the limiter is disabled, it will immediately return a successful handle.
134+ * If permits are available, it will return a handle with the acquired permits. If no permits are available,
135+ * it will return an empty optional and the callback will be called when permits become available or when the
136+ * acquire timeout is reached. The success field in the handle passed to the callback will be false if the acquire
137+ * operation times out. The callback should be non-blocking and run on a desired executor handled within the
138+ * callback itself.
139+ *
140+ * A successful handle will have the success field set to true, and the caller must call release with the handle
141+ * when the permits are no longer needed.
142+ *
143+ * If an unsuccessful handle is returned immediately, it means that the queue limit has been reached and the
144+ * callback will not be called. The caller should fail the read operation in this case to apply backpressure.
145+ *
146+ * @param permits the number of permits to acquire
147+ * @param callback the callback to be called when the permits are acquired or timed out
148+ * @return an optional handle that contains the permits if acquired, otherwise an empty optional
149+ */
150+ public Optional <Handle > acquire (long permits , Consumer <Handle > callback ) {
151+ if (isDisabled ()) {
152+ return DISABLED_OPTIONAL ;
153+ }
154+ return internalAcquire (permits , callback );
113155 }
114156
115- private static final Handle DISABLED = new Handle (0 , true , 0 , -1 );
157+ private synchronized Optional <Handle > internalAcquire (long permits , Consumer <Handle > callback ) {
158+ Handle handle = new Handle (permits , System .currentTimeMillis (), true );
159+ if (remainingBytes >= permits ) {
160+ remainingBytes -= permits ;
161+ if (log .isDebugEnabled ()) {
162+ log .debug ("acquired permits: {}, creationTime: {}, remainingBytes:{}" , permits , handle .creationTime ,
163+ remainingBytes );
164+ }
165+ updateMetrics ();
166+ return Optional .of (handle );
167+ } else if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize ) {
168+ remainingBytes = 0 ;
169+ if (log .isInfoEnabled ()) {
170+ log .info ("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
171+ + "Allowing request with permits set to maxReadsInFlightSize." ,
172+ permits , maxReadsInFlightSize , handle .creationTime , remainingBytes );
173+ }
174+ updateMetrics ();
175+ return Optional .of (new Handle (maxReadsInFlightSize , handle .creationTime , true ));
176+ } else {
177+ if (queuedHandles .offer (new QueuedHandle (handle , callback ))) {
178+ scheduleTimeOutCheck (acquireTimeoutMillis );
179+ return Optional .empty ();
180+ } else {
181+ log .warn ("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}" ,
182+ permits , handle .creationTime , remainingBytes );
183+ return Optional .of (new Handle (0 , handle .creationTime , false ));
184+ }
185+ }
186+ }
116187
117- Handle acquire (long permits , Handle current ) {
118- if (maxReadsInFlightSize <= 0 ) {
119- // feature is disabled
120- return DISABLED ;
188+ private synchronized void scheduleTimeOutCheck (long delayMillis ) {
189+ if (acquireTimeoutMillis <= 0 ) {
190+ return ;
121191 }
122- synchronized (this ) {
123- try {
124- if (current == null ) {
125- if (remainingBytes == 0 ) {
126- return new Handle (0 , false , 1 , System .currentTimeMillis ());
127- }
128- if (remainingBytes >= permits ) {
129- remainingBytes -= permits ;
130- return new Handle (permits , true , 1 , System .currentTimeMillis ());
131- } else {
132- long possible = remainingBytes ;
133- remainingBytes = 0 ;
134- return new Handle (possible , false , 1 , System .currentTimeMillis ());
135- }
192+ if (!timeoutCheckRunning ) {
193+ timeoutCheckRunning = true ;
194+ timeOutExecutor .schedule (this ::timeoutCheck , delayMillis , TimeUnit .MILLISECONDS );
195+ }
196+ }
197+
198+ private synchronized void timeoutCheck () {
199+ timeoutCheckRunning = false ;
200+ long delay = 0 ;
201+ while (true ) {
202+ QueuedHandle queuedHandle = queuedHandles .peek ();
203+ if (queuedHandle != null ) {
204+ long age = System .currentTimeMillis () - queuedHandle .handle .creationTime ;
205+ if (age >= acquireTimeoutMillis ) {
206+ // remove the peeked handle from the queue
207+ queuedHandles .poll ();
208+ handleTimeout (queuedHandle );
136209 } else {
137- if (current .trials >= 4 && current .acquiredPermits > 0 ) {
138- remainingBytes += current .acquiredPermits ;
139- return new Handle (0 , false , 1 , current .creationTime );
140- }
141- if (remainingBytes == 0 ) {
142- return new Handle (current .acquiredPermits , false , current .trials + 1 ,
143- current .creationTime );
144- }
145- long needed = permits - current .acquiredPermits ;
146- if (remainingBytes >= needed ) {
147- remainingBytes -= needed ;
148- return new Handle (permits , true , current .trials + 1 , current .creationTime );
149- } else {
150- long possible = remainingBytes ;
151- remainingBytes = 0 ;
152- return new Handle (current .acquiredPermits + possible , false ,
153- current .trials + 1 , current .creationTime );
154- }
210+ delay = acquireTimeoutMillis - age ;
211+ break ;
155212 }
156- } finally {
157- updateMetrics () ;
213+ } else {
214+ break ;
158215 }
159216 }
217+ if (delay > 0 ) {
218+ scheduleTimeOutCheck (delay );
219+ }
220+ }
221+
222+ private void handleTimeout (QueuedHandle queuedHandle ) {
223+ if (log .isDebugEnabled ()) {
224+ log .debug ("timed out queued permits: {}, creationTime: {}, remainingBytes:{}" ,
225+ queuedHandle .handle .permits , queuedHandle .handle .creationTime , remainingBytes );
226+ }
227+ try {
228+ queuedHandle .callback .accept (new Handle (0 , queuedHandle .handle .creationTime , false ));
229+ } catch (Exception e ) {
230+ log .error ("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}" ,
231+ queuedHandle .handle .permits , queuedHandle .handle .creationTime , remainingBytes , e );
232+ }
160233 }
161234
162- void release (Handle handle ) {
235+ /**
236+ * Releases permits back to the limiter. If the handle is disabled, this method will be a no-op.
237+ *
238+ * @param handle the handle containing the permits to release
239+ */
240+ public void release (Handle handle ) {
163241 if (handle == DISABLED ) {
164242 return ;
165243 }
166- synchronized (this ) {
167- remainingBytes += handle .acquiredPermits ;
168- updateMetrics ();
244+ internalRelease (handle );
245+ }
246+
247+ private synchronized void internalRelease (Handle handle ) {
248+ if (log .isDebugEnabled ()) {
249+ log .debug ("release permits: {}, creationTime: {}, remainingBytes:{}" , handle .permits ,
250+ handle .creationTime , getRemainingBytes ());
251+ }
252+ remainingBytes += handle .permits ;
253+ while (true ) {
254+ QueuedHandle queuedHandle = queuedHandles .peek ();
255+ if (queuedHandle != null ) {
256+ boolean timedOut = acquireTimeoutMillis > 0
257+ && System .currentTimeMillis () - queuedHandle .handle .creationTime > acquireTimeoutMillis ;
258+ if (timedOut ) {
259+ // remove the peeked handle from the queue
260+ queuedHandles .poll ();
261+ handleTimeout (queuedHandle );
262+ } else if (remainingBytes >= queuedHandle .handle .permits
263+ || queuedHandle .handle .permits > maxReadsInFlightSize
264+ && remainingBytes == maxReadsInFlightSize ) {
265+ // remove the peeked handle from the queue
266+ queuedHandles .poll ();
267+ handleQueuedHandle (queuedHandle );
268+ } else {
269+ break ;
270+ }
271+ } else {
272+ break ;
273+ }
274+ }
275+ updateMetrics ();
276+ }
277+
278+ private void handleQueuedHandle (QueuedHandle queuedHandle ) {
279+ long permits = queuedHandle .handle .permits ;
280+ Handle handleForCallback = queuedHandle .handle ;
281+ if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize ) {
282+ remainingBytes = 0 ;
283+ if (log .isInfoEnabled ()) {
284+ log .info ("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
285+ + "Allowing request with permits set to maxReadsInFlightSize." ,
286+ permits , maxReadsInFlightSize , queuedHandle .handle .creationTime , remainingBytes );
287+ }
288+ handleForCallback = new Handle (maxReadsInFlightSize , queuedHandle .handle .creationTime , true );
289+ } else {
290+ remainingBytes -= permits ;
291+ if (log .isDebugEnabled ()) {
292+ log .debug ("acquired queued permits: {}, creationTime: {}, remainingBytes:{}" ,
293+ permits , queuedHandle .handle .creationTime , remainingBytes );
294+ }
295+ }
296+ try {
297+ queuedHandle .callback .accept (handleForCallback );
298+ } catch (Exception e ) {
299+ log .error ("Error in callback of acquired queued permits: {}, creationTime: {}, remainingBytes:{}" ,
300+ handleForCallback .permits , handleForCallback .creationTime , remainingBytes , e );
169301 }
170302 }
171303
@@ -175,8 +307,6 @@ private synchronized void updateMetrics() {
175307 }
176308
177309 public boolean isDisabled () {
178- return maxReadsInFlightSize <= 0 ;
310+ return ! enabled ;
179311 }
180-
181-
182- }
312+ }
0 commit comments