@@ -11,6 +11,8 @@ The above copyright notice and this permission notice shall be included in all c
1111*/
1212
1313using System . Diagnostics ;
14+ using System . Threading ;
15+ using Newtonsoft . Json ;
1416
1517namespace ExchangeSharp
1618{
@@ -24,7 +26,7 @@ namespace ExchangeSharp
2426 public sealed partial class ExchangePoloniexAPI : ExchangeAPI
2527 {
2628 public override string BaseUrl { get ; set ; } = "https://api.poloniex.com" ;
27- public override string BaseUrlWebSocket { get ; set ; } = "wss://api2 .poloniex.com" ;
29+ public override string BaseUrlWebSocket { get ; set ; } = "wss://ws .poloniex.com/ws " ;
2830
2931 private ExchangePoloniexAPI ( )
3032 {
@@ -222,18 +224,25 @@ private static IEnumerable<ExchangeOrderResult> ParseCompletedOrderDetails(JToke
222224
223225 private async Task < ExchangeTicker > ParseTickerWebSocketAsync ( string symbol , JToken token )
224226 {
225- /*
226- last: args[1],
227- lowestAsk: args[2],
228- highestBid: args[3],
229- percentChange: args[4],
230- baseVolume: args[5],
231- quoteVolume: args[6],
232- isFrozen: args[7],
233- high24hr: args[8],
234- low24hr: args[9]
235- */
236- return await this . ParseTickerAsync ( token , symbol , 2 , 3 , 1 , 5 , 6 ) ;
227+ // {
228+ // "symbol": "ETH_USDT",
229+ // "dailyChange": "0.9428",
230+ // "high": "507",
231+ // "amount": "20",
232+ // "quantity": "3",
233+ // "tradeCount": 11,
234+ // "low": "16",
235+ // "closeTime": 1634062351868,
236+ // "startTime": 1633996800000,
237+ // "close": "204",
238+ // "open": "105",
239+ // "ts": 1648052794867,
240+ // "markPrice": "205",
241+ // }
242+
243+ return await this . ParseTickerAsync ( token , symbol , askKey : null , bidKey : null , lastKey : "close" ,
244+ baseVolumeKey : "quantity" , quoteVolumeKey : "amount" , timestampKey : "ts" ,
245+ TimestampType . UnixMilliseconds ) ;
237246 }
238247
239248 public override string PeriodSecondsToString ( int seconds )
@@ -449,210 +458,47 @@ protected override async Task<IEnumerable<KeyValuePair<string, ExchangeTicker>>>
449458
450459 protected override async Task < IWebSocket > OnGetTickersWebSocketAsync (
451460 Action < IReadOnlyCollection < KeyValuePair < string , ExchangeTicker > > > callback ,
452- params string [ ] symbols )
453- {
454- Dictionary < string , string > idsToSymbols = new Dictionary < string , string > ( ) ;
455- return await ConnectPublicWebSocketAsync ( string . Empty , async ( _socket , msg ) =>
456- {
457- JToken token = JToken . Parse ( msg . ToStringFromUTF8 ( ) ) ;
458- if ( token [ 0 ] . ConvertInvariant < int > ( ) == 1002 )
461+ params string [ ] symbols ) =>
462+ await ConnectWebsocketPublicAsync (
463+ async ( socket ) => { await SubscribeToChannel ( socket , "ticker" , symbols ) ; } ,
464+ async ( socket , symbol , sArray , token ) =>
459465 {
460- if ( token is JArray outerArray && outerArray . Count > 2 && outerArray [ 2 ] is JArray array &&
461- array . Count > 9 &&
462- idsToSymbols . TryGetValue ( array [ 0 ] . ToStringInvariant ( ) , out string symbol ) )
466+ var tickers = new List < KeyValuePair < string , ExchangeTicker > >
463467 {
464- callback . Invoke ( new List < KeyValuePair < string , ExchangeTicker > >
465- {
466- new KeyValuePair < string , ExchangeTicker > ( symbol ,
467- await ParseTickerWebSocketAsync ( symbol , array ) )
468- } ) ;
469- }
470- }
471- } , async ( _socket ) =>
472- {
473- var tickers = await GetTickersAsync ( ) ;
474- foreach ( var ticker in tickers )
475- {
476- idsToSymbols [ ticker . Value . Id ] = ticker . Key ;
477- }
478-
479- // subscribe to ticker channel (1002)
480- await _socket . SendMessageAsync ( new { command = "subscribe" , channel = 1002 } ) ;
481- } ) ;
482- }
468+ new KeyValuePair < string , ExchangeTicker > ( symbol ,
469+ await this . ParseTickerWebSocketAsync ( symbol , token ) )
470+ } ;
471+ callback ( tickers ) ;
472+ } ) ;
483473
484474 protected override async Task < IWebSocket > OnGetTradesWebSocketAsync (
485475 Func < KeyValuePair < string , ExchangeTrade > , Task > callback ,
486- params string [ ] marketSymbols )
487- {
488- Dictionary < int , string > messageIdToSymbol = new Dictionary < int , string > ( ) ;
489- Dictionary < string , int > symbolToMessageId = new Dictionary < string , int > ( ) ;
490- var symMeta = await GetMarketSymbolsMetadataAsync ( ) ;
491- foreach ( var symbol in symMeta )
492- {
493- messageIdToSymbol . Add ( int . Parse ( symbol . MarketId ) , symbol . MarketSymbol ) ;
494- symbolToMessageId . Add ( symbol . MarketSymbol , int . Parse ( symbol . MarketId ) ) ;
495- }
496-
497- return await ConnectPublicWebSocketAsync ( string . Empty , async ( _socket , msg ) =>
498- {
499- JToken token = JToken . Parse ( msg . ToStringFromUTF8 ( ) ) ;
500- if ( token . Type == JTokenType . Object && token [ "error" ] != null )
501- throw new APIException ( $ "Exchange returned error: { token [ "error" ] . ToStringInvariant ( ) } ") ;
502- int msgId = token [ 0 ] . ConvertInvariant < int > ( ) ;
503-
504- if ( msgId == 1010 || token . Count ( ) == 2 ) // "[7,2]"
505- {
506- // this is a heartbeat message
507- return ;
508- }
509-
510- var seq = token [ 1 ] . ConvertInvariant < long > ( ) ;
511- var dataArray = token [ 2 ] ;
512- foreach ( var data in dataArray )
513- {
514- var dataType = data [ 0 ] . ToStringInvariant ( ) ;
515- if ( dataType == "i" )
516- {
517- // can also populate messageIdToSymbol from here
518- continue ;
519- }
520- else if ( dataType == "t" )
521- {
522- if ( messageIdToSymbol . TryGetValue ( msgId , out string symbol ) )
523- {
524- // 0 1 2 3 4 5 6
525- // ["t", "<trade id>", <1 for buy 0 for sell>, "<price>", "<size>", <timestamp>, "<epoch_ms>"]
526- ExchangeTrade trade = data . ParseTrade ( amountKey : 4 , priceKey : 3 , typeKey : 2 ,
527- timestampKey : 6 ,
528- timestampType : TimestampType . UnixMilliseconds , idKey : 1 , typeKeyIsBuyValue : "1" ) ;
529- await callback ( new KeyValuePair < string , ExchangeTrade > ( symbol , trade ) ) ;
530- }
531- }
532- else if ( dataType == "o" )
533- {
534- continue ;
535- }
536- else
537- {
538- continue ;
539- }
540- }
541- } , async ( _socket ) =>
542- {
543- IEnumerable < int > marketIDs = null ;
544- if ( marketSymbols == null || marketSymbols . Length == 0 )
545- {
546- marketIDs = messageIdToSymbol . Keys ;
547- }
548- else
476+ params string [ ] marketSymbols ) =>
477+ await ConnectWebsocketPublicAsync (
478+ async ( socket ) => { await SubscribeToChannel ( socket , "trades" , marketSymbols ) ; } ,
479+ async ( socket , symbol , sArray , token ) =>
549480 {
550- marketIDs = marketSymbols . Select ( s => symbolToMessageId [ s ] ) ;
551- }
552-
553- // subscribe to order book and trades channel for each symbol
554- foreach ( var id in marketIDs )
555- {
556- await _socket . SendMessageAsync ( new { command = "subscribe" , channel = id } ) ;
557- }
558- } ) ;
559- }
481+ var trade = token . ParseTrade ( amountKey : "quantity" , priceKey : "price" , typeKey : "takerSide" ,
482+ timestampKey : "ts" , TimestampType . UnixMilliseconds , idKey : "id" ) ;
483+ await callback ( new KeyValuePair < string , ExchangeTrade > ( symbol , trade ) ) ;
484+ } ) ;
560485
561486 protected override async Task < IWebSocket > OnGetDeltaOrderBookWebSocketAsync (
562487 Action < ExchangeOrderBook > callback ,
563488 int maxCount = 20 ,
564489 params string [ ] marketSymbols )
565490 {
566- Dictionary < int , Tuple < string , long > > messageIdToSymbol = new Dictionary < int , Tuple < string , long > > ( ) ;
567- return await ConnectPublicWebSocketAsync ( string . Empty , ( _socket , msg ) =>
568- {
569- JToken token = JToken . Parse ( msg . ToStringFromUTF8 ( ) ) ;
570- int msgId = token [ 0 ] . ConvertInvariant < int > ( ) ;
571-
572- //return if this is a heartbeat message
573- if ( msgId == 1010 )
574- {
575- return Task . CompletedTask ;
576- }
577-
578- var seq = token [ 1 ] . ConvertInvariant < long > ( ) ;
579- var dataArray = token [ 2 ] ;
580- ExchangeOrderBook book = new ExchangeOrderBook ( ) ;
581- foreach ( var data in dataArray )
491+ return await ConnectWebsocketPublicAsync (
492+ async ( socket ) =>
582493 {
583- var dataType = data [ 0 ] . ToStringInvariant ( ) ;
584- if ( dataType == "i" )
585- {
586- var marketInfo = data [ 1 ] ;
587- var market = marketInfo [ "currencyPair" ] . ToStringInvariant ( ) ;
588- messageIdToSymbol [ msgId ] = new Tuple < string , long > ( market , 0 ) ;
589-
590- // we are only returning the deltas, this would create a full order book which we don't want, but keeping it
591- // here for historical reference
592- /*
593- foreach (JProperty jprop in marketInfo["orderBook"][0].Cast<JProperty>())
594- {
595- var depth = new ExchangeOrderPrice
596- {
597- Price = jprop.Name.ConvertInvariant<decimal>(),
598- Amount = jprop.Value.ConvertInvariant<decimal>()
599- };
600- book.Asks[depth.Price] = depth;
601- }
602- foreach (JProperty jprop in marketInfo["orderBook"][1].Cast<JProperty>())
603- {
604- var depth = new ExchangeOrderPrice
605- {
606- Price = jprop.Name.ConvertInvariant<decimal>(),
607- Amount = jprop.Value.ConvertInvariant<decimal>()
608- };
609- book.Bids[depth.Price] = depth;
610- }
611- */
612- }
613- else if ( dataType == "o" )
614- {
615- //removes or modifies an existing item on the order books
616- if ( messageIdToSymbol . TryGetValue ( msgId , out Tuple < string , long > symbol ) )
617- {
618- int type = data [ 1 ] . ConvertInvariant < int > ( ) ;
619- var depth = new ExchangeOrderPrice
620- {
621- Price = data [ 2 ] . ConvertInvariant < decimal > ( ) ,
622- Amount = data [ 3 ] . ConvertInvariant < decimal > ( )
623- } ;
624- var list = ( type == 1 ? book . Bids : book . Asks ) ;
625- list [ depth . Price ] = depth ;
626- book . MarketSymbol = symbol . Item1 ;
627- book . SequenceId = symbol . Item2 + 1 ;
628- messageIdToSymbol [ msgId ] = new Tuple < string , long > ( book . MarketSymbol , book . SequenceId ) ;
629- }
630- }
631- else
632- {
633- continue ;
634- }
635- }
636-
637- if ( book != null && ( book . Asks . Count != 0 || book . Bids . Count != 0 ) )
494+ await SubscribeToOrderBookDepthChannel ( socket , marketSymbols , maxCount ) ;
495+ } , ( socket , symbol , sArray , token ) =>
638496 {
497+ var book = token . ParseOrderBookFromJTokenArrays ( ) ;
498+ book . MarketSymbol = symbol ;
639499 callback ( book ) ;
640- }
641-
642- return Task . CompletedTask ;
643- } , async ( _socket ) =>
644- {
645- if ( marketSymbols == null || marketSymbols . Length == 0 )
646- {
647- marketSymbols = ( await GetMarketSymbolsAsync ( ) ) . ToArray ( ) ;
648- }
649-
650- // subscribe to order book and trades channel for each symbol
651- foreach ( var sym in marketSymbols )
652- {
653- await _socket . SendMessageAsync ( new { command = "subscribe" , channel = NormalizeMarketSymbol ( sym ) } ) ;
654- }
655- } ) ;
500+ return Task . CompletedTask ;
501+ } ) ;
656502 }
657503
658504 protected override async Task < ExchangeOrderBook > OnGetOrderBookAsync ( string marketSymbol , int maxCount = 100 )
@@ -1112,6 +958,92 @@ private async Task<ExchangeDepositDetails> CreateDepositAddress(
1112958
1113959 return details ;
1114960 }
961+
962+ private Task < IWebSocket > ConnectWebsocketPublicAsync (
963+ Func < IWebSocket , Task > connected ,
964+ Func < IWebSocket , string , string [ ] , JToken , Task > callback )
965+ {
966+ Timer pingTimer = null ;
967+ return ConnectPublicWebSocketAsync (
968+ url : "/public" ,
969+ messageCallback : async ( socket , msg ) =>
970+ {
971+ var token = JToken . Parse ( msg . ToStringFromUTF8 ( ) ) ;
972+ var eventType = token [ "event" ] ? . ToStringInvariant ( ) ;
973+ if ( eventType != null )
974+ {
975+ if ( eventType != "error" ) return ;
976+ Logger . Info ( "Websocket unable to connect: " + token [ "msg" ] ? . ToStringInvariant ( ) ) ;
977+ return ;
978+ }
979+
980+ if ( token [ "data" ] == null ) return ;
981+
982+ foreach ( var d in token [ "data" ] )
983+ {
984+ await callback ( socket , d [ "symbol" ] ? . ToStringInvariant ( ) , null , d ) ;
985+ }
986+ } ,
987+ connectCallback : async ( socket ) =>
988+ {
989+ await connected ( socket ) ;
990+ pingTimer ??= new Timer (
991+ callback : async s =>
992+ await socket . SendMessageAsync (
993+ JsonConvert . SerializeObject ( new { Event = "ping" } , SerializerSettings ) ) ,
994+ null , 0 , 15000 ) ;
995+ } ,
996+ disconnectCallback : socket =>
997+ {
998+ pingTimer ? . Dispose ( ) ;
999+ pingTimer = null ;
1000+ return Task . CompletedTask ;
1001+ } ) ;
1002+ }
1003+
1004+ private static async Task SubscribeToChannel (
1005+ IWebSocket socket ,
1006+ string channel ,
1007+ string [ ] marketSymbols )
1008+ {
1009+ if ( marketSymbols . Length == 0 )
1010+ {
1011+ marketSymbols = new [ ] { "all" } ;
1012+ }
1013+
1014+ var payload = JsonConvert . SerializeObject ( new
1015+ {
1016+ Event = "subscribe" ,
1017+ Channel = new [ ] { channel } ,
1018+ Symbols = marketSymbols
1019+ } , SerializerSettings ) ;
1020+
1021+ await socket . SendMessageAsync ( payload ) ;
1022+ }
1023+
1024+ private async Task SubscribeToOrderBookDepthChannel (
1025+ IWebSocket socket ,
1026+ string [ ] marketSymbols ,
1027+ int depth = 20 )
1028+ {
1029+ var depthIsValid = depth == 5 || depth == 10 || depth == 20 ;
1030+ if ( ! depthIsValid )
1031+ throw new ArgumentOutOfRangeException ( nameof ( depth ) ) ;
1032+ if ( marketSymbols . Length == 0 )
1033+ {
1034+ marketSymbols = ( await OnGetMarketSymbolsAsync ( ) ) . ToArray ( ) ;
1035+ }
1036+
1037+ var payload = JsonConvert . SerializeObject ( new
1038+ {
1039+ Event = "subscribe" ,
1040+ Channel = new [ ] { "book" } ,
1041+ Symbols = marketSymbols ,
1042+ Depth = depth
1043+ } , SerializerSettings ) ;
1044+
1045+ await socket . SendMessageAsync ( payload ) ;
1046+ }
11151047 }
11161048
11171049 public partial class ExchangeName
0 commit comments