1414
1515import java .time .Duration ;
1616import java .util .UUID ;
17- import java .util .concurrent .ArrayBlockingQueue ;
18- import java .util .concurrent .CompletableFuture ;
19- import java .util .concurrent .TimeUnit ;
17+ import java .util .concurrent .*;
2018import java .util .concurrent .atomic .AtomicBoolean ;
19+ import java .util .concurrent .locks .Condition ;
20+ import java .util .concurrent .locks .ReentrantLock ;
2121
2222public class ReactorHttpClient implements HypixelHttpClient {
23+
2324 private final HttpClient httpClient ;
2425 private final UUID apiKey ;
2526
@@ -33,8 +34,10 @@ public class ReactorHttpClient implements HypixelHttpClient {
3334
3435 // For shutting down the flux that emits request callbacks
3536 private final Disposable requestCallbackFluxDisposable ;
37+ private final ExecutorService requestCallbackFluxExecutorService = Executors .newSingleThreadExecutor ();
3638
37- private final Object lock = new Object ();
39+ private final ReentrantLock lock = new ReentrantLock (true );
40+ private final Condition limitResetCondition = lock .newCondition ();
3841
3942 /*
4043 * How many requests we can send before reaching the limit
@@ -62,18 +65,13 @@ public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCa
6265 callback = blockingQueue .take ();
6366 }
6467
65- synchronized (lock ) {
66- while (this .actionsLeftThisMinute <= 0 ) {
67- lock .wait ();
68- }
68+ this .decrementActionsOrWait ();
6969
70- actionsLeftThisMinute --;
71- }
7270 synchronousSink .next (callback );
7371 } catch (InterruptedException e ) {
7472 throw new AssertionError ("This should not have been possible" , e );
7573 }
76- }).subscribeOn (Schedulers .boundedElastic ( )).delayElements (Duration .ofMillis (minDelayBetweenRequests ), Schedulers .boundedElastic ()).subscribe (RequestCallback ::sendRequest );
74+ }).subscribeOn (Schedulers .fromExecutorService ( this . requestCallbackFluxExecutorService )).delayElements (Duration .ofMillis (minDelayBetweenRequests ), Schedulers .boundedElastic ()).subscribe (RequestCallback ::sendRequest );
7775 }
7876
7977 public ReactorHttpClient (UUID apiKey , long minDelayBetweenRequests ) {
@@ -89,15 +87,15 @@ public ReactorHttpClient(UUID apiKey) {
8987 }
9088
9189 /**
92- * Canceling the returned future will result in canceling the request if possible
90+ * Canceling the returned future will result in canceling the sending of the request if still possible
9391 */
9492 @ Override
9593 public CompletableFuture <HypixelHttpResponse > makeRequest (String url ) {
9694 return toHypixelResponseFuture (makeRequest (url , false ));
9795 }
9896
9997 /**
100- * Canceling the returned future will result in canceling the request if possible
98+ * Canceling the returned future will result in canceling the sending of the request if still possible
10199 */
102100 @ Override
103101 public CompletableFuture <HypixelHttpResponse > makeAuthenticatedRequest (String url ) {
@@ -112,17 +110,18 @@ private static CompletableFuture<HypixelHttpResponse> toHypixelResponseFuture(Mo
112110 @ Override
113111 public void shutdown () {
114112 this .requestCallbackFluxDisposable .dispose ();
113+ this .requestCallbackFluxExecutorService .shutdown ();
115114 }
116115
117116 /**
118- * Makes a request to the Hypixel api and returns a {@link Mono<Tuple2<String, Integer>> } containing
117+ * Makes a request to the Hypixel api and returns a {@link Mono} containing
119118 * the response body and status code, canceling this mono will prevent the request from being sent if possible
120119 *
121120 * @param path full url
122121 * @param isAuthenticated whether to enable authentication or not
123122 */
124123 public Mono <Tuple2 <String , Integer >> makeRequest (String path , boolean isAuthenticated ) {
125- return Mono .< Tuple2 < String , Integer >> create (sink -> {
124+ return Mono .create (sink -> {
126125 RequestCallback callback = new RequestCallback (path , sink , isAuthenticated , this );
127126
128127 try {
@@ -131,7 +130,40 @@ public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenti
131130 sink .error (e );
132131 throw new AssertionError ("Queue insertion interrupted. This should not have been possible" , e );
133132 }
134- }).subscribeOn (Schedulers .boundedElastic ());
133+ });
134+ }
135+
136+ private void decrementActionsOrWait () throws InterruptedException {
137+ this .lock .lock ();
138+ try {
139+ while (this .actionsLeftThisMinute <= 0 ) {
140+ this .limitResetCondition .await ();
141+ }
142+ this .actionsLeftThisMinute --;
143+ } finally {
144+ this .lock .unlock ();
145+ }
146+ }
147+
148+
149+ private void incrementActionsLeftThisMinute () {
150+ this .lock .lock ();
151+ try {
152+ this .actionsLeftThisMinute ++;
153+ this .limitResetCondition .signal ();
154+ } finally {
155+ this .lock .unlock ();
156+ }
157+ }
158+
159+ private void setActionsLeftThisMinute (int value ) {
160+ this .lock .lock ();
161+ try {
162+ this .actionsLeftThisMinute = Math .max (0 , value );
163+ this .limitResetCondition .signal ();
164+ } finally {
165+ this .lock .unlock ();
166+ }
135167 }
136168
137169 /**
@@ -144,12 +176,11 @@ public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenti
144176 */
145177 private ResponseHandlingResult handleResponse (HttpClientResponse response , RequestCallback requestCallback ) throws InterruptedException {
146178 if (response .status () == HttpResponseStatus .TOO_MANY_REQUESTS ) {
179+ System .out .println ("Too many requests were sent, is something else using the same API Key?!!" );
147180 int timeRemaining = Math .max (1 , response .responseHeaders ().getInt ("ratelimit-reset" , 10 ));
148181
149182 if (this .overflowStartedNewClock .compareAndSet (false , true )) {
150- synchronized (lock ) {
151- this .actionsLeftThisMinute = 0 ;
152- }
183+ this .setActionsLeftThisMinute (0 );
153184 resetForFirstRequest (timeRemaining );
154185 }
155186
@@ -162,10 +193,7 @@ private ResponseHandlingResult handleResponse(HttpClientResponse response, Reque
162193 int timeRemaining = Math .max (1 , response .responseHeaders ().getInt ("ratelimit-reset" , 10 ));
163194 int requestsRemaining = response .responseHeaders ().getInt ("ratelimit-remaining" , 110 );
164195
165- synchronized (lock ) {
166- this .actionsLeftThisMinute = requestsRemaining ;
167- lock .notifyAll ();
168- }
196+ this .setActionsLeftThisMinute (requestsRemaining );
169197
170198 resetForFirstRequest (timeRemaining );
171199 }
@@ -182,10 +210,7 @@ private void resetForFirstRequest(int timeRemaining) {
182210 Schedulers .parallel ().schedule (() -> {
183211 this .firstRequestReturned .set (false );
184212 this .overflowStartedNewClock .set (false );
185- synchronized (lock ) {
186- this .actionsLeftThisMinute = 1 ;
187- lock .notifyAll ();
188- }
213+ this .setActionsLeftThisMinute (1 );
189214 }, timeRemaining + 2 , TimeUnit .SECONDS );
190215 }
191216
@@ -194,37 +219,43 @@ private void resetForFirstRequest(int timeRemaining) {
194219 */
195220 private static class RequestCallback {
196221 private final String url ;
197- private final MonoSink <Tuple2 <String , Integer >> monoSink ;
222+ private final MonoSink <Tuple2 <String , Integer >> requestResultSink ;
198223 private final ReactorHttpClient requestRateLimiter ;
199224 private final boolean isAuthenticated ;
225+ private final ReentrantLock lock = new ReentrantLock ();
200226 private boolean isCanceled = false ;
201227
202- private RequestCallback (String url , MonoSink <Tuple2 <String , Integer >> monoSink , boolean isAuthenticated , ReactorHttpClient requestRateLimiter ) {
228+ private RequestCallback (String url , MonoSink <Tuple2 <String , Integer >> requestResultSink , boolean isAuthenticated , ReactorHttpClient requestRateLimiter ) {
203229 this .url = url ;
204- this .monoSink = monoSink ;
230+ this .requestResultSink = requestResultSink ;
205231 this .requestRateLimiter = requestRateLimiter ;
206232 this .isAuthenticated = isAuthenticated ;
207233
208- this .monoSink .onCancel (() -> {
209- synchronized (this ) {
210- this .isCanceled = true ;
211- }
212- });
234+ this .requestResultSink .onCancel (this ::setCanceled );
235+ }
236+
237+ private void setCanceled () {
238+ this .lock .lock ();
239+ try {
240+ this .isCanceled = true ;
241+ } finally {
242+ this .lock .unlock ();
243+ }
213244 }
214245
215246 public boolean isCanceled () {
216- return this .isCanceled ;
247+ this .lock .lock ();
248+ try {
249+ return this .isCanceled ;
250+ } finally {
251+ this .lock .unlock ();
252+ }
217253 }
218254
219255 private void sendRequest () {
220- synchronized (this ) {
221- if (isCanceled ) {
222- synchronized (this .requestRateLimiter .lock ) {
223- this .requestRateLimiter .actionsLeftThisMinute ++;
224- this .requestRateLimiter .lock .notifyAll ();
225- }
226- return ;
227- }
256+ if (this .isCanceled ()) {
257+ this .requestRateLimiter .incrementActionsLeftThisMinute ();
258+ return ;
228259 }
229260
230261 (this .isAuthenticated ? requestRateLimiter .httpClient .headers (headers -> headers .add ("API-Key" , requestRateLimiter .apiKey .toString ())) : requestRateLimiter .httpClient ).get ()
@@ -238,10 +269,10 @@ private void sendRequest() {
238269 }
239270 return Mono .empty ();
240271 } catch (InterruptedException e ) {
241- monoSink .error (e );
272+ this . requestResultSink .error (e );
242273 throw new AssertionError ("ERROR: Queue insertion got interrupted, serious problem! (this should not happen!!)" , e );
243274 }
244- }).subscribe (this .monoSink ::success );
275+ }).subscribe (this .requestResultSink ::success );
245276 }
246277 }
247278
0 commit comments