Skip to content

Commit 8450bd4

Browse files
committed
refactoring of coinapi-ws toolkit
1 parent fcd6cfb commit 8450bd4

File tree

3 files changed

+113
-104
lines changed

3 files changed

+113
-104
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@
77
**/target
88

99
# Visual Studo 2015 cache/options directory
10-
.vs/
10+
.vs/
11+
12+
**/appsettings.json

data-api/csharp-ws/CoinAPI.WebSocket.Stats.Console/Program.cs

Lines changed: 109 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Serilog;
77
using System.Diagnostics;
88
using System.Globalization;
9+
using System.Net;
910
using System.Text;
1011
using System.Xml;
1112
using System.Xml.Xsl;
@@ -118,65 +119,35 @@ public async Task MakeRequest([FromService] IConfiguration configuration, string
118119
using (var wsClient = new CoinApiWsClient(endpoint_name))
119120
{
120121
wsClient.SupressHeartbeat(supress_hb);
121-
int msgCount = 0;
122-
int errorCount = 0;
123122
LatencyType latencyType = Enum.GetValues<LatencyType>().FirstOrDefault(x => x.ToString() == latency_type);
124123

125-
void WsClient_Error(object? sender, Exception e)
126-
{
127-
Serilog.Log.Error(e, "Error in websocket client");
128-
errorCount++;
129-
}
130-
131124
wsClient.Error += WsClient_Error;
132125

133-
List<(DateTime, DateTime)> latencyList = new List<(DateTime, DateTime)>();
134-
135-
void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi)
136-
{
137-
msgCount++;
138-
if (time_coinapi.HasValue && time_exchange.HasValue)
139-
{
140-
switch(latencyType)
141-
{
142-
case LatencyType.nc:
143-
latencyList.Add((DateTime.UtcNow, time_coinapi.Value));
144-
break;
145-
case LatencyType.ne:
146-
latencyList.Add((DateTime.UtcNow, time_exchange.Value));
147-
break;
148-
case LatencyType.ce:
149-
latencyList.Add((time_coinapi.Value, time_exchange.Value));
150-
break;
151-
}
152-
}
153-
}
154-
155126
switch (subscribe_data_type)
156127
{
157128
case "book5":
158-
wsClient.OrderBook5Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); };
129+
wsClient.OrderBook5Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi, latencyType); };
159130
break;
160131
case "book20":
161-
wsClient.OrderBook20Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); };
132+
wsClient.OrderBook20Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi, latencyType); };
162133
break;
163134
case "book50":
164-
wsClient.OrderBook50Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); };
135+
wsClient.OrderBook50Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi, latencyType); };
165136
break;
166137
case "book":
167-
wsClient.OrderBookEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); };
138+
wsClient.OrderBookEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi, latencyType); };
168139
break;
169140
case "book_l":
170-
wsClient.OrderBookL3Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); };
141+
wsClient.OrderBookL3Event += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi, latencyType); };
171142
break;
172143
case "quote":
173-
wsClient.QuoteEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); };
144+
wsClient.QuoteEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi, latencyType); };
174145
break;
175146
case "trade":
176-
wsClient.TradeEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi); };
147+
wsClient.TradeEvent += (s, i) => { ProcessMsg(i.time_exchange, i.time_coinapi, latencyType); };
177148
break;
178149
case "ohlcv":
179-
wsClient.OHLCVEvent += (s, i) => { ProcessMsg(null, null); };
150+
wsClient.OHLCVEvent += (s, i) => { ProcessMsg(null, null, latencyType); };
180151
break;
181152
case "exrate":
182153
wsClient.ExchangeRateEvent += (s, i) => { msgCount++; };
@@ -194,91 +165,126 @@ void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi)
194165
};
195166
wsClient.SendHelloMessage(hello);
196167

197-
Task.Run(async () =>
198-
{
199-
if (!wsClient.ConnectedEvent.WaitOne(10000)) return;
168+
_ = PrintingTaskLoopAsync(wsClient, endpoint_name, subscribe_data_type, asset, symbol, exchange,latency_type);
200169

201-
var iterations = 0;
202-
Serilog.Log.Information($"Time: {DateTime.UtcNow}");
203-
var strbld = new StringBuilder();
170+
await Task.Run(() => Console.ReadKey());
171+
}
172+
}
173+
174+
private ulong errorCount = 0;
175+
void WsClient_Error(object? sender, Exception e)
176+
{
177+
Serilog.Log.Error(e, "Error in websocket client");
178+
errorCount++;
179+
}
180+
181+
private ulong msgCount = 0;
182+
private readonly List<(DateTime, DateTime)> latencyList = new List<(DateTime, DateTime)>();
183+
private void ProcessMsg(DateTime? time_exchange, DateTime? time_coinapi, LatencyType latencyType)
184+
{
185+
msgCount++;
186+
if (time_coinapi.HasValue && time_exchange.HasValue)
187+
{
188+
switch (latencyType)
189+
{
190+
case LatencyType.nc:
191+
latencyList.Add((DateTime.UtcNow, time_coinapi.Value));
192+
break;
193+
case LatencyType.ne:
194+
latencyList.Add((DateTime.UtcNow, time_exchange.Value));
195+
break;
196+
case LatencyType.ce:
197+
latencyList.Add((time_coinapi.Value, time_exchange.Value));
198+
break;
199+
}
200+
}
201+
}
204202

205-
strbld.Append($"Subscribed to: subscribe_data_type = {subscribe_data_type}");
206-
if (!string.IsNullOrWhiteSpace(exchange))
207-
{
208-
strbld.Append($", exchange = {exchange}");
209-
}
210-
if (!string.IsNullOrWhiteSpace(asset))
211-
{
212-
strbld.Append($", asset = {asset}");
213-
}
214-
if (!string.IsNullOrWhiteSpace(symbol))
215-
{
216-
strbld.Append($", symbol = {symbol}");
217-
}
218-
strbld.Append($", supress_hb = {supress_hb}");
219-
strbld.Append($", latency_type = {latency_type}");
203+
private async Task PrintingTaskLoopAsync(CoinApiWsClient wsClient,
204+
string endpoint_name, string subscribe_data_type, string asset,
205+
string symbol, string exchange, string latency_type)
206+
{
207+
var iterations = 0;
208+
Serilog.Log.Information($"Time: {DateTime.UtcNow}");
209+
var strbld = new StringBuilder();
220210

221-
Serilog.Log.Information(strbld.ToString());
211+
strbld.Append($"Subscribed to: subscribe_data_type = {subscribe_data_type}");
212+
if (!string.IsNullOrWhiteSpace(exchange))
213+
{
214+
strbld.Append($", exchange = {exchange}");
215+
}
216+
if (!string.IsNullOrWhiteSpace(asset))
217+
{
218+
strbld.Append($", asset = {asset}");
219+
}
220+
if (!string.IsNullOrWhiteSpace(symbol))
221+
{
222+
strbld.Append($", symbol = {symbol}");
223+
}
224+
strbld.Append($", latency_type = {latency_type}");
222225

223-
var process = Process.GetCurrentProcess();
226+
Serilog.Log.Information(strbld.ToString());
224227

225-
while (true)
226-
{
227-
strbld.Clear();
228+
var process = Process.GetCurrentProcess();
228229

229-
if (iterations % 10 == 0)
230-
{
231-
Serilog.Log.Information($"Endpoint {endpoint_name}, {iterations} iterations, {msgCount} messages received, {wsClient.TotalBytesReceived} bytes received, Error count {errorCount}");
232-
}
233-
iterations++;
230+
while (true)
231+
{
232+
if (!wsClient.IsConnected)
233+
{
234+
Log.Warning("Websocket is not connected.");
235+
await Task.Delay(1000);
236+
continue;
237+
}
238+
strbld.Clear();
234239

235-
var msgCountPrev = msgCount;
236-
var totalBytesReceivedPrev = wsClient.TotalBytesReceived;
240+
if (iterations % 10 == 0)
241+
{
242+
Serilog.Log.Information($"Endpoint {endpoint_name}, {iterations} iterations, {msgCount} messages received, {wsClient.TotalBytesReceived} bytes received, Error count {errorCount}");
243+
}
244+
iterations++;
237245

238-
(TimeSpan cpuWaiting, TimeSpan cpuParsing, TimeSpan cpuHandling) cpuUsagePrev
239-
= (wsClient.TotalWaitTime, wsClient.TotalParseTime, wsClient.TotalHandleTime);
246+
var msgCountPrev = msgCount;
247+
var totalBytesReceivedPrev = wsClient.TotalBytesReceived;
240248

241-
//TimeSpan totalCpuTimePrev = process.TotalProcessorTime;
249+
(TimeSpan cpuWaiting, TimeSpan cpuParsing, TimeSpan cpuHandling) cpuUsagePrev
250+
= (wsClient.TotalWaitTime, wsClient.TotalParseTime, wsClient.TotalHandleTime);
242251

243-
await Task.Delay(1000);
244-
(TimeSpan cpuWaiting, TimeSpan cpuParsing, TimeSpan cpuHandling) cpuUsage
245-
= (wsClient.TotalWaitTime, wsClient.TotalParseTime, wsClient.TotalHandleTime);
252+
//TimeSpan totalCpuTimePrev = process.TotalProcessorTime;
246253

247-
//TimeSpan totalCpuTime = process.TotalProcessorTime;
248-
var deltaCpuWaiting = cpuUsage.cpuWaiting - cpuUsagePrev.cpuWaiting;
249-
var deltaCpuParsing = cpuUsage.cpuParsing - cpuUsagePrev.cpuParsing;
250-
var deltaCpuHandling = cpuUsage.cpuHandling - cpuUsagePrev.cpuHandling;
251-
//var deltaCpuTime = totalCpuTime - totalCpuTimePrev;
252-
var deltaCpuTime = deltaCpuWaiting + deltaCpuParsing + deltaCpuHandling;
254+
await Task.Delay(1000);
255+
(TimeSpan cpuWaiting, TimeSpan cpuParsing, TimeSpan cpuHandling) cpuUsage
256+
= (wsClient.TotalWaitTime, wsClient.TotalParseTime, wsClient.TotalHandleTime);
253257

258+
//TimeSpan totalCpuTime = process.TotalProcessorTime;
259+
var deltaCpuWaiting = cpuUsage.cpuWaiting - cpuUsagePrev.cpuWaiting;
260+
var deltaCpuParsing = cpuUsage.cpuParsing - cpuUsagePrev.cpuParsing;
261+
var deltaCpuHandling = cpuUsage.cpuHandling - cpuUsagePrev.cpuHandling;
262+
//var deltaCpuTime = totalCpuTime - totalCpuTimePrev;
263+
var deltaCpuTime = deltaCpuWaiting + deltaCpuParsing + deltaCpuHandling;
254264

255-
var cpuWaitingPercent = 100 * deltaCpuWaiting.TotalMilliseconds / deltaCpuTime.TotalMilliseconds;
256-
var cpuParsingPercent = 100 * deltaCpuParsing.TotalMilliseconds / deltaCpuTime.TotalMilliseconds;
257-
var cpuHandlingPercent = 100 * deltaCpuHandling.TotalMilliseconds / deltaCpuTime.TotalMilliseconds;
258265

266+
var cpuWaitingPercent = 100 * deltaCpuWaiting.TotalMilliseconds / deltaCpuTime.TotalMilliseconds;
267+
var cpuParsingPercent = 100 * deltaCpuParsing.TotalMilliseconds / deltaCpuTime.TotalMilliseconds;
268+
var cpuHandlingPercent = 100 * deltaCpuHandling.TotalMilliseconds / deltaCpuTime.TotalMilliseconds;
259269

260-
var msgCountOnInterval = msgCount - msgCountPrev;
261-
var bytesCountOnInterval = wsClient.TotalBytesReceived - totalBytesReceivedPrev;
262-
var latencies = latencyList.Select(x => x.Item1 - x.Item2).ToList();
263-
latencyList.Clear();
264270

271+
var msgCountOnInterval = msgCount - msgCountPrev;
272+
var bytesCountOnInterval = wsClient.TotalBytesReceived - totalBytesReceivedPrev;
273+
var latencies = latencyList.Select(x => x.Item1 - x.Item2).ToList();
274+
latencyList.Clear();
265275

266-
strbld.AppendFormat($"Messages: {msgCountOnInterval,-8}");
267-
strbld.AppendFormat($"| Recv bytes: {bytesCountOnInterval,-8}");
268-
strbld.Append($"| CPU: wait: {cpuWaitingPercent:F2}% | parse: {cpuParsingPercent:F2}% | process: {cpuHandlingPercent:F2}%");
269276

270-
if (latencies.Any())
271-
{
272-
strbld.AppendFormat($" | Latency min: {latencies.Min().TotalMilliseconds,-8}ms");
273-
strbld.AppendFormat($" | max: {latencies.Max().TotalMilliseconds,-8}ms");
274-
}
277+
strbld.AppendFormat($"Messages: {msgCountOnInterval,-8}");
278+
strbld.AppendFormat($"| Recv bytes: {bytesCountOnInterval,-8}");
279+
strbld.Append($"| CPU: wait: {cpuWaitingPercent:F2}% | parse: {cpuParsingPercent:F2}% | process: {cpuHandlingPercent:F2}%");
275280

276-
Serilog.Log.Information(strbld.ToString());
277-
}
278-
}
279-
);
281+
if (latencies.Any())
282+
{
283+
strbld.AppendFormat($" | Latency min: {latencies.Min().TotalMilliseconds,-8}ms");
284+
strbld.AppendFormat($" | max: {latencies.Max().TotalMilliseconds,-8}ms");
285+
}
280286

281-
await Task.Run(() => Console.ReadKey());
287+
Serilog.Log.Information(strbld.ToString());
282288
}
283289
}
284290

data-api/csharp-ws/CoinAPI.WebSocket.V1/CoinApiWsClient.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class CoinApiWsClient : ICoinApiWsClient, IDisposable
3939
public long UnprocessedMessagesQueueSize => _queueThread.QueueSize;
4040
public event EventHandler<Exception> Error;
4141
public AutoResetEvent ConnectedEvent { get; } = new AutoResetEvent(false);
42+
public bool IsConnected => _client?.State == WebSocketState.Open;
4243
public DateTime? ConnectedTime { get; private set; }
4344
public ulong TotalBytesReceived { get; private set; }
4445
public TimeSpan TotalWaitTime => _waitStopwatch.Elapsed;

0 commit comments

Comments
 (0)