Skip to content

Commit d22113f

Browse files
authored
Merge pull request #5243 from rizer1980/PR/binance-chinees-fix
[binance] chinees pairs fix
2 parents a35eeb8 + 016e8fc commit d22113f

File tree

4 files changed

+108
-121
lines changed

4 files changed

+108
-121
lines changed

xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import info.bitrich.xchangestream.util.Events;
99
import io.reactivex.rxjava3.core.Completable;
1010
import io.reactivex.rxjava3.core.Observable;
11+
import java.net.URLEncoder;
12+
import java.nio.charset.StandardCharsets;
1113
import org.apache.commons.lang3.StringUtils;
1214
import org.knowm.xchange.binance.BinanceAuthenticated;
1315
import org.knowm.xchange.binance.BinanceExchange;
@@ -29,9 +31,9 @@
2931
public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange {
3032

3133
private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingExchange.class);
32-
private static final String WS_API_BASE_URI = "wss://stream.binance.com:9443/";
34+
private static final String WS_API_BASE_URI = "wss://stream.binance.com/";
3335
private static final String WS_TRADE_API_BASE_URI = "wss://ws-api.binance.com:443/ws-api/v3";
34-
private static final String WS_SANDBOX_API_BASE_URI = "wss://stream.testnet.binance.vision:9443/";
36+
private static final String WS_SANDBOX_API_BASE_URI = "wss://stream.testnet.binance.vision/";
3537
private static final String WS_SANDBOX_TRADE_API_BASE_URI =
3638
"wss://ws-api.testnet.binance.vision/ws-api/v3";
3739
public static final String USE_HIGHER_UPDATE_FREQUENCY = "Binance_Orderbook_Use_Higher_Frequency";
@@ -271,10 +273,11 @@ public BinanceStreamingTradeService getStreamingTradeService() {
271273

272274
protected BinanceStreamingService createStreamingService(
273275
ProductSubscription subscription, KlineSubscription klineSubscription) {
276+
// new chinese pair, like 币安人生usdt, need urlEncode
274277
String path =
275278
getStreamingBaseUri()
276279
+ "stream?streams="
277-
+ buildSubscriptionStreams(subscription, klineSubscription);
280+
+ URLEncoder.encode(buildSubscriptionStreams(subscription, klineSubscription), StandardCharsets.UTF_8);
278281

279282
BinanceStreamingService streamingService =
280283
new BinanceStreamingService(path, subscription, klineSubscription);

xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingService.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,6 @@ public void sendMessage(String message) {
192192
// sent.
193193
}
194194

195-
@Override
196-
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
197-
return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE;
198-
}
199-
200195
/**
201196
* The available subscriptions for this streaming service.
202197
*

xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceUserDataStreamingService.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import com.fasterxml.jackson.databind.JsonNode;
44
import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction.BinanceWebSocketTypes;
55
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
6-
import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler;
7-
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
86
import io.reactivex.rxjava3.core.Observable;
97
import org.slf4j.Logger;
108
import org.slf4j.LoggerFactory;
@@ -58,11 +56,6 @@ public String getUnsubscribeMessage(String channelName, Object... args) {
5856
return null;
5957
}
6058

61-
@Override
62-
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
63-
return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE;
64-
}
65-
6659
@Override
6760
public void sendMessage(String message) {
6861
// Subscriptions are made upon connection - no messages are sent.
Lines changed: 102 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,32 @@
11
package info.bitrich.xchangestream.binance;
22

3+
import static info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderCancelAndReplacePayload.CancelReplaceMode.STOP_ON_FAILURE;
4+
35
import com.fasterxml.jackson.core.type.TypeReference;
46
import com.fasterxml.jackson.databind.JsonNode;
57
import com.fasterxml.jackson.databind.ObjectMapper;
6-
import info.bitrich.xchangestream.binance.dto.trade.*;
8+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketLoginPayloadWithSignature;
9+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketLoginResponse;
10+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderAmendPayload;
11+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderCancelAndReplacePayload;
12+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderCancelPayload;
13+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderResponse;
14+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketPayload;
15+
import info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketPlaceOrderPayload;
716
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
817
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
9-
import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler;
10-
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
1118
import io.reactivex.rxjava3.core.Completable;
1219
import io.reactivex.rxjava3.core.CompletableSource;
1320
import io.reactivex.rxjava3.core.Observable;
1421
import io.reactivex.rxjava3.disposables.CompositeDisposable;
1522
import io.reactivex.rxjava3.disposables.Disposable;
23+
import java.io.IOException;
24+
import java.nio.charset.Charset;
25+
import java.nio.charset.StandardCharsets;
26+
import java.security.Security;
27+
import java.security.spec.PKCS8EncodedKeySpec;
28+
import java.util.Base64;
29+
import java.util.regex.Pattern;
1630
import lombok.Getter;
1731
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
1832
import org.bouncycastle.crypto.Signer;
@@ -30,26 +44,17 @@
3044
import org.slf4j.Logger;
3145
import org.slf4j.LoggerFactory;
3246

33-
import java.io.IOException;
34-
import java.nio.charset.Charset;
35-
import java.nio.charset.StandardCharsets;
36-
import java.security.Security;
37-
import java.security.spec.PKCS8EncodedKeySpec;
38-
import java.util.Base64;
39-
import java.util.regex.Pattern;
40-
41-
import static info.bitrich.xchangestream.binance.dto.trade.BinanceWebsocketOrderCancelAndReplacePayload.CancelReplaceMode.STOP_ON_FAILURE;
42-
4347
public class BinanceUserTradeStreamingService extends JsonNettyStreamingService {
4448

4549
private static final Logger LOG = LoggerFactory.getLogger(BinanceUserTradeStreamingService.class);
4650
private static final Pattern p = Pattern.compile("[a-z.]+|\\d+");
47-
CompositeDisposable compositeDisposable = new CompositeDisposable();
48-
@Getter private boolean authorized = false;
49-
private String signature = "";
50-
Charset charSet = StandardCharsets.UTF_8;
5151
private final String apiKey;
5252
private final String privateKey;
53+
CompositeDisposable compositeDisposable = new CompositeDisposable();
54+
Charset charSet = StandardCharsets.UTF_8;
55+
@Getter
56+
private boolean authorized = false;
57+
private String signature = "";
5358
private Disposable loginDisposable;
5459

5560
public BinanceUserTradeStreamingService(String apiUrl, String apiKey, String privateKey) {
@@ -95,7 +100,8 @@ public void login() {
95100
.flatMap(
96101
node -> {
97102
TypeReference<BinanceWebsocketOrderResponse<BinanceWebsocketLoginResponse>>
98-
typeReference = new TypeReference<>() {};
103+
typeReference = new TypeReference<>() {
104+
};
99105
BinanceWebsocketOrderResponse<BinanceWebsocketLoginResponse> response =
100106
mapper.treeToValue(node, typeReference);
101107
if (response.getStatus() == 200) {
@@ -151,102 +157,92 @@ protected String getChannelNameFromMessage(JsonNode message) throws IOException
151157
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
152158
String method = args[0].toString();
153159
switch (method) {
154-
case "session.logon":
155-
{ // login
156-
long timestamp = System.currentTimeMillis();
157-
try {
158-
String loginPayload = "apiKey=" + apiKey + "&timestamp=" + timestamp;
159-
signature = signPayload(loginPayload);
160-
BinanceWebsocketLoginPayloadWithSignature loginPayloadWithSignature =
161-
new BinanceWebsocketLoginPayloadWithSignature(apiKey, signature, timestamp);
162-
BinanceWebsocketPayload<BinanceWebsocketLoginPayloadWithSignature> payload =
163-
new BinanceWebsocketPayload<>(
164-
channelName, "session.logon", loginPayloadWithSignature);
165-
return objectMapper.writeValueAsString(payload);
166-
} catch (Exception e) {
167-
throw new RuntimeException(e);
168-
}
169-
}
170-
case "order.place":
171-
{
172-
BinanceWebsocketPlaceOrderPayload orderPayload = null;
173-
if (args[1] instanceof MarketOrder) {
174-
MarketOrder marketOrder = (MarketOrder) args[1];
175-
orderPayload = BinanceStreamingAdapters.adaptPlaceOrder(marketOrder);
176-
} else if (args[1] instanceof LimitOrder) {
177-
LimitOrder limitOrder = (LimitOrder) args[1];
178-
orderPayload = BinanceStreamingAdapters.adaptPlaceOrder(limitOrder);
179-
}
180-
assert orderPayload != null;
181-
BinanceWebsocketPayload<BinanceWebsocketPlaceOrderPayload> payload =
182-
new BinanceWebsocketPayload<>(channelName, method, orderPayload);
160+
case "session.logon": { // login
161+
long timestamp = System.currentTimeMillis();
162+
try {
163+
String loginPayload = "apiKey=" + apiKey + "&timestamp=" + timestamp;
164+
signature = signPayload(loginPayload);
165+
BinanceWebsocketLoginPayloadWithSignature loginPayloadWithSignature =
166+
new BinanceWebsocketLoginPayloadWithSignature(apiKey, signature, timestamp);
167+
BinanceWebsocketPayload<BinanceWebsocketLoginPayloadWithSignature> payload =
168+
new BinanceWebsocketPayload<>(
169+
channelName, "session.logon", loginPayloadWithSignature);
183170
return objectMapper.writeValueAsString(payload);
171+
} catch (Exception e) {
172+
throw new RuntimeException(e);
184173
}
185-
case "order.modify":
186-
{
174+
}
175+
case "order.place": {
176+
BinanceWebsocketPlaceOrderPayload orderPayload = null;
177+
if (args[1] instanceof MarketOrder) {
178+
MarketOrder marketOrder = (MarketOrder) args[1];
179+
orderPayload = BinanceStreamingAdapters.adaptPlaceOrder(marketOrder);
180+
} else if (args[1] instanceof LimitOrder) {
187181
LimitOrder limitOrder = (LimitOrder) args[1];
188-
BinanceWebsocketOrderAmendPayload amendOrderPayload =
189-
BinanceStreamingAdapters.adaptAmendOrder(limitOrder);
190-
assert amendOrderPayload != null;
191-
BinanceWebsocketPayload<BinanceWebsocketOrderAmendPayload> payload =
192-
new BinanceWebsocketPayload<>(channelName, method, amendOrderPayload);
193-
return objectMapper.writeValueAsString(payload);
182+
orderPayload = BinanceStreamingAdapters.adaptPlaceOrder(limitOrder);
194183
}
195-
case "order.cancel":
196-
{
197-
BinanceCancelOrderParams params = (BinanceCancelOrderParams) args[1];
198-
Long orderId = null;
199-
if (params.getOrderId() != null && !params.getOrderId().isEmpty()) {
200-
orderId = Long.valueOf(params.getOrderId());
201-
}
202-
BinanceWebsocketOrderCancelPayload cancelOrderPayload =
203-
BinanceWebsocketOrderCancelPayload.builder()
204-
.symbol(BinanceAdapters.toSymbol(params.getInstrument()))
205-
.orderId(orderId)
206-
.origClientOrderId(params.getUserReference())
207-
.newClientOrderId(params.getUserReference())
208-
.timestamp(System.currentTimeMillis())
209-
.build();
210-
BinanceWebsocketPayload<BinanceWebsocketOrderCancelPayload> payload =
211-
new BinanceWebsocketPayload<>(channelName, method, cancelOrderPayload);
212-
return objectMapper.writeValueAsString(payload);
184+
assert orderPayload != null;
185+
BinanceWebsocketPayload<BinanceWebsocketPlaceOrderPayload> payload =
186+
new BinanceWebsocketPayload<>(channelName, method, orderPayload);
187+
return objectMapper.writeValueAsString(payload);
188+
}
189+
case "order.modify": {
190+
LimitOrder limitOrder = (LimitOrder) args[1];
191+
BinanceWebsocketOrderAmendPayload amendOrderPayload =
192+
BinanceStreamingAdapters.adaptAmendOrder(limitOrder);
193+
assert amendOrderPayload != null;
194+
BinanceWebsocketPayload<BinanceWebsocketOrderAmendPayload> payload =
195+
new BinanceWebsocketPayload<>(channelName, method, amendOrderPayload);
196+
return objectMapper.writeValueAsString(payload);
197+
}
198+
case "order.cancel": {
199+
BinanceCancelOrderParams params = (BinanceCancelOrderParams) args[1];
200+
Long orderId = null;
201+
if (params.getOrderId() != null && !params.getOrderId().isEmpty()) {
202+
orderId = Long.valueOf(params.getOrderId());
213203
}
214-
case "order.cancelReplace":
215-
{
216-
LimitOrder limitOrder = (LimitOrder) args[1];
217-
BinanceCancelOrderParams params = (BinanceCancelOrderParams) args[2];
218-
Long cancelOrderId = null;
219-
if (params.getOrderId() != null && !params.getOrderId().isEmpty()) {
220-
cancelOrderId = Long.valueOf(params.getOrderId());
221-
}
222-
TimeInForce tif =
223-
BinanceAdapters.getOrderFlag(limitOrder, TimeInForce.class).orElse(TimeInForce.GTC);
224-
BinanceWebsocketOrderCancelAndReplacePayload orderCancelAndReplacePayload =
225-
BinanceWebsocketOrderCancelAndReplacePayload.builder()
226-
.symbol(BinanceAdapters.toSymbol(params.getInstrument()))
227-
.cancelOrderId(cancelOrderId)
228-
.cancelOrigClientOrderId(params.getUserReference())
229-
.symbol(BinanceAdapters.toSymbol(limitOrder.getInstrument()))
230-
.side(BinanceAdapters.convert(limitOrder.getType()))
231-
.newClientOrderId(limitOrder.getUserReference())
232-
.type(OrderType.LIMIT)
233-
.price(limitOrder.getLimitPrice())
234-
.quantity(limitOrder.getOriginalAmount())
235-
.timeInForce(tif)
236-
.cancelReplaceMode(STOP_ON_FAILURE)
237-
.timestamp(System.currentTimeMillis())
238-
.build();
239-
BinanceWebsocketPayload<BinanceWebsocketOrderCancelAndReplacePayload> payload =
240-
new BinanceWebsocketPayload<>(channelName, method, orderCancelAndReplacePayload);
241-
return objectMapper.writeValueAsString(payload);
204+
BinanceWebsocketOrderCancelPayload cancelOrderPayload =
205+
BinanceWebsocketOrderCancelPayload.builder()
206+
.symbol(BinanceAdapters.toSymbol(params.getInstrument()))
207+
.orderId(orderId)
208+
.origClientOrderId(params.getUserReference())
209+
.newClientOrderId(params.getUserReference())
210+
.timestamp(System.currentTimeMillis())
211+
.build();
212+
BinanceWebsocketPayload<BinanceWebsocketOrderCancelPayload> payload =
213+
new BinanceWebsocketPayload<>(channelName, method, cancelOrderPayload);
214+
return objectMapper.writeValueAsString(payload);
215+
}
216+
case "order.cancelReplace": {
217+
LimitOrder limitOrder = (LimitOrder) args[1];
218+
BinanceCancelOrderParams params = (BinanceCancelOrderParams) args[2];
219+
Long cancelOrderId = null;
220+
if (params.getOrderId() != null && !params.getOrderId().isEmpty()) {
221+
cancelOrderId = Long.valueOf(params.getOrderId());
242222
}
223+
TimeInForce tif =
224+
BinanceAdapters.getOrderFlag(limitOrder, TimeInForce.class).orElse(TimeInForce.GTC);
225+
BinanceWebsocketOrderCancelAndReplacePayload orderCancelAndReplacePayload =
226+
BinanceWebsocketOrderCancelAndReplacePayload.builder()
227+
.symbol(BinanceAdapters.toSymbol(params.getInstrument()))
228+
.cancelOrderId(cancelOrderId)
229+
.cancelOrigClientOrderId(params.getUserReference())
230+
.symbol(BinanceAdapters.toSymbol(limitOrder.getInstrument()))
231+
.side(BinanceAdapters.convert(limitOrder.getType()))
232+
.newClientOrderId(limitOrder.getUserReference())
233+
.type(OrderType.LIMIT)
234+
.price(limitOrder.getLimitPrice())
235+
.quantity(limitOrder.getOriginalAmount())
236+
.timeInForce(tif)
237+
.cancelReplaceMode(STOP_ON_FAILURE)
238+
.timestamp(System.currentTimeMillis())
239+
.build();
240+
BinanceWebsocketPayload<BinanceWebsocketOrderCancelAndReplacePayload> payload =
241+
new BinanceWebsocketPayload<>(channelName, method, orderCancelAndReplacePayload);
242+
return objectMapper.writeValueAsString(payload);
243+
}
243244
default:
244245
return null;
245246
}
246247
}
247-
248-
@Override
249-
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
250-
return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE;
251-
}
252248
}

0 commit comments

Comments
 (0)