Skip to content

Commit c2df287

Browse files
committed
Because of deadlock and performance issues, mysql does not use transactions when consuming messages, and requeue the message to queue table when they exception. (#36,#68)
1 parent 664c3b8 commit c2df287

File tree

4 files changed

+27
-92
lines changed

4 files changed

+27
-92
lines changed
Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,19 @@
1-
using System;
2-
using System.Data;
3-
using System.Threading;
4-
using Dapper;
1+
using Dapper;
52
using DotNetCore.CAP.Models;
3+
using MySql.Data.MySqlClient;
64

75
namespace DotNetCore.CAP.MySql
86
{
97
public class MySqlFetchedMessage : IFetchedMessage
108
{
11-
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
12-
private readonly IDbConnection _connection;
13-
private readonly object _lockObject = new object();
14-
private readonly Timer _timer;
15-
private readonly IDbTransaction _transaction;
9+
private readonly string _connectionString = null;
1610

17-
public MySqlFetchedMessage(int messageId,
18-
MessageType type,
19-
IDbConnection connection,
20-
IDbTransaction transaction)
11+
public MySqlFetchedMessage(int messageId, MessageType type, string connectionString)
2112
{
2213
MessageId = messageId;
2314
MessageType = type;
24-
_connection = connection;
25-
_transaction = transaction;
26-
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
15+
16+
_connectionString = connectionString;
2717
}
2818

2919
public int MessageId { get; }
@@ -32,43 +22,21 @@ public MySqlFetchedMessage(int messageId,
3222

3323
public void RemoveFromQueue()
3424
{
35-
lock (_lockObject)
36-
{
37-
_transaction.Commit();
38-
}
25+
// ignored
3926
}
4027

4128
public void Requeue()
4229
{
43-
lock (_lockObject)
30+
using (var connection = new MySqlConnection(_connectionString))
4431
{
45-
_transaction.Rollback();
32+
connection.Execute("insert into `cap.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);"
33+
, new {MessageId, MessageType });
4634
}
4735
}
4836

4937
public void Dispose()
5038
{
51-
lock (_lockObject)
52-
{
53-
_timer?.Dispose();
54-
_transaction.Dispose();
55-
_connection.Dispose();
56-
}
57-
}
58-
59-
private void ExecuteKeepAliveQuery(object obj)
60-
{
61-
lock (_lockObject)
62-
{
63-
try
64-
{
65-
_connection?.Execute("SELECT 1", _transaction);
66-
}
67-
catch
68-
{
69-
// ignored
70-
}
71-
}
39+
// ignored
7240
}
7341
}
7442
}

src/DotNetCore.CAP.MySql/MySqlStorage.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ protected virtual string CreateDbTablesScript(string prefix)
5353
$@"
5454
CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
5555
`MessageId` int(11) NOT NULL,
56-
`MessageType` tinyint(4) NOT NULL
56+
`MessageType` tinyint(4) NOT NULL,
57+
`ProcessId` varchar(50) DEFAULT NULL
5758
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5859
5960
CREATE TABLE IF NOT EXISTS `{prefix}.received` (

src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs

Lines changed: 12 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Data;
43
using System.Threading.Tasks;
54
using Dapper;
65
using DotNetCore.CAP.Infrastructure;
@@ -43,15 +42,11 @@ public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
4342
public Task<IFetchedMessage> FetchNextMessageAsync()
4443
{
4544
var sql = $@"
46-
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
47-
DELETE FROM `{_prefix}.queue` LIMIT 1;";
45+
UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
46+
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;
47+
DELETE FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId";
4848

49-
// The following `sql` can improve performance, but repeated consumption occurs in multiple instances
50-
//var sql = $@"
51-
//SELECT @MId:=`MessageId` as MessageId, @MType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1;
52-
//DELETE FROM `{_prefix}.queue` where `MessageId` = @MId AND `MessageType`=@MType;";
53-
54-
return FetchNextMessageCoreAsync(sql);
49+
return FetchNextMessageCoreAsync(sql, new { ProcessId = Guid.NewGuid().ToString() });
5550
}
5651

5752
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
@@ -122,11 +117,6 @@ public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceivedMessages()
122117
}
123118
}
124119

125-
126-
public void Dispose()
127-
{
128-
}
129-
130120
public bool ChangePublishedState(int messageId, string state)
131121
{
132122
var sql =
@@ -151,44 +141,20 @@ public bool ChangeReceivedState(int messageId, string state)
151141

152142
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
153143
{
154-
//here don't use `using` to dispose
155-
var connection = new MySqlConnection(Options.ConnectionString);
156-
await connection.OpenAsync();
157-
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
158-
FetchedMessage fetchedMessage = null;
159-
try
160-
{
161-
//fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args, transaction);
162-
// An anomaly with unknown causes, sometimes QuerySingleOrDefaultAsync can't return expected result.
163-
using (var reader = connection.ExecuteReader(sql, args, transaction))
164-
{
165-
while (reader.Read())
166-
{
167-
fetchedMessage = new FetchedMessage
168-
{
169-
MessageId = (int)reader.GetInt64(0),
170-
MessageType = (MessageType)reader.GetInt64(1)
171-
};
172-
}
173-
}
174-
}
175-
catch (MySqlException)
144+
FetchedMessage fetchedMessage;
145+
using (var connection = new MySqlConnection(Options.ConnectionString))
176146
{
177-
transaction.Dispose();
178-
connection.Dispose();
179-
throw;
147+
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args);
180148
}
181149

182150
if (fetchedMessage == null)
183-
{
184-
transaction.Rollback();
185-
transaction.Dispose();
186-
connection.Dispose();
187151
return null;
188-
}
189152

190-
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
191-
transaction);
153+
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, Options.ConnectionString);
154+
}
155+
156+
public void Dispose()
157+
{
192158
}
193159
}
194160
}

src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void EnqueueMessage(CapPublishedMessage message)
4646
{
4747
if (message == null) throw new ArgumentNullException(nameof(message));
4848

49-
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
49+
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
5050
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
5151
_dbTransaction);
5252
}
@@ -55,7 +55,7 @@ public void EnqueueMessage(CapReceivedMessage message)
5555
{
5656
if (message == null) throw new ArgumentNullException(nameof(message));
5757

58-
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
58+
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
5959
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
6060
_dbTransaction);
6161
}

0 commit comments

Comments
 (0)