Skip to content

Commit d4b3ac8

Browse files
authored
Use asynchronous data access methods in appMpower.Orm (#9281)
* Use asynchronous data access methods in appMpower.Orm * Improved connection pool handling * Using tuple with base connection elements instead of "this" for pooling * Improved caching * Better separation of keyed and unkeyed command caching --------- Co-authored-by: LLT21 <>
1 parent c966c96 commit d4b3ac8

File tree

7 files changed

+192
-119
lines changed

7 files changed

+192
-119
lines changed

frameworks/CSharp/appmpower/src/appMpower.Orm/Data/DbCommand.cs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@ public DbCommand(string commandText, DbConnection dbConnection)
2020
_dbConnection = dbConnection;
2121
}
2222

23-
public DbCommand(string commandText, DbConnection dbConnection, bool keyed)
24-
{
25-
_odbcCommand = dbConnection.GetCommand(commandText, CommandType.Text, keyed);
26-
_dbConnection = dbConnection;
27-
}
28-
2923
public DbCommand(string commandText, CommandType commandType, DbConnection dbConnection)
3024
{
3125
_odbcCommand = dbConnection.GetCommand(commandText, commandType);
@@ -175,14 +169,19 @@ public IDataReader ExecuteReader()
175169

176170
public async Task<int> ExecuteNonQueryAsync()
177171
{
178-
return await (_odbcCommand as System.Data.Common.DbCommand).ExecuteNonQueryAsync();
172+
return await _odbcCommand.ExecuteNonQueryAsync();
179173
}
180174

181175
public IDataReader ExecuteReader(CommandBehavior behavior)
182176
{
183177
return _odbcCommand.ExecuteReader(behavior);
184178
}
185179

180+
public async Task<System.Data.Common.DbDataReader> ExecuteReaderAsync(CommandBehavior behavior)
181+
{
182+
return await _odbcCommand.ExecuteReaderAsync(behavior);
183+
}
184+
186185
#nullable enable
187186
public object? ExecuteScalar()
188187
{
@@ -197,8 +196,7 @@ public void Prepare()
197196

198197
public void Dispose()
199198
{
200-
if (_dbConnection._keyed) _dbConnection._keyedOdbcCommands.TryAdd(_odbcCommand.CommandText, _odbcCommand);
201-
else _dbConnection._odbcCommands.Push(_odbcCommand);
199+
_dbConnection.Release(_odbcCommand);
202200
}
203201
}
204202
}

frameworks/CSharp/appmpower/src/appMpower.Orm/Data/DbConnection.cs

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
using System.Collections.Concurrent;
22
using System.Data;
3-
using System.Data.Odbc;
3+
using System.Data.Odbc;
44

55
namespace appMpower.Orm.Data
66
{
77
public class DbConnection : IDbConnection
88
{
99
private string _connectionString;
10-
internal bool _keyed = false;
11-
internal int _number;
12-
internal OdbcConnection _odbcConnection;
13-
internal ConcurrentStack<OdbcCommand> _odbcCommands = new();
14-
internal Dictionary<string, OdbcCommand> _keyedOdbcCommands;
10+
private bool _keyed = false;
11+
private int _number;
12+
private OdbcConnection _odbcConnection;
13+
private ConcurrentStack<OdbcCommand> _odbcCommands = new();
14+
private Dictionary<string, OdbcCommand> _keyedOdbcCommands;
1515

1616
public DbConnection()
1717
{
18-
_connectionString = DbProviderFactory.ConnectionString;
1918
}
2019

21-
public DbConnection(string connectionString)
20+
public DbConnection(string connectionString, bool keyed = false)
2221
{
23-
_connectionString = connectionString;
22+
_keyed = keyed;
23+
_connectionString = connectionString;
24+
GetConnection();
2425
}
2526

2627
public IDbConnection Connection
@@ -43,7 +44,22 @@ public string ConnectionString
4344
}
4445
set
4546
{
46-
_odbcConnection.ConnectionString = value;
47+
_connectionString = value;
48+
GetConnection();
49+
}
50+
}
51+
52+
private void GetConnection()
53+
{
54+
if (_keyed)
55+
{
56+
(_number, _odbcConnection, _keyedOdbcCommands) =
57+
DbConnectionsKeyed.GetConnectionBase(_connectionString).GetAwaiter().GetResult();
58+
}
59+
else
60+
{
61+
(_number, _odbcConnection, _odbcCommands) =
62+
DbConnections.GetConnectionBase(_connectionString).GetAwaiter().GetResult();
4763
}
4864
}
4965

@@ -99,23 +115,33 @@ public IDbCommand CreateCommand()
99115

100116
public void Open()
101117
{
102-
if (_odbcConnection is null)
118+
if (_odbcConnection.State == ConnectionState.Closed)
103119
{
104-
DbConnections.GetConnection(_connectionString, this);
120+
_odbcConnection.Open();
105121
}
122+
}
106123

124+
public async Task OpenAsync()
125+
{
107126
if (_odbcConnection.State == ConnectionState.Closed)
108127
{
109-
_odbcConnection.Open();
128+
await _odbcConnection.OpenAsync();
110129
}
111130
}
112131

113132
public void Dispose()
114133
{
115-
DbConnections.Release(this);
134+
if (_keyed)
135+
{
136+
DbConnectionsKeyed.Release((Number: _number, OdbcConnection: _odbcConnection, KeyedOdbcCommands: _keyedOdbcCommands));
137+
}
138+
else
139+
{
140+
DbConnections.Release((Number: _number, OdbcConnection: _odbcConnection, OdbcCommands: _odbcCommands));
141+
}
116142
}
117143

118-
internal OdbcCommand GetCommand(string commandText, CommandType commandType, bool keyed = false)
144+
internal OdbcCommand GetCommand(string commandText, CommandType commandType)
119145
{
120146
OdbcCommand odbcCommand;
121147

@@ -129,25 +155,20 @@ internal OdbcCommand GetCommand(string commandText, CommandType commandType, boo
129155

130156
return odbcCommand;
131157
}
132-
else if (_keyed && _keyedOdbcCommands.TryGetValue(commandText, out odbcCommand))
133-
{
134-
return odbcCommand;
135-
}
136-
else
137-
{
138-
if (!_keyed && keyed)
139-
{
140-
_keyedOdbcCommands = new();
141-
_keyed = keyed;
142-
}
143-
144-
odbcCommand = _odbcConnection.CreateCommand();
145-
odbcCommand.CommandText = commandText;
146-
odbcCommand.CommandType = commandType;
147-
odbcCommand.Prepare();
158+
else if (_keyed && _keyedOdbcCommands.TryGetValue(commandText, out odbcCommand)) return odbcCommand;
148159

149-
return odbcCommand;
150-
}
160+
odbcCommand = _odbcConnection.CreateCommand();
161+
odbcCommand.CommandText = commandText;
162+
odbcCommand.CommandType = commandType;
163+
odbcCommand.Prepare();
164+
165+
return odbcCommand;
166+
}
167+
168+
internal void Release(OdbcCommand odbcCommand)
169+
{
170+
if (_keyed) _keyedOdbcCommands.TryAdd(odbcCommand.CommandText, odbcCommand);
171+
else _odbcCommands.Push(odbcCommand);
151172
}
152173
}
153174
}
Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,61 @@
11
using System.Collections.Concurrent;
2+
using System.Data.Odbc;
23

34
namespace appMpower.Orm.Data
45
{
5-
public static class DbConnections
6+
internal static class DbConnections
67
{
8+
private static bool _maxConnectionsCreated = false;
79
private static short _createdConnections = 0;
8-
private static ConcurrentStack<DbConnection> _connectionsStack = new();
10+
private static short _maxConnections = 500;
911

10-
public static DbConnection GetConnection(string connectionString)
12+
private static ConcurrentStack<(int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands)> _connectionsStack = new();
13+
private static ConcurrentQueue<TaskCompletionSource<(int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands)>> _waitingQueue = new();
14+
15+
internal static async Task<(int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands)> GetConnectionBase(string connectionString)
1116
{
12-
DbConnection popDbConnection;
17+
(int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands) dbConnectionBase;
1318

14-
if (!_connectionsStack.TryPop(out popDbConnection))
19+
if (!_connectionsStack.TryPop(out dbConnectionBase))
1520
{
16-
popDbConnection = new DbConnection();
17-
popDbConnection._odbcConnection = new System.Data.Odbc.OdbcConnection(connectionString);
21+
if (_maxConnectionsCreated)
22+
{
23+
dbConnectionBase = await GetDbConnectionBaseAsync();
24+
}
25+
else
26+
{
27+
_createdConnections++;
28+
dbConnectionBase = (Number: _maxConnections, OdbcConnection: new OdbcConnection(connectionString), OdbcCommands: new ConcurrentStack<OdbcCommand>());
1829

19-
_createdConnections++;
20-
popDbConnection._number = _createdConnections;
30+
if (_createdConnections == _maxConnections) _maxConnectionsCreated = true;
2131

22-
if (_createdConnections % 25 == 0)
23-
{
24-
Console.WriteLine("Pooled connections created: " + _createdConnections.ToString());
32+
//Console.WriteLine("opened connection number: " + dbConnectionBase._number);
2533
}
2634
}
2735

28-
return popDbConnection;
36+
return dbConnectionBase;
2937
}
3038

31-
32-
public static void GetConnection(string connectionString, DbConnection dbConnection)
39+
internal static void Release((int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands) dbConnectionBase)
3340
{
34-
DbConnection popDbConnection = null;
41+
TaskCompletionSource<(int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands)> taskCompletionSource;
3542

36-
if (_connectionsStack.TryPop(out popDbConnection))
43+
if (_waitingQueue.TryDequeue(out taskCompletionSource))
3744
{
38-
dbConnection._odbcConnection = popDbConnection._odbcConnection;
39-
dbConnection._odbcCommands = popDbConnection._odbcCommands;
40-
dbConnection._number = popDbConnection._number;
45+
taskCompletionSource.SetResult(dbConnectionBase);
4146
}
4247
else
4348
{
44-
dbConnection._odbcConnection = new System.Data.Odbc.OdbcConnection(connectionString);
45-
46-
_createdConnections++;
47-
dbConnection._number = _createdConnections;
48-
49-
if (_createdConnections % 25 == 0)
50-
{
51-
Console.WriteLine("Pooled connections created: " + _createdConnections.ToString());
52-
}
53-
}
49+
_connectionsStack.Push(dbConnectionBase);
50+
}
5451
}
5552

56-
public static void Release(DbConnection dbConnection)
53+
private static Task<(int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands)> GetDbConnectionBaseAsync()
5754
{
58-
_connectionsStack.Push(dbConnection);
55+
var taskCompletionSource = new TaskCompletionSource<(int Number, OdbcConnection OdbcConnection, ConcurrentStack<OdbcCommand> OdbcCommands)>(TaskCreationOptions.RunContinuationsAsynchronously);
56+
57+
_waitingQueue.Enqueue(taskCompletionSource);
58+
return taskCompletionSource.Task;
5959
}
6060
}
6161
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using System.Collections.Concurrent;
2+
using System.Data.Odbc;
3+
4+
namespace appMpower.Orm.Data
5+
{
6+
internal static class DbConnectionsKeyed
7+
{
8+
private static bool _maxConnectionsCreated = false;
9+
private static short _createdConnections = 0;
10+
private static short _maxConnections = 500;
11+
12+
private static ConcurrentStack<(int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand>)> _connectionsStack = new();
13+
private static ConcurrentQueue<TaskCompletionSource<(int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand>)>> _waitingQueue = new();
14+
15+
internal static async Task<(int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand> KeyedOdbcCommands)> GetConnectionBase(string connectionString)
16+
{
17+
(int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand> KeyedOdbcCommands) dbConnectionBase;
18+
19+
if (!_connectionsStack.TryPop(out dbConnectionBase))
20+
{
21+
if (_maxConnectionsCreated)
22+
{
23+
dbConnectionBase = await GetDbConnectionBaseAsync();
24+
}
25+
else
26+
{
27+
_createdConnections++;
28+
dbConnectionBase = (Number: _maxConnections, OdbcConnection: new OdbcConnection(connectionString), KeyedOdbcCommands: new Dictionary<string, OdbcCommand>());
29+
30+
if (_createdConnections == _maxConnections) _maxConnectionsCreated = true;
31+
32+
//Console.WriteLine("opened connection number: " + dbConnectionBase._number);
33+
}
34+
}
35+
36+
return dbConnectionBase;
37+
}
38+
39+
internal static void Release((int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand> KeyedOdbcCommands) dbConnectionBase)
40+
{
41+
TaskCompletionSource<(int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand>)> taskCompletionSource;
42+
43+
if (_waitingQueue.TryDequeue(out taskCompletionSource))
44+
{
45+
taskCompletionSource.SetResult(dbConnectionBase);
46+
}
47+
else
48+
{
49+
_connectionsStack.Push(dbConnectionBase);
50+
}
51+
}
52+
53+
private static Task<(int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand>)> GetDbConnectionBaseAsync()
54+
{
55+
var taskCompletionSource = new TaskCompletionSource<(int Number, OdbcConnection OdbcConnection, Dictionary<string, OdbcCommand>)>(TaskCreationOptions.RunContinuationsAsynchronously);
56+
57+
_waitingQueue.Enqueue(taskCompletionSource);
58+
return taskCompletionSource.Task;
59+
}
60+
}
61+
}

frameworks/CSharp/appmpower/src/appMpower.Orm/DotnetMethods.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public static class DotnetMethods
2121

2222
public static byte[] Db()
2323
{
24-
var world = RawDb.LoadSingleQueryRow();
24+
var world = RawDb.LoadSingleQueryRow().GetAwaiter().GetResult();
2525

2626
var memoryStream = new MemoryStream();
2727
using var utf8JsonWriter = new Utf8JsonWriter(memoryStream, _jsonWriterOptions);
@@ -33,7 +33,7 @@ public static byte[] Db()
3333

3434
public static byte[] Query(int queries)
3535
{
36-
World[] worlds = RawDb.ReadMultipleRows(queries);
36+
World[] worlds = RawDb.ReadMultipleRows(queries).GetAwaiter().GetResult();
3737

3838
var memoryStream = new MemoryStream();
3939
using var utf8JsonWriter = new Utf8JsonWriter(memoryStream, _jsonWriterOptions);
@@ -45,7 +45,7 @@ public static byte[] Query(int queries)
4545

4646
public static byte[] Updates(int count)
4747
{
48-
World[] worlds = RawDb.LoadMultipleUpdatesRows(count);
48+
World[] worlds = RawDb.LoadMultipleUpdatesRows(count).GetAwaiter().GetResult();
4949

5050
var memoryStream = new MemoryStream();
5151
using var utf8JsonWriter = new Utf8JsonWriter(memoryStream, _jsonWriterOptions);
@@ -57,7 +57,7 @@ public static byte[] Updates(int count)
5757

5858
public static byte[] Fortunes()
5959
{
60-
List<Fortune> fortunes = RawDb.LoadFortunesRows();
60+
List<Fortune> fortunes = RawDb.LoadFortunesRows().GetAwaiter().GetResult();
6161
string fortunesView = FortunesView.Render(fortunes);
6262
byte[] byteArray = Encoding.UTF8.GetBytes(fortunesView);
6363

0 commit comments

Comments
 (0)