2222import io .opentelemetry .api .OpenTelemetry ;
2323import io .opentelemetry .api .metrics .ObservableLongCounter ;
2424import io .prometheus .client .Gauge ;
25+ import java .util .Optional ;
26+ import java .util .Queue ;
27+ import java .util .concurrent .ScheduledExecutorService ;
28+ import java .util .concurrent .TimeUnit ;
29+ import java .util .function .Consumer ;
2530import lombok .AllArgsConstructor ;
2631import lombok .ToString ;
2732import lombok .extern .slf4j .Slf4j ;
2833import org .apache .pulsar .opentelemetry .Constants ;
2934import org .apache .pulsar .opentelemetry .OpenTelemetryAttributes .InflightReadLimiterUtilization ;
3035import org .apache .pulsar .opentelemetry .annotations .PulsarDeprecatedMetric ;
36+ import org .jctools .queues .SpscArrayQueue ;
3137
3238@ Slf4j
3339public class InflightReadsLimiter implements AutoCloseable {
@@ -58,16 +64,41 @@ public class InflightReadsLimiter implements AutoCloseable {
5864
5965 private final long maxReadsInFlightSize ;
6066 private long remainingBytes ;
67+ private final long acquireTimeoutMillis ;
68+ private final ScheduledExecutorService timeOutExecutor ;
69+ private final boolean enabled ;
6170
62- public InflightReadsLimiter (long maxReadsInFlightSize , OpenTelemetry openTelemetry ) {
63- if (maxReadsInFlightSize <= 0 ) {
71+ @ AllArgsConstructor
72+ @ ToString
73+ static class Handle {
74+ final long permits ;
75+ final long creationTime ;
76+ final boolean success ;
77+ }
78+
79+ record QueuedHandle (Handle handle , Consumer <Handle > callback ) {
80+ }
81+
82+ private final Queue <QueuedHandle > queuedHandles ;
83+ private boolean timeoutCheckRunning = false ;
84+
85+ public InflightReadsLimiter (long maxReadsInFlightSize , int maxReadsInFlightAcquireQueueSize ,
86+ long acquireTimeoutMillis , ScheduledExecutorService timeOutExecutor ,
87+ OpenTelemetry openTelemetry ) {
88+ this .maxReadsInFlightSize = maxReadsInFlightSize ;
89+ this .remainingBytes = maxReadsInFlightSize ;
90+ this .acquireTimeoutMillis = acquireTimeoutMillis ;
91+ this .timeOutExecutor = timeOutExecutor ;
92+ if (maxReadsInFlightSize > 0 ) {
93+ enabled = true ;
94+ this .queuedHandles = new SpscArrayQueue <>(maxReadsInFlightAcquireQueueSize );
95+ } else {
96+ enabled = false ;
97+ this .queuedHandles = null ;
6498 // set it to -1 in order to show in the metrics that the metric is not available
6599 PULSAR_ML_READS_BUFFER_SIZE .set (-1 );
66100 PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE .set (-1 );
67101 }
68- this .maxReadsInFlightSize = maxReadsInFlightSize ;
69- this .remainingBytes = maxReadsInFlightSize ;
70-
71102 var meter = openTelemetry .getMeter (Constants .BROKER_INSTRUMENTATION_SCOPE_NAME );
72103 inflightReadsLimitCounter = meter .counterBuilder (INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME )
73104 .setDescription ("Maximum number of bytes that can be retained by managed ledger data read from storage "
@@ -102,71 +133,145 @@ public void close() {
102133 inflightReadsUsageCounter .close ();
103134 }
104135
105- @ AllArgsConstructor
106- @ ToString
107- static class Handle {
108- final long acquiredPermits ;
109- final boolean success ;
110- final int trials ;
136+ private static final Handle DISABLED = new Handle (0 , 0 , true );
137+ private static final Optional <Handle > DISABLED_OPTIONAL = Optional .of (DISABLED );
111138
112- final long creationTime ;
139+ /**
140+ * Acquires permits from the limiter. If the limiter is disabled, it will immediately return a successful handle.
141+ * If permits are available, it will return a handle with the acquired permits. If no permits are available,
142+ * it will return an empty optional and the callback will be called when permits become available or when the
143+ * acquire timeout is reached. The success field in the handle passed to the callback will be false if the acquire
144+ * operation times out. The callback should be non-blocking and run on a desired executor handled within the
145+ * callback itself.
146+ *
147+ * A successful handle will have the success field set to true, and the caller must call release with the handle
148+ * when the permits are no longer needed.
149+ *
150+ * If an unsuccessful handle is returned immediately, it means that the queue limit has been reached and the
151+ * callback will not be called. The caller should fail the read operation in this case to apply backpressure.
152+ *
153+ * @param permits the number of permits to acquire
154+ * @param callback the callback to be called when the permits are acquired or timed out
155+ * @return an optional handle that contains the permits if acquired, otherwise an empty optional
156+ */
157+ public Optional <Handle > acquire (long permits , Consumer <Handle > callback ) {
158+ if (isDisabled ()) {
159+ return DISABLED_OPTIONAL ;
160+ }
161+ return internalAcquire (permits , callback );
113162 }
114163
115- private static final Handle DISABLED = new Handle (0 , true , 0 , -1 );
164+ private synchronized Optional <Handle > internalAcquire (long permits , Consumer <Handle > callback ) {
165+ Handle handle = new Handle (permits , System .currentTimeMillis (), true );
166+ if (remainingBytes >= permits ) {
167+ remainingBytes -= permits ;
168+ if (log .isDebugEnabled ()) {
169+ log .debug ("acquired permits: {}, creationTime: {}, remainingBytes:{}" , permits , handle .creationTime ,
170+ remainingBytes );
171+ }
172+ updateMetrics ();
173+ return Optional .of (handle );
174+ } else {
175+ if (queuedHandles .offer (new QueuedHandle (handle , callback ))) {
176+ scheduleTimeOutCheck (acquireTimeoutMillis );
177+ return Optional .empty ();
178+ } else {
179+ log .warn ("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}" ,
180+ permits , handle .creationTime , remainingBytes );
181+ return Optional .of (new Handle (0 , handle .creationTime , false ));
182+ }
183+ }
184+ }
116185
117- Handle acquire (long permits , Handle current ) {
118- if (maxReadsInFlightSize <= 0 ) {
119- // feature is disabled
120- return DISABLED ;
186+ private synchronized void scheduleTimeOutCheck (long delayMillis ) {
187+ if (acquireTimeoutMillis <= 0 ) {
188+ return ;
121189 }
122- synchronized (this ) {
123- try {
124- if (current == null ) {
125- if (remainingBytes == 0 ) {
126- return new Handle (0 , false , 1 , System .currentTimeMillis ());
127- }
128- if (remainingBytes >= permits ) {
129- remainingBytes -= permits ;
130- return new Handle (permits , true , 1 , System .currentTimeMillis ());
131- } else {
132- long possible = remainingBytes ;
133- remainingBytes = 0 ;
134- return new Handle (possible , false , 1 , System .currentTimeMillis ());
135- }
190+ if (!timeoutCheckRunning ) {
191+ timeoutCheckRunning = true ;
192+ timeOutExecutor .schedule (this ::timeoutCheck , delayMillis , TimeUnit .MILLISECONDS );
193+ }
194+ }
195+
196+ private synchronized void timeoutCheck () {
197+ timeoutCheckRunning = false ;
198+ long delay = 0 ;
199+ while (true ) {
200+ QueuedHandle queuedHandle = queuedHandles .peek ();
201+ if (queuedHandle != null ) {
202+ long age = System .currentTimeMillis () - queuedHandle .handle .creationTime ;
203+ if (age >= acquireTimeoutMillis ) {
204+ // remove the peeked handle from the queue
205+ queuedHandles .poll ();
206+ handleTimeout (queuedHandle );
136207 } else {
137- if (current .trials >= 4 && current .acquiredPermits > 0 ) {
138- remainingBytes += current .acquiredPermits ;
139- return new Handle (0 , false , 1 , current .creationTime );
140- }
141- if (remainingBytes == 0 ) {
142- return new Handle (current .acquiredPermits , false , current .trials + 1 ,
143- current .creationTime );
144- }
145- long needed = permits - current .acquiredPermits ;
146- if (remainingBytes >= needed ) {
147- remainingBytes -= needed ;
148- return new Handle (permits , true , current .trials + 1 , current .creationTime );
149- } else {
150- long possible = remainingBytes ;
151- remainingBytes = 0 ;
152- return new Handle (current .acquiredPermits + possible , false ,
153- current .trials + 1 , current .creationTime );
154- }
208+ delay = acquireTimeoutMillis - age ;
209+ break ;
155210 }
156- } finally {
157- updateMetrics () ;
211+ } else {
212+ break ;
158213 }
159214 }
215+ if (delay > 0 ) {
216+ scheduleTimeOutCheck (delay );
217+ }
218+ }
219+
220+ private void handleTimeout (QueuedHandle queuedHandle ) {
221+ if (log .isDebugEnabled ()) {
222+ log .debug ("timed out queued permits: {}, creationTime: {}, remainingBytes:{}" ,
223+ queuedHandle .handle .permits , queuedHandle .handle .creationTime , remainingBytes );
224+ }
225+ queuedHandle .callback .accept (new Handle (0 , queuedHandle .handle .creationTime , false ));
160226 }
161227
162- void release (Handle handle ) {
228+ /**
229+ * Releases permits back to the limiter. If the handle is disabled, this method will be a no-op.
230+ *
231+ * @param handle the handle containing the permits to release
232+ */
233+ public void release (Handle handle ) {
163234 if (handle == DISABLED ) {
164235 return ;
165236 }
166- synchronized (this ) {
167- remainingBytes += handle .acquiredPermits ;
168- updateMetrics ();
237+ internalRelease (handle );
238+ }
239+
240+ private synchronized void internalRelease (Handle handle ) {
241+ if (log .isDebugEnabled ()) {
242+ log .debug ("release permits: {}, creationTime: {}, remainingBytes:{}" , handle .permits ,
243+ handle .creationTime , getRemainingBytes ());
244+ }
245+ remainingBytes += handle .permits ;
246+ while (true ) {
247+ QueuedHandle queuedHandle = queuedHandles .peek ();
248+ if (queuedHandle != null ) {
249+ if (remainingBytes >= queuedHandle .handle .permits ) {
250+ // remove the peeked handle from the queue
251+ queuedHandles .poll ();
252+ handleQueuedHandle (queuedHandle );
253+ } else if (acquireTimeoutMillis > 0
254+ && System .currentTimeMillis () - queuedHandle .handle .creationTime > acquireTimeoutMillis ) {
255+ // remove the peeked handle from the queue
256+ queuedHandles .poll ();
257+ handleTimeout (queuedHandle );
258+ } else {
259+ break ;
260+ }
261+ } else {
262+ break ;
263+ }
264+ }
265+ updateMetrics ();
266+ }
267+
268+ private void handleQueuedHandle (QueuedHandle queuedHandle ) {
269+ remainingBytes -= queuedHandle .handle .permits ;
270+ if (log .isDebugEnabled ()) {
271+ log .debug ("acquired queued permits: {}, creationTime: {}, remainingBytes:{}" ,
272+ queuedHandle .handle .permits , queuedHandle .handle .creationTime , remainingBytes );
169273 }
274+ queuedHandle .callback .accept (queuedHandle .handle );
170275 }
171276
172277 private synchronized void updateMetrics () {
@@ -175,8 +280,6 @@ private synchronized void updateMetrics() {
175280 }
176281
177282 public boolean isDisabled () {
178- return maxReadsInFlightSize <= 0 ;
283+ return ! enabled ;
179284 }
180-
181-
182- }
285+ }
0 commit comments