Skip to content

Commit 1eea3d9

Browse files
Added project reactor transport - reactive rate limiting (Tested) (#425)
* Just a start/idea of what we have in mind for 4.0. All subject to change. * Add HTTPQueryParams for some cleaner handling over an array * Add a README.md here and remove PlayerCountReply.java * Add ApacheHTTPClient to Example * checkReply return the same type * Add comment for issue 249 * Tweak doc * Some more tweaks towards javadocs etc * Move SkyBlockBazaarReply to use longs instead of ints Fixes #274 * Fix Bazaar Example * Add a comment about the recent change to the name endpoint * Make member static * Add REPLAY and SMP GameType * Readme * Some tweaks, API Key is now passed and only handdled by the HTTP Client * Move a lot of things * Readme changes * Readme changes * Update README.md * update license time * Remove these, will make sure its all on the main docs * Example link * tweaks * Tweak * Link direct to README.md * Tweak examples * New exceptions and rename some stuff * Add getSkyBlockProfiles method * Add ServerType along with LobbyType * Add a comment * Back to Hypixel example * Add User Agent * Be consistent * Include versions in Readme * Remove skyblock profiles cause another PR has it * Added reactive transport Non-blocking, automatic rate limiting * Style conventions update + readme * updated main Readme * updated main readme * bugfix * Update documentation * added limitRate * changed limitRate from 3 to 1 * limitRate to delayElements * added subscription on scheduler * changed delay Scheduler * different blocking method + unit tested and fixed bugs and inconsistencies * Change versions to 4.0 Co-authored-by: Connor Linfoot <[email protected]>
1 parent b67ba5e commit 1eea3d9

File tree

5 files changed

+369
-1
lines changed

5 files changed

+369
-1
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,12 @@ repositories {
5555

5656
#### Transports
5757

58-
We include two built in options communicating with the Hypixel API, you can include either of these or even include the
58+
We include three built-in options for communicating with the Hypixel API, you can include either of these or even include the
5959
core API directly and create your own instance of HypixelHTTPClient.
6060

6161
* [Apache HttpClient Transport](hypixel-api-transport-apache/README.md)
6262
* [Unirest Java Transport](hypixel-api-transport-unirest/README.md)
63+
* [Project Reactor Transport](hypixel-api-transport-reactor/README.md) (automatic rate-limiting by default)
6364

6465
### Dependencies
6566

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
2+
Hypixel Public API - Reactive Transport
3+
======
4+
5+
### Usage
6+
7+
```xml
8+
<dependency>
9+
<groupId>net.hypixel</groupId>
10+
<artifactId>hypixel-api-transport-reactor</artifactId>
11+
<version>4.0</version>
12+
</dependency>
13+
```
14+
15+
Can also be included with Gradle.
16+
17+
```gradle
18+
dependencies {
19+
implementation 'net.hypixel:hypixel-api-transport-reactor:4.0'
20+
}
21+
```
22+
23+
### Example code
24+
25+
```java
26+
public class Main {
27+
public static void main(String[] args) {
28+
HypixelHttpClient client = new ReactorHttpClient(UUID.fromString("your-api-key-here"));
29+
HypixelAPI hypixelAPI = new HypixelAPI(client);
30+
hypixelAPI.getPlayerByName("Hypixel")
31+
.exceptionally(throwable -> {
32+
// Handle exceptions here
33+
throwable.printStackTrace();
34+
return null;
35+
})
36+
.thenAccept(System.out::println);
37+
}
38+
}
39+
```
40+
41+
### Dependencies
42+
43+
This transport depends on the following:
44+
45+
* [Google Gson library - 2.8.6](https://mvnrepository.com/artifact/com.google.code.gson/gson) (for hypixel-api-core)
46+
* [Reactor Core 3.4.5](https://mvnrepository.com/artifact/io.projectreactor/reactor-core) (for reactor netty)
47+
* Reactor Netty [(project-reactor)](https://projectreactor.io/docs):
48+
* [Netty Core 1.0.6](https://mvnrepository.com/artifact/io.projectreactor.netty/reactor-netty-core)
49+
* [Netty Http 1.0.6](https://mvnrepository.com/artifact/io.projectreactor.netty/reactor-netty-http)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>hypixel-api</artifactId>
7+
<groupId>net.hypixel</groupId>
8+
<version>4.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>hypixel-api-transport-reactor</artifactId>
13+
14+
<build>
15+
<plugins>
16+
<plugin>
17+
<groupId>org.apache.maven.plugins</groupId>
18+
<artifactId>maven-compiler-plugin</artifactId>
19+
<configuration>
20+
<source>1.8</source>
21+
<target>1.8</target>
22+
</configuration>
23+
</plugin>
24+
</plugins>
25+
</build>
26+
27+
<dependencyManagement>
28+
<dependencies>
29+
<dependency>
30+
<groupId>io.projectreactor</groupId>
31+
<artifactId>reactor-bom</artifactId>
32+
<version>2020.0.6</version>
33+
<type>pom</type>
34+
<scope>import</scope>
35+
</dependency>
36+
</dependencies>
37+
</dependencyManagement>
38+
39+
<dependencies>
40+
<dependency>
41+
<groupId>net.hypixel</groupId>
42+
<artifactId>hypixel-api-core</artifactId>
43+
<version>4.0-SNAPSHOT</version>
44+
</dependency>
45+
<dependency>
46+
<groupId>io.projectreactor.netty</groupId>
47+
<artifactId>reactor-netty-core</artifactId>
48+
</dependency>
49+
<dependency>
50+
<groupId>io.projectreactor.netty</groupId>
51+
<artifactId>reactor-netty-http</artifactId>
52+
</dependency>
53+
</dependencies>
54+
55+
</project>
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
package net.hypixel.api.reactor;
2+
3+
import io.netty.handler.codec.http.HttpResponseStatus;
4+
import net.hypixel.api.http.HypixelHttpClient;
5+
import net.hypixel.api.http.HypixelHttpResponse;
6+
import reactor.core.Disposable;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.Mono;
9+
import reactor.core.publisher.MonoSink;
10+
import reactor.core.scheduler.Schedulers;
11+
import reactor.netty.http.client.HttpClient;
12+
import reactor.netty.http.client.HttpClientResponse;
13+
import reactor.util.function.Tuple2;
14+
15+
import java.time.Duration;
16+
import java.util.UUID;
17+
import java.util.concurrent.ArrayBlockingQueue;
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
public class ReactorHttpClient implements HypixelHttpClient {
23+
private final HttpClient httpClient;
24+
private final UUID apiKey;
25+
26+
// Marker to reset the request counter and release waiting threads
27+
private final AtomicBoolean firstRequestReturned = new AtomicBoolean(false);
28+
// Marker to only schedule a reset clock once on error 429
29+
private final AtomicBoolean overflowStartedNewClock = new AtomicBoolean(false);
30+
31+
// Callbacks that will trigger their corresponding requests
32+
private final ArrayBlockingQueue<RequestCallback> blockingQueue;
33+
34+
// For shutting down the flux that emits request callbacks
35+
private final Disposable requestCallbackFluxDisposable;
36+
37+
private final Object lock = new Object();
38+
39+
/*
40+
* How many requests we can send before reaching the limit
41+
* Starts as 1 so the first request returns and resets this value before allowing other requests to be sent.
42+
*/
43+
private int actionsLeftThisMinute = 1;
44+
45+
/**
46+
* Constructs a new instance of this client using the specified API key.
47+
*
48+
* @param apiKey the key associated with this connection
49+
* @param minDelayBetweenRequests minimum time between sending requests (in ms) default is 8
50+
* @param bufferCapacity fixed size of blockingQueue
51+
*/
52+
public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCapacity) {
53+
this.apiKey = apiKey;
54+
this.httpClient = HttpClient.create().secure();
55+
this.blockingQueue = new ArrayBlockingQueue<>(bufferCapacity);
56+
57+
this.requestCallbackFluxDisposable = Flux.<RequestCallback>generate((synchronousSink) -> {
58+
try {
59+
RequestCallback callback = blockingQueue.take();
60+
// prune skipped/completed requests to avoid counting them
61+
while (callback.isCanceled()) {
62+
callback = blockingQueue.take();
63+
}
64+
65+
synchronized (lock) {
66+
while (this.actionsLeftThisMinute <= 0) {
67+
lock.wait();
68+
}
69+
70+
actionsLeftThisMinute--;
71+
}
72+
synchronousSink.next(callback);
73+
} catch (InterruptedException e) {
74+
throw new AssertionError("This should not have been possible", e);
75+
}
76+
}).subscribeOn(Schedulers.boundedElastic()).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest);
77+
}
78+
79+
public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests)
80+
{
81+
this(apiKey, minDelayBetweenRequests, 500);
82+
}
83+
84+
public ReactorHttpClient(UUID apiKey, int bufferCapacity)
85+
{
86+
this(apiKey, 8, bufferCapacity);
87+
}
88+
89+
public ReactorHttpClient(UUID apiKey)
90+
{
91+
this(apiKey, 8, 500);
92+
}
93+
94+
/**
95+
* Canceling the returned future will result in canceling the request if possible
96+
*/
97+
@Override
98+
public CompletableFuture<HypixelHttpResponse> makeRequest(String url) {
99+
return toHypixelResponseFuture(makeRequest(url, false));
100+
}
101+
102+
/**
103+
* Canceling the returned future will result in canceling the request if possible
104+
*/
105+
@Override
106+
public CompletableFuture<HypixelHttpResponse> makeAuthenticatedRequest(String url) {
107+
return toHypixelResponseFuture(makeRequest(url, true));
108+
}
109+
110+
private static CompletableFuture<HypixelHttpResponse> toHypixelResponseFuture(Mono<Tuple2<String, Integer>> result) {
111+
return result.map(tuple -> new HypixelHttpResponse(tuple.getT2(), tuple.getT1()))
112+
.toFuture();
113+
}
114+
115+
@Override
116+
public void shutdown() {
117+
this.requestCallbackFluxDisposable.dispose();
118+
}
119+
120+
/**
121+
* Makes a request to the Hypixel api and returns a {@link Mono<Tuple2<String, Integer>>} containing
122+
* the response body and status code, canceling this mono will prevent the request from being sent if possible
123+
* @param path full url
124+
* @param isAuthenticated whether to enable authentication or not
125+
*/
126+
public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenticated) {
127+
return Mono.<Tuple2<String, Integer>>create(sink -> {
128+
RequestCallback callback = new RequestCallback(path, sink, isAuthenticated, this);
129+
130+
try {
131+
this.blockingQueue.put(callback);
132+
} catch (InterruptedException e) {
133+
sink.error(e);
134+
throw new AssertionError("Queue insertion interrupted. This should not have been possible", e);
135+
}
136+
}).subscribeOn(Schedulers.boundedElastic());
137+
}
138+
139+
/**
140+
* Reads response status and retries error 429 (too many requests)
141+
* The first request after every limit reset will be used to schedule the next limit reset
142+
*
143+
* @param response the {@link HttpClientResponse} from our request
144+
* @param requestCallback the callback controlling our request
145+
* @return whether to return the request body or wait for a retry
146+
*/
147+
private ResponseHandlingResult handleResponse(HttpClientResponse response, RequestCallback requestCallback) throws InterruptedException {
148+
if (response.status() == HttpResponseStatus.TOO_MANY_REQUESTS) {
149+
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10));
150+
151+
if (this.overflowStartedNewClock.compareAndSet(false, true)) {
152+
synchronized (lock) {
153+
this.actionsLeftThisMinute = 0;
154+
}
155+
resetForFirstRequest(timeRemaining);
156+
}
157+
158+
// execute this last to prevent a possible exception from messing up our clock synchronization
159+
this.blockingQueue.put(requestCallback);
160+
return new ResponseHandlingResult(false, response.status().code());
161+
}
162+
163+
if (this.firstRequestReturned.compareAndSet(false, true)) {
164+
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10));
165+
int requestsRemaining = response.responseHeaders().getInt("ratelimit-remaining", 110);
166+
167+
synchronized (lock) {
168+
this.actionsLeftThisMinute = requestsRemaining;
169+
lock.notifyAll();
170+
}
171+
172+
resetForFirstRequest(timeRemaining);
173+
}
174+
return new ResponseHandlingResult(true, response.status().code());
175+
}
176+
177+
/**
178+
* Wakes up all waiting threads in the specified amount of seconds
179+
* (Adds two seconds to account for sync errors in the server).
180+
*
181+
* @param timeRemaining how much time is left until the next reset
182+
*/
183+
private void resetForFirstRequest(int timeRemaining) {
184+
Schedulers.parallel().schedule(() -> {
185+
this.firstRequestReturned.set(false);
186+
this.overflowStartedNewClock.set(false);
187+
synchronized (lock) {
188+
this.actionsLeftThisMinute = 1;
189+
lock.notifyAll();
190+
}
191+
}, timeRemaining + 2, TimeUnit.SECONDS);
192+
}
193+
194+
/**
195+
* Controls a request
196+
*/
197+
private static class RequestCallback {
198+
private final String url;
199+
private final MonoSink<Tuple2<String, Integer>> monoSink;
200+
private final ReactorHttpClient requestRateLimiter;
201+
private final boolean isAuthenticated;
202+
private boolean isCanceled = false;
203+
204+
private RequestCallback(String url, MonoSink<Tuple2<String, Integer>> monoSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) {
205+
this.url = url;
206+
this.monoSink = monoSink;
207+
this.requestRateLimiter = requestRateLimiter;
208+
this.isAuthenticated = isAuthenticated;
209+
210+
this.monoSink.onCancel(() -> {
211+
synchronized (this) {
212+
this.isCanceled = true;
213+
}
214+
});
215+
}
216+
217+
public boolean isCanceled() {
218+
return this.isCanceled;
219+
}
220+
221+
private void sendRequest() {
222+
synchronized (this) {
223+
if (isCanceled) {
224+
synchronized (this.requestRateLimiter.lock) {
225+
this.requestRateLimiter.actionsLeftThisMinute++;
226+
this.requestRateLimiter.lock.notifyAll();
227+
}
228+
return;
229+
}
230+
}
231+
232+
(this.isAuthenticated ? requestRateLimiter.httpClient.headers(headers -> headers.add("API-Key", requestRateLimiter.apiKey.toString())) : requestRateLimiter.httpClient).get()
233+
.uri(url)
234+
.responseSingle((response, body) -> {
235+
try {
236+
ResponseHandlingResult result = requestRateLimiter.handleResponse(response, this);
237+
238+
if (result.allowToPass) {
239+
return body.asString().zipWith(Mono.just(result.statusCode));
240+
}
241+
return Mono.empty();
242+
} catch (InterruptedException e) {
243+
monoSink.error(e);
244+
throw new AssertionError("ERROR: Queue insertion got interrupted, serious problem! (this should not happen!!)", e);
245+
}
246+
}).subscribe(this.monoSink::success);
247+
}
248+
}
249+
250+
/**
251+
* Data object
252+
*/
253+
private static class ResponseHandlingResult {
254+
public final boolean allowToPass;
255+
public final int statusCode;
256+
257+
public ResponseHandlingResult(boolean allowToPass, int statusCode) {
258+
this.allowToPass = allowToPass;
259+
this.statusCode = statusCode;
260+
}
261+
}
262+
}

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<module>hypixel-api-example</module>
1515
<module>hypixel-api-transport-apache</module>
1616
<module>hypixel-api-transport-unirest</module>
17+
<module>hypixel-api-transport-reactor</module>
1718
</modules>
1819

1920
<distributionManagement>

0 commit comments

Comments
 (0)