2020
2121import com .google .common .annotations .VisibleForTesting ;
2222import io .prometheus .client .Gauge ;
23- import lombok .AllArgsConstructor ;
24- import lombok .ToString ;
23+ import java .util .Optional ;
24+ import java .util .Queue ;
25+ import java .util .concurrent .ScheduledExecutorService ;
26+ import java .util .concurrent .TimeUnit ;
27+ import java .util .function .Consumer ;
2528import lombok .extern .slf4j .Slf4j ;
29+ import org .jctools .queues .SpscArrayQueue ;
2630
2731@ Slf4j
2832public class InflightReadsLimiter {
@@ -41,86 +45,214 @@ public class InflightReadsLimiter {
4145
4246 private final long maxReadsInFlightSize ;
4347 private long remainingBytes ;
48+ private final long acquireTimeoutMillis ;
49+ private final ScheduledExecutorService timeOutExecutor ;
50+ private final boolean enabled ;
4451
45- public InflightReadsLimiter (long maxReadsInFlightSize ) {
46- if (maxReadsInFlightSize <= 0 ) {
52+ record Handle (long permits , long creationTime , boolean success ) {
53+ }
54+
55+ record QueuedHandle (Handle handle , Consumer <Handle > callback ) {
56+ }
57+
58+ private final Queue <QueuedHandle > queuedHandles ;
59+ private boolean timeoutCheckRunning = false ;
60+
61+ public InflightReadsLimiter (long maxReadsInFlightSize , int maxReadsInFlightAcquireQueueSize ,
62+ long acquireTimeoutMillis , ScheduledExecutorService timeOutExecutor ) {
63+ this .maxReadsInFlightSize = maxReadsInFlightSize ;
64+ this .remainingBytes = maxReadsInFlightSize ;
65+ this .acquireTimeoutMillis = acquireTimeoutMillis ;
66+ this .timeOutExecutor = timeOutExecutor ;
67+ if (maxReadsInFlightSize > 0 ) {
68+ enabled = true ;
69+ this .queuedHandles = new SpscArrayQueue <>(maxReadsInFlightAcquireQueueSize );
70+ } else {
71+ enabled = false ;
72+ this .queuedHandles = null ;
4773 // set it to -1 in order to show in the metrics that the metric is not available
4874 PULSAR_ML_READS_BUFFER_SIZE .set (-1 );
4975 PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE .set (-1 );
5076 }
51- this .maxReadsInFlightSize = maxReadsInFlightSize ;
52- this .remainingBytes = maxReadsInFlightSize ;
5377 }
5478
5579 @ VisibleForTesting
5680 public synchronized long getRemainingBytes () {
5781 return remainingBytes ;
5882 }
5983
60- @ AllArgsConstructor
61- @ ToString
62- static class Handle {
63- final long acquiredPermits ;
64- final boolean success ;
65- final int trials ;
84+ private static final Handle DISABLED = new Handle (0 , 0 , true );
85+ private static final Optional <Handle > DISABLED_OPTIONAL = Optional .of (DISABLED );
6686
67- final long creationTime ;
87+ /**
88+ * Acquires permits from the limiter. If the limiter is disabled, it will immediately return a successful handle.
89+ * If permits are available, it will return a handle with the acquired permits. If no permits are available,
90+ * it will return an empty optional and the callback will be called when permits become available or when the
91+ * acquire timeout is reached. The success field in the handle passed to the callback will be false if the acquire
92+ * operation times out. The callback should be non-blocking and run on a desired executor handled within the
93+ * callback itself.
94+ *
95+ * A successful handle will have the success field set to true, and the caller must call release with the handle
96+ * when the permits are no longer needed.
97+ *
98+ * If an unsuccessful handle is returned immediately, it means that the queue limit has been reached and the
99+ * callback will not be called. The caller should fail the read operation in this case to apply backpressure.
100+ *
101+ * @param permits the number of permits to acquire
102+ * @param callback the callback to be called when the permits are acquired or timed out
103+ * @return an optional handle that contains the permits if acquired, otherwise an empty optional
104+ */
105+ public Optional <Handle > acquire (long permits , Consumer <Handle > callback ) {
106+ if (isDisabled ()) {
107+ return DISABLED_OPTIONAL ;
108+ }
109+ return internalAcquire (permits , callback );
68110 }
69111
70- private static final Handle DISABLED = new Handle (0 , true , 0 , -1 );
112+ private synchronized Optional <Handle > internalAcquire (long permits , Consumer <Handle > callback ) {
113+ Handle handle = new Handle (permits , System .currentTimeMillis (), true );
114+ if (remainingBytes >= permits ) {
115+ remainingBytes -= permits ;
116+ if (log .isDebugEnabled ()) {
117+ log .debug ("acquired permits: {}, creationTime: {}, remainingBytes:{}" , permits , handle .creationTime ,
118+ remainingBytes );
119+ }
120+ updateMetrics ();
121+ return Optional .of (handle );
122+ } else if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize ) {
123+ remainingBytes = 0 ;
124+ if (log .isInfoEnabled ()) {
125+ log .info ("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
126+ + "Allowing request with permits set to maxReadsInFlightSize." ,
127+ permits , maxReadsInFlightSize , handle .creationTime , remainingBytes );
128+ }
129+ updateMetrics ();
130+ return Optional .of (new Handle (maxReadsInFlightSize , handle .creationTime , true ));
131+ } else {
132+ if (queuedHandles .offer (new QueuedHandle (handle , callback ))) {
133+ scheduleTimeOutCheck (acquireTimeoutMillis );
134+ return Optional .empty ();
135+ } else {
136+ log .warn ("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}" ,
137+ permits , handle .creationTime , remainingBytes );
138+ return Optional .of (new Handle (0 , handle .creationTime , false ));
139+ }
140+ }
141+ }
71142
72- Handle acquire (long permits , Handle current ) {
73- if (maxReadsInFlightSize <= 0 ) {
74- // feature is disabled
75- return DISABLED ;
143+ private synchronized void scheduleTimeOutCheck (long delayMillis ) {
144+ if (acquireTimeoutMillis <= 0 ) {
145+ return ;
146+ }
147+ if (!timeoutCheckRunning ) {
148+ timeoutCheckRunning = true ;
149+ timeOutExecutor .schedule (this ::timeoutCheck , delayMillis , TimeUnit .MILLISECONDS );
76150 }
77- synchronized (this ) {
78- try {
79- if (current == null ) {
80- if (remainingBytes == 0 ) {
81- return new Handle (0 , false , 1 , System .currentTimeMillis ());
82- }
83- if (remainingBytes >= permits ) {
84- remainingBytes -= permits ;
85- return new Handle (permits , true , 1 , System .currentTimeMillis ());
86- } else {
87- long possible = remainingBytes ;
88- remainingBytes = 0 ;
89- return new Handle (possible , false , 1 , System .currentTimeMillis ());
90- }
151+ }
152+
153+ private synchronized void timeoutCheck () {
154+ timeoutCheckRunning = false ;
155+ long delay = 0 ;
156+ while (true ) {
157+ QueuedHandle queuedHandle = queuedHandles .peek ();
158+ if (queuedHandle != null ) {
159+ long age = System .currentTimeMillis () - queuedHandle .handle .creationTime ;
160+ if (age >= acquireTimeoutMillis ) {
161+ // remove the peeked handle from the queue
162+ queuedHandles .poll ();
163+ handleTimeout (queuedHandle );
91164 } else {
92- if (current .trials >= 4 && current .acquiredPermits > 0 ) {
93- remainingBytes += current .acquiredPermits ;
94- return new Handle (0 , false , 1 , current .creationTime );
95- }
96- if (remainingBytes == 0 ) {
97- return new Handle (current .acquiredPermits , false , current .trials + 1 ,
98- current .creationTime );
99- }
100- long needed = permits - current .acquiredPermits ;
101- if (remainingBytes >= needed ) {
102- remainingBytes -= needed ;
103- return new Handle (permits , true , current .trials + 1 , current .creationTime );
104- } else {
105- long possible = remainingBytes ;
106- remainingBytes = 0 ;
107- return new Handle (current .acquiredPermits + possible , false ,
108- current .trials + 1 , current .creationTime );
109- }
165+ delay = acquireTimeoutMillis - age ;
166+ break ;
110167 }
111- } finally {
112- updateMetrics () ;
168+ } else {
169+ break ;
113170 }
114171 }
172+ if (delay > 0 ) {
173+ scheduleTimeOutCheck (delay );
174+ }
175+ }
176+
177+ private void handleTimeout (QueuedHandle queuedHandle ) {
178+ if (log .isDebugEnabled ()) {
179+ log .debug ("timed out queued permits: {}, creationTime: {}, remainingBytes:{}" ,
180+ queuedHandle .handle .permits , queuedHandle .handle .creationTime , remainingBytes );
181+ }
182+ try {
183+ queuedHandle .callback .accept (new Handle (0 , queuedHandle .handle .creationTime , false ));
184+ } catch (Exception e ) {
185+ log .error ("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}" ,
186+ queuedHandle .handle .permits , queuedHandle .handle .creationTime , remainingBytes , e );
187+ }
115188 }
116189
117- void release (Handle handle ) {
190+ /**
191+ * Releases permits back to the limiter. If the handle is disabled, this method will be a no-op.
192+ *
193+ * @param handle the handle containing the permits to release
194+ */
195+ public void release (Handle handle ) {
118196 if (handle == DISABLED ) {
119197 return ;
120198 }
121- synchronized (this ) {
122- remainingBytes += handle .acquiredPermits ;
123- updateMetrics ();
199+ internalRelease (handle );
200+ }
201+
202+ private synchronized void internalRelease (Handle handle ) {
203+ if (log .isDebugEnabled ()) {
204+ log .debug ("release permits: {}, creationTime: {}, remainingBytes:{}" , handle .permits ,
205+ handle .creationTime , getRemainingBytes ());
206+ }
207+ remainingBytes += handle .permits ;
208+ while (true ) {
209+ QueuedHandle queuedHandle = queuedHandles .peek ();
210+ if (queuedHandle != null ) {
211+ boolean timedOut = acquireTimeoutMillis > 0
212+ && System .currentTimeMillis () - queuedHandle .handle .creationTime > acquireTimeoutMillis ;
213+ if (timedOut ) {
214+ // remove the peeked handle from the queue
215+ queuedHandles .poll ();
216+ handleTimeout (queuedHandle );
217+ } else if (remainingBytes >= queuedHandle .handle .permits
218+ || queuedHandle .handle .permits > maxReadsInFlightSize
219+ && remainingBytes == maxReadsInFlightSize ) {
220+ // remove the peeked handle from the queue
221+ queuedHandles .poll ();
222+ handleQueuedHandle (queuedHandle );
223+ } else {
224+ break ;
225+ }
226+ } else {
227+ break ;
228+ }
229+ }
230+ updateMetrics ();
231+ }
232+
233+ private void handleQueuedHandle (QueuedHandle queuedHandle ) {
234+ long permits = queuedHandle .handle .permits ;
235+ Handle handleForCallback = queuedHandle .handle ;
236+ if (permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize ) {
237+ remainingBytes = 0 ;
238+ if (log .isInfoEnabled ()) {
239+ log .info ("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. "
240+ + "Allowing request with permits set to maxReadsInFlightSize." ,
241+ permits , maxReadsInFlightSize , queuedHandle .handle .creationTime , remainingBytes );
242+ }
243+ handleForCallback = new Handle (maxReadsInFlightSize , queuedHandle .handle .creationTime , true );
244+ } else {
245+ remainingBytes -= permits ;
246+ if (log .isDebugEnabled ()) {
247+ log .debug ("acquired queued permits: {}, creationTime: {}, remainingBytes:{}" ,
248+ permits , queuedHandle .handle .creationTime , remainingBytes );
249+ }
250+ }
251+ try {
252+ queuedHandle .callback .accept (handleForCallback );
253+ } catch (Exception e ) {
254+ log .error ("Error in callback of acquired queued permits: {}, creationTime: {}, remainingBytes:{}" ,
255+ handleForCallback .permits , handleForCallback .creationTime , remainingBytes , e );
124256 }
125257 }
126258
@@ -130,8 +262,6 @@ private synchronized void updateMetrics() {
130262 }
131263
132264 public boolean isDisabled () {
133- return maxReadsInFlightSize <= 0 ;
265+ return ! enabled ;
134266 }
135-
136-
137- }
267+ }
0 commit comments