Skip to content

Commit a664ee0

Browse files
authored
Merge pull request #177 from binance/handle_data_streams
handle data streams
2 parents 86f180d + 50149c2 commit a664ee0

File tree

405 files changed

+57337
-1516
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

405 files changed

+57337
-1516
lines changed

.github/workflows/java.yml

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
name: Java Main Workflow
22

33
on:
4-
# pull_request:
5-
# branches: [ master, rc-** ]
4+
push:
5+
branches: [ master ]
6+
pull_request:
7+
branches: [ master, rc-* ]
68

79
jobs:
810
detect-targets:
@@ -28,21 +30,21 @@ jobs:
2830
fi
2931
3032
# Check each client dynamically
31-
find clients/* -type d -maxdepth 0 | while read client; do
33+
while read client; do
3234
CLIENT_NAME=$(basename "$client")
3335
if ! git diff --quiet "$BASE_COMMIT" HEAD -- "$client"; then
3436
echo "Changes detected in $CLIENT_NAME"
3537
MODIFIED_TARGETS+=("$CLIENT_NAME")
3638
fi
37-
done
39+
done < <(find clients/* -type d -maxdepth 0)
3840
3941
# Convert to JSON array format
4042
MODIFIED_TARGETS_JSON=$(printf '%s\n' "${MODIFIED_TARGETS[@]}" | jq -R -s -c 'split("\n") | map(select(. != ""))')
4143
4244
echo "Detected modified targets: $MODIFIED_TARGETS_JSON"
4345
echo "modified_targets=$MODIFIED_TARGETS_JSON" >> $GITHUB_ENV
4446
echo "::set-output name=modified_targets::$MODIFIED_TARGETS_JSON"
45-
checkstyle:
47+
verify:
4648
runs-on: ubuntu-latest
4749
needs: detect-targets
4850
if: ${{ needs.detect-targets.outputs.modified_targets != '[]' }}
@@ -59,29 +61,5 @@ jobs:
5961
'examples/pom.xml'
6062
- name: Validate modules
6163
run: |
62-
mvn -f clients/pom.xml -N install
63-
mvn -f clients/pom.xml -pl common install
64-
mvn -f clients/pom.xml -pl `echo '${{ needs.detect-targets.outputs.modified_targets }}' | jq -r 'join(",")'` validate
65-
build:
66-
runs-on: ubuntu-latest
67-
needs: detect-targets
68-
if: ${{ needs.detect-targets.outputs.modified_targets != '[]' }}
69-
strategy:
70-
matrix:
71-
target: ${{ fromJson(needs.detect-targets.outputs.modified_targets) }}
72-
java-version: [ 11, 17 ]
73-
steps:
74-
- uses: actions/checkout@v3
75-
- name: Set up JDK
76-
uses: actions/setup-java@v3
77-
with:
78-
java-version: ${{ matrix.java-version }}
79-
distribution: 'adopt'
80-
cache: 'maven'
81-
cache-dependency-path: |
82-
'clients/pom.xml'
83-
'examples/pom.xml'
84-
- name: Build ${{ matrix.target }} module
85-
run: |
86-
mvn -f clients/pom.xml -N install
87-
mvn -f clients/pom.xml -pl common,${{ matrix.target }} install -Dcheckstyle.skip=true
64+
mvn -f clients/pom.xml -pl common install -Dgpg.skip
65+
mvn -f clients/pom.xml -pl `echo '${{ needs.detect-targets.outputs.modified_targets }}' | jq -r 'join(",")'` verify -Dgpg.skip

clients/common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@
1010

1111
<artifactId>binance-common</artifactId>
1212
<name>common</name>
13-
<version>1.3.0</version>
13+
<version>1.4.0</version>
1414
<packaging>jar</packaging>
1515
</project>

clients/common/src/main/java/com/binance/connector/client/common/ApiClient.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public class ApiClient {
134134

135135
private Gson json;
136136

137-
private Set<String> forbiddenHeaders = new HashSet<>(Arrays.asList("host", "authorization", "cookie", ":method", ":path"));
137+
private Set<String> forbiddenHeaders =
138+
new HashSet<>(Arrays.asList("host", "authorization", "cookie", ":method", ":path"));
138139

139140
public ApiClient(ClientConfiguration configuration) {
140141
this(configuration, new BinanceAuthenticationFactory(), null);
@@ -188,8 +189,10 @@ public ApiClient(
188189
}
189190
}
190191

191-
if (configuration.getCustomHeaders() != null && !configuration.getCustomHeaders().isEmpty()) {
192-
Interceptor customHeadersInterceptor = getCustomHeadersInterceptor(configuration.getCustomHeaders());
192+
if (configuration.getCustomHeaders() != null
193+
&& !configuration.getCustomHeaders().isEmpty()) {
194+
Interceptor customHeadersInterceptor =
195+
getCustomHeadersInterceptor(configuration.getCustomHeaders());
193196
builder.addInterceptor(customHeadersInterceptor);
194197
}
195198

@@ -217,13 +220,15 @@ public ApiClient(
217220
if (authentication != null) {
218221
authentications.put(BINANCE_SIGNATURE, authentication);
219222
}
223+
}
220224

221-
Authentication binanceApiKeyOnly =
222-
(queryParams, headerParams, cookieParams, payload, method, uri) -> {
225+
Authentication binanceApiKeyOnly =
226+
(queryParams, headerParams, cookieParams, payload, method, uri) -> {
227+
if (signatureConfiguration != null && signatureConfiguration.getApiKey() != null) {
223228
headerParams.put(HEADER_API_KEY, signatureConfiguration.getApiKey());
224-
};
225-
authentications.put(BINANCE_API_KEY_ONLY, binanceApiKeyOnly);
226-
}
229+
}
230+
};
231+
authentications.put(BINANCE_API_KEY_ONLY, binanceApiKeyOnly);
227232
}
228233

229234
private void init() {
@@ -250,13 +255,15 @@ public void setJson(Gson json) {
250255

251256
public Interceptor getCustomHeadersInterceptor(Map<String, String> customHeaders) {
252257
return chain -> {
253-
254258
Request request = chain.request();
255259
Request.Builder newBuilder = request.newBuilder();
256260
for (String headerName : customHeaders.keySet()) {
257261
String headerValue = customHeaders.get(headerName);
258262
if (!validateHeader(headerName, headerValue)) {
259-
throw new ApiException("Invalid header " + headerName + ", it is forbidden or invalid (contains CR/LF)");
263+
throw new ApiException(
264+
"Invalid header "
265+
+ headerName
266+
+ ", it is forbidden or invalid (contains CR/LF)");
260267
}
261268

262269
newBuilder.addHeader(headerName, headerValue);
@@ -1413,9 +1420,22 @@ public Request buildRequest(
14131420

14141421
List<Pair> updatedQueryParams = new ArrayList<>(queryParams);
14151422

1423+
boolean hasAuth =
1424+
Arrays.stream(authNames)
1425+
.anyMatch(
1426+
s -> s.equals(BINANCE_SIGNATURE) || s.equals(BINANCE_API_KEY_ONLY));
1427+
1428+
// add api key to every request
1429+
String[] finalAuthNames;
1430+
if (!hasAuth) {
1431+
finalAuthNames = append(authNames, BINANCE_API_KEY_ONLY);
1432+
} else {
1433+
finalAuthNames = authNames;
1434+
}
1435+
14161436
// update parameters with authentication settings
14171437
updateParamsForAuth(
1418-
authNames,
1438+
finalAuthNames,
14191439
updatedQueryParams,
14201440
headerParams,
14211441
cookieParams,
@@ -1862,4 +1882,13 @@ private Boolean validateHeader(String name, String value) {
18621882

18631883
return !value.contains("\n") && !value.contains("\t");
18641884
}
1885+
1886+
private String[] append(String[] array, String value) {
1887+
if (array == null) {
1888+
return new String[] {value};
1889+
}
1890+
String[] newArray = Arrays.copyOf(array, array.length + 1);
1891+
newArray[newArray.length - 1] = value;
1892+
return newArray;
1893+
}
18651894
}

clients/common/src/main/java/com/binance/connector/client/common/configuration/ClientConfiguration.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.binance.connector.client.common.dtos.TimeUnit;
44
import java.net.Proxy;
55
import java.util.Map;
6-
76
import okhttp3.Authenticator;
87
import okhttp3.CertificatePinner;
98

clients/common/src/main/java/com/binance/connector/client/common/websocket/adapter/ConnectionInterface.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22

33
import com.binance.connector.client.common.websocket.dtos.ApiRequestWrapperDTO;
44
import com.binance.connector.client.common.websocket.dtos.RequestWrapperDTO;
5+
import java.util.concurrent.BlockingQueue;
56

67
public interface ConnectionInterface {
78
void connect();
89

910
void send(ApiRequestWrapperDTO request) throws InterruptedException;
1011

12+
BlockingQueue<String> sendForStream(ApiRequestWrapperDTO request) throws InterruptedException;
13+
1114
void send(RequestWrapperDTO request) throws InterruptedException;
1215

1316
void setUserAgent(String userAgent);

clients/common/src/main/java/com/binance/connector/client/common/websocket/adapter/ConnectionWrapper.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.nio.channels.ClosedChannelException;
2929
import java.text.DecimalFormat;
3030
import java.time.Duration;
31+
import java.util.ArrayList;
3132
import java.util.Arrays;
3233
import java.util.Collection;
3334
import java.util.HashMap;
@@ -36,8 +37,10 @@
3637
import java.util.Timer;
3738
import java.util.TimerTask;
3839
import java.util.UUID;
40+
import java.util.concurrent.BlockingQueue;
3941
import java.util.concurrent.CompletableFuture;
4042
import java.util.concurrent.CountDownLatch;
43+
import java.util.concurrent.LinkedBlockingDeque;
4144
import java.util.concurrent.TimeUnit;
4245
import java.util.function.Consumer;
4346
import java.util.logging.Level;
@@ -87,6 +90,8 @@ public class ConnectionWrapper implements WebSocketListener, ConnectionInterface
8790

8891
private boolean pendingReconnect = false;
8992

93+
private List<BlockingQueue<String>> streamQueues = new ArrayList<>();
94+
9095
public ConnectionWrapper(WebSocketClientConfiguration configuration, Gson gson) {
9196
this(configuration, null, gson);
9297
}
@@ -349,6 +354,15 @@ public void send(ApiRequestWrapperDTO request) {
349354
innerSend(request);
350355
}
351356

357+
@Override
358+
public BlockingQueue<String> sendForStream(ApiRequestWrapperDTO request)
359+
throws InterruptedException {
360+
LinkedBlockingDeque<String> streamQueue = new LinkedBlockingDeque<>();
361+
streamQueues.add(streamQueue);
362+
send(request);
363+
return streamQueue;
364+
}
365+
352366
public void innerSend(RequestWrapperDTO requestWrapperDTO) {
353367
send(requestWrapperDTO);
354368
}
@@ -431,12 +445,18 @@ public void onWebSocketText(String message) {
431445
JsonObject obj = root.getAsJsonObject();
432446
JsonElement idElem = obj.get("id");
433447
String id = idElem == null ? null : idElem.getAsString();
448+
RequestWrapperDTO requestWrapperDTO = null;
449+
if (id != null) {
450+
requestWrapperDTO = pendingRequest.get(id);
451+
}
434452

435-
if (id == null) {
453+
if (requestWrapperDTO == null) {
454+
for (BlockingQueue<String> streamQueue : streamQueues) {
455+
JsonElement eventElem = obj.get("event");
456+
streamQueue.offer(eventElem != null ? eventElem.toString() : message);
457+
}
436458
return;
437459
}
438-
439-
RequestWrapperDTO requestWrapperDTO = pendingRequest.get(id);
440460
Type responseType = requestWrapperDTO.getResponseType();
441461

442462
Object responseResult = gson.fromJson(root, responseType);

clients/common/src/main/java/com/binance/connector/client/common/websocket/adapter/PoolConnectionWrapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.ListIterator;
1010
import java.util.Timer;
1111
import java.util.TimerTask;
12+
import java.util.concurrent.BlockingQueue;
1213

1314
public class PoolConnectionWrapper implements ConnectionInterface {
1415
private final LinkedList<ConnectionWrapper> connectionList = new LinkedList<>();
@@ -86,6 +87,12 @@ public void send(RequestWrapperDTO request) throws InterruptedException {
8687
getConnection().send(request);
8788
}
8889

90+
@Override
91+
public BlockingQueue<String> sendForStream(ApiRequestWrapperDTO request)
92+
throws InterruptedException {
93+
return getConnection().sendForStream(request);
94+
}
95+
8996
/**
9097
* @return the next connection from the pool, using round-robin
9198
*/
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.binance.connector.client.common.websocket.dtos;
2+
3+
import com.binance.connector.client.common.websocket.service.StreamBlockingQueueWrapper;
4+
import java.util.concurrent.CompletableFuture;
5+
6+
public class StreamResponse<T, U> {
7+
private final CompletableFuture<T> response;
8+
private final StreamBlockingQueueWrapper<U> stream;
9+
10+
public StreamResponse(CompletableFuture<T> response, StreamBlockingQueueWrapper<U> stream) {
11+
this.response = response;
12+
this.stream = stream;
13+
}
14+
15+
public CompletableFuture<T> getResponse() {
16+
return response;
17+
}
18+
19+
public StreamBlockingQueueWrapper<U> getStream() {
20+
return stream;
21+
}
22+
}

clients/common/src/main/java/com/binance/connector/client/common/websocket/service/StreamBlockingQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ public class StreamBlockingQueue<T> implements BlockingQueue<T> {
1111
private final BlockingQueue<T> innerQueue;
1212
private final String operationId;
1313

14+
public StreamBlockingQueue(BlockingQueue<T> innerQueue) {
15+
this(innerQueue, "");
16+
}
17+
1418
public StreamBlockingQueue(BlockingQueue<T> innerQueue, String operationId) {
1519
this.innerQueue = innerQueue;
1620
this.operationId = operationId;
Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
package com.binance.connector.client.common.websocket.service;
22

33
import com.binance.connector.client.common.JSON;
4+
import com.google.gson.Gson;
45
import com.google.gson.reflect.TypeToken;
56

67
public class StreamBlockingQueueWrapper<T> {
78
private final StreamBlockingQueue<String> innerQueue;
89
private final TypeToken<T> convertType;
10+
private final Gson gson;
911

1012
public StreamBlockingQueueWrapper(StreamBlockingQueue<String> innerQueue, TypeToken<T> type) {
13+
this(innerQueue, type, JSON.getGson());
14+
}
15+
16+
public StreamBlockingQueueWrapper(
17+
StreamBlockingQueue<String> innerQueue, TypeToken<T> type, Gson gson) {
1118
this.innerQueue = innerQueue;
1219
this.convertType = type;
20+
this.gson = gson;
1321
}
1422

1523
public StreamBlockingQueue<String> getInnerQueue() {
@@ -18,6 +26,6 @@ public StreamBlockingQueue<String> getInnerQueue() {
1826

1927
public T take() throws InterruptedException {
2028
String take = innerQueue.take();
21-
return JSON.getGson().fromJson(take, convertType);
29+
return gson.fromJson(take, convertType);
2230
}
2331
}

0 commit comments

Comments
 (0)