Skip to content

Commit 9cfd171

Browse files
authored
[c#/beetlex] Optimized actions (#6228)
* update beetlex 1.4.3 update beetlex 1.4.3 * docker add COMPlus_ReadyToRun variable update beetlex * update beetlex, enabled thread queue * beetlex framework add db and queries cases * add db code * change result json data * update query url * beetlex framework add fortunes cases * change Content-Type * add beetlex core cases * fix queries cases * update config * change try readline * update benchmark config * Update README.md * Update README.md * change versus property * beetlex-core update .net core to v3.0 * change beetlex-core project file * beetlex update raw db class * beetlex update raw db * beetlex debug plaintext * change debug docker file * update beetlex to 1.4.0 * update * beetlex update core 3.1 * [c#/beetlex] add updates cases * [c#/beetlex] change Server: TFB, change custom connection pool, add update docker * fix errors * change pool init * change connection pool maxsize * fix fortunes errors * clear DBRaw _connectionString value. * [c#beetlex] change update dbconnection pool size * [c#/beetlex] udpate spanjson to v3.0.1, Npgsql v5.0.0 * [c#/beetlex] add caching sample * set connectionstring multiplexing * remove connection multiplexing setting * [c#/beetlex]change NpgsqlParameter to NpgsqlParameter<T> * [c#/beetlex] update dbraw * [c#/beetlex] change connection string * [c#/beetlex] add fortunes cases to core-updb * update beetlex 1.5.6 * update 5.0.0-alpha1 * update docker file * Enabled IOQueues * Set IOQueues debug mode * update * [c#/beetlex] udpate to v1.6.0.1-beta * update pg drive * [c#/beetlex] update to beetlex v1.6.3 and support pipelining * set options * [c#/beetlex] Optimized actions
1 parent 0d82ec9 commit 9cfd171

File tree

5 files changed

+126
-73
lines changed

5 files changed

+126
-73
lines changed

frameworks/CSharp/beetlex/PlatformBenchmarks/HttpHandler.cs

Lines changed: 97 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ public partial class HttpHandler : ServerHandlerBase
5151

5252
public HttpHandler()
5353
{
54-
54+
RequestDispatchs = new BeetleX.Dispatchs.DispatchCenter<HttpToken>(OnRequest, Math.Min(Environment.ProcessorCount, 16));
5555
}
5656

57-
public Task Default(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
57+
private BeetleX.Dispatchs.DispatchCenter<HttpToken> RequestDispatchs;
58+
59+
public Task Default(PipeStream stream, HttpToken token, ISession session)
5860
{
5961
stream.Write("<b> beetlex server</b><hr/>");
60-
stream.Write($"{Encoding.ASCII.GetString(url)} not found!");
62+
stream.Write("path not found!");
6163
OnCompleted(stream, session, token);
6264
return Task.CompletedTask;
6365
}
@@ -67,6 +69,8 @@ public override void Connected(IServer server, ConnectedEventArgs e)
6769
base.Connected(server, e);
6870
e.Session.Socket.NoDelay = true;
6971
var token = new HttpToken();
72+
token.ThreadDispatcher = RequestDispatchs.Next();
73+
token.Session = e.Session;
7074
token.Db = new RawDb(new ConcurrentRandom(), Npgsql.NpgsqlFactory.Instance);
7175
e.Session.Tag = token;
7276
}
@@ -82,6 +86,14 @@ private int AnalysisUrl(ReadOnlySpan<byte> url)
8286

8387
}
8488

89+
private void OnRequest(HttpToken token)
90+
{
91+
if (token.Requests.TryDequeue(out RequestData result))
92+
{
93+
OnStartRequest(result, token.Session, token, token.Session.Stream.ToPipeStream());
94+
}
95+
}
96+
8597
public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
8698
{
8799
base.SessionReceive(server, e);
@@ -96,17 +108,34 @@ public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
96108
{
97109
token.Requests.Enqueue(token.CurrentRequest);
98110
token.CurrentRequest = null;
111+
token.ThreadDispatcher.Enqueue(token);
99112
}
100113
pipeStream.ReadFree(result.Length);
101114
}
102115
else
103116
{
104117
if (token.CurrentRequest == null)
105118
{
106-
token.CurrentRequest = new RequestData();
107-
var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(result.Length);
119+
var request = new RequestData();
120+
121+
byte[] buffer = null;
122+
if (Program.Debug)
123+
buffer = new byte[result.Length];
124+
else
125+
buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(result.Length);
108126
pipeStream.Read(buffer, 0, result.Length);
109-
token.CurrentRequest.Data = new ArraySegment<byte>(buffer, 0, result.Length);
127+
request.Data = new ArraySegment<byte>(buffer, 0, result.Length);
128+
AnalysisAction(request);
129+
if (request.Action == ActionType.Plaintext)
130+
{
131+
token.CurrentRequest = request;
132+
}
133+
else
134+
{
135+
pipeStream.ReadFree((int)pipeStream.Length);
136+
OnStartRequest(request, e.Session, token, pipeStream);
137+
return;
138+
}
110139
}
111140
else
112141
{
@@ -118,41 +147,9 @@ public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
118147
else
119148
break;
120149
}
121-
if (pipeStream.Length == 0 && token.CurrentRequest == null)
122-
{
123-
ProcessReqeusts(token, pipeStream, e.Session);
124-
}
125-
}
126-
127-
private async Task ProcessReqeusts(HttpToken token, PipeStream pipeStream, ISession session)
128-
{
129-
PROCESS:
130-
if (token.EnterProcess())
131-
{
132-
while (true)
133-
{
134-
if (token.Requests.TryDequeue(out RequestData item))
135-
{
136-
using (item)
137-
{
138-
await OnProcess(item, pipeStream, token, session);
139-
}
140-
}
141-
else
142-
{
143-
break;
144-
}
145-
}
146-
session.Stream.Flush();
147-
token.CompletedProcess();
148-
if (!token.Requests.IsEmpty)
149-
{
150-
goto PROCESS;
151-
}
152-
}
153150
}
154151

155-
private Task OnProcess(RequestData requestData, PipeStream pipeStream, HttpToken token, ISession sessino)
152+
private void AnalysisAction(RequestData requestData)
156153
{
157154
var line = _line.AsSpan();
158155
int len = requestData.Data.Count;
@@ -187,77 +184,112 @@ private Task OnProcess(RequestData requestData, PipeStream pipeStream, HttpToken
187184
}
188185
}
189186
}
190-
return OnStartLine(http, method, url, sessino, token, pipeStream);
191-
192-
}
193-
194-
195-
public virtual Task OnStartLine(ReadOnlySpan<byte> http, ReadOnlySpan<byte> method, ReadOnlySpan<byte> url, ISession session, HttpToken token, PipeStream stream)
196-
{
197-
198187
int queryIndex = AnalysisUrl(url);
199188
ReadOnlySpan<byte> baseUrl = default;
200189
ReadOnlySpan<byte> queryString = default;
201190
if (queryIndex > 0)
202191
{
203192
baseUrl = url.Slice(0, queryIndex);
204193
queryString = url.Slice(queryIndex + 1, url.Length - queryIndex - 1);
194+
requestData.QueryString = Encoding.ASCII.GetString(queryString);
205195
}
206196
else
207197
{
208198
baseUrl = url;
209199
}
210-
OnWriteHeader(stream, token);
211200
if (baseUrl.Length == _path_Plaintext.Length && baseUrl.StartsWith(_path_Plaintext))
201+
{
202+
requestData.Action = ActionType.Plaintext;
203+
}
204+
else if (baseUrl.Length == _path_Json.Length && baseUrl.StartsWith(_path_Json))
205+
{
206+
requestData.Action = ActionType.Json;
207+
}
208+
else if (baseUrl.Length == _path_Db.Length && baseUrl.StartsWith(_path_Db))
209+
{
210+
requestData.Action = ActionType.Db;
211+
}
212+
else if (baseUrl.Length == _path_Queries.Length && baseUrl.StartsWith(_path_Queries))
213+
{
214+
requestData.Action = ActionType.Queries;
215+
}
216+
217+
else if (baseUrl.Length == _cached_worlds.Length && baseUrl.StartsWith(_cached_worlds))
218+
{
219+
requestData.Action = ActionType.Caching;
220+
}
221+
222+
else if (baseUrl.Length == _path_Updates.Length && baseUrl.StartsWith(_path_Updates))
223+
{
224+
requestData.Action = ActionType.Updates;
225+
}
226+
else if (baseUrl.Length == _path_Fortunes.Length && baseUrl.StartsWith(_path_Fortunes))
227+
{
228+
requestData.Action = ActionType.Fortunes;
229+
}
230+
else
231+
{
232+
requestData.Action = ActionType.Other;
233+
}
234+
}
235+
236+
public virtual async Task OnStartRequest(RequestData data, ISession session, HttpToken token, PipeStream stream)
237+
{
238+
OnWriteHeader(stream, token);
239+
ActionType type = data.Action;
240+
if (type == ActionType.Plaintext)
212241
{
213242
stream.Write(_headerContentTypeText.Data, 0, _headerContentTypeText.Length);
214243
OnWriteContentLength(stream, token);
215-
return Plaintext(url, stream, token, session);
244+
await Plaintext(stream, token, session);
216245
}
217-
else if (baseUrl.Length == _path_Json.Length && baseUrl.StartsWith(_path_Json))
246+
else if (type == ActionType.Json)
218247
{
219248
stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
220249
OnWriteContentLength(stream, token);
221-
return Json(stream, token, session);
250+
await Json(stream, token, session);
222251
}
223-
else if (baseUrl.Length == _path_Db.Length && baseUrl.StartsWith(_path_Db))
252+
else if (type == ActionType.Db)
224253
{
225254
stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
226255
OnWriteContentLength(stream, token);
227-
return db(stream, token, session);
256+
await db(stream, token, session);
228257
}
229-
else if (baseUrl.Length == _path_Queries.Length && baseUrl.StartsWith(_path_Queries))
258+
else if (type == ActionType.Queries)
230259
{
231260
stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
232261
OnWriteContentLength(stream, token);
233-
return queries(Encoding.ASCII.GetString(queryString), stream, token, session);
262+
await queries(data.QueryString, stream, token, session);
234263
}
235264

236-
else if (baseUrl.Length == _cached_worlds.Length && baseUrl.StartsWith(_cached_worlds))
265+
else if (type == ActionType.Caching)
237266
{
238267
stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
239268
OnWriteContentLength(stream, token);
240-
return caching(Encoding.ASCII.GetString(queryString), stream, token, session);
269+
await caching(data.QueryString, stream, token, session);
241270
}
242271

243-
else if (baseUrl.Length == _path_Updates.Length && baseUrl.StartsWith(_path_Updates))
272+
else if (type == ActionType.Updates)
244273
{
245274
stream.Write(_headerContentTypeJson.Data, 0, _headerContentTypeJson.Length);
246275
OnWriteContentLength(stream, token);
247-
return updates(Encoding.ASCII.GetString(queryString), stream, token, session);
276+
await updates(data.QueryString, stream, token, session);
248277
}
249-
else if (baseUrl.Length == _path_Fortunes.Length && baseUrl.StartsWith(_path_Fortunes))
278+
else if (type == ActionType.Fortunes)
250279
{
251280
stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
252281
OnWriteContentLength(stream, token);
253-
return fortunes(stream, token, session);
282+
await fortunes(stream, token, session);
254283
}
255284
else
256285
{
257286
stream.Write(_headerContentTypeHtml.Data, 0, _headerContentTypeHtml.Length);
258287
OnWriteContentLength(stream, token);
259-
return Default(url, stream, token, session);
288+
await Default(stream, token, session);
260289
}
290+
if (!Program.Debug)
291+
data.Dispose();
292+
261293
}
262294

263295
private void OnWriteHeader(PipeStream stream, HttpToken token)
@@ -278,9 +310,10 @@ private void OnWriteContentLength(PipeStream stream, HttpToken token)
278310

279311
private void OnCompleted(PipeStream stream, ISession session, HttpToken token)
280312
{
281-
stream.ReadFree((int)stream.Length);
282-
token.FullLength((stream.CacheLength - token.ContentPostion).ToString());
283313

314+
token.FullLength((stream.CacheLength - token.ContentPostion).ToString());
315+
if (token.Requests.IsEmpty)
316+
session.Stream.Flush();
284317
}
285318

286319
}

frameworks/CSharp/beetlex/PlatformBenchmarks/HttpServer.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,6 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
2323
serverOptions.BufferSize = 1024 * 8;
2424
serverOptions.BufferPoolMaxMemory = 1000;
2525
serverOptions.BufferPoolSize = 1024 * 10;
26-
if (Program.Debug)
27-
{
28-
serverOptions.IOQueueEnabled = true;
29-
serverOptions.IOQueues = System.Math.Min(Environment.ProcessorCount, 16);
30-
serverOptions.SyncAccept = false;
31-
}
3226
ApiServer = SocketFactory.CreateTcpServer<HttpHandler>(serverOptions);
3327
ApiServer.Open();
3428
if (!Program.UpDB)
@@ -38,8 +32,9 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
3832
}
3933
else
4034
{
41-
// RawDb._connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=64;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
35+
4236
RawDb._connectionString = "Server=tfb-database;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=64;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3;Multiplexing=true;Write Coalescing Delay Us=500;Write Coalescing Buffer Threshold Bytes=1000";
37+
//RawDb._connectionString = "Server=192.168.2.19;Database=hello_world;User Id=benchmarkdbuser;Password=benchmarkdbpass;Maximum Pool Size=64;NoResetOnClose=true;Enlist=false;Max Auto Prepare=3";
4338
}
4439
ApiServer.Log(LogType.Info, null, $"Debug mode [{Program.Debug}]");
4540
return Task.CompletedTask;

frameworks/CSharp/beetlex/PlatformBenchmarks/HttpToken.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using BeetleX.Buffers;
1+
using BeetleX;
2+
using BeetleX.Buffers;
3+
using BeetleX.Dispatchs;
24
using System;
35
using System.Collections.Concurrent;
46
using System.Text;
@@ -16,8 +18,12 @@ public HttpToken()
1618

1719
}
1820

21+
public SingleThreadDispatcher<HttpToken> ThreadDispatcher { get; set; }
22+
1923
public ConcurrentQueue<RequestData> Requests { get; set; } = new ConcurrentQueue<RequestData>();
2024

25+
public ISession Session { get; set; }
26+
2127
public RequestData CurrentRequest { get; set; }
2228

2329
public byte[] GetLengthBuffer(string length)

frameworks/CSharp/beetlex/PlatformBenchmarks/RequestData.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,26 @@ public ReadOnlySpan<byte> GetSpan()
1717

1818
public void Dispose()
1919
{
20+
2021
System.Buffers.ArrayPool<byte>.Shared.Return(Data.Array);
2122
}
23+
24+
public string QueryString { get; set; }
25+
26+
public ActionType Action { get; set; }
27+
28+
29+
}
30+
31+
public enum ActionType
32+
{
33+
Plaintext,
34+
Json,
35+
Db,
36+
Queries,
37+
Caching,
38+
Updates,
39+
Fortunes,
40+
Other
2241
}
2342
}

frameworks/CSharp/beetlex/PlatformBenchmarks/plaintext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace PlatformBenchmarks
99
{
1010
public partial class HttpHandler
1111
{
12-
public Task Plaintext(ReadOnlySpan<byte> url, PipeStream stream, HttpToken token, ISession session)
12+
public Task Plaintext(PipeStream stream, HttpToken token, ISession session)
1313
{
1414
stream.Write(_result_plaintext.Data, 0, _result_plaintext.Length);
1515
OnCompleted(stream, session, token);

0 commit comments

Comments
 (0)