Skip to content

Commit 13ee8c0

Browse files
authored
Update IDataStorage.MongoDB.cs (#1722)
* Update IDataStorage.MongoDB.cs * Update IDataStorage.MongoDB.cs
1 parent 8b38a1f commit 13ee8c0

File tree

1 file changed

+21
-11
lines changed

1 file changed

+21
-11
lines changed

src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using DotNetCore.CAP.Monitoring;
1212
using DotNetCore.CAP.Persistence;
1313
using DotNetCore.CAP.Serialization;
14+
using Microsoft.Extensions.Logging;
1415
using Microsoft.Extensions.Options;
1516
using MongoDB.Bson;
1617
using MongoDB.Driver;
@@ -25,43 +26,52 @@ public class MongoDBDataStorage : IDataStorage
2526
private readonly IOptions<MongoDBOptions> _options;
2627
private readonly ISerializer _serializer;
2728
private readonly ISnowflakeId _snowflakeId;
29+
private readonly ILogger _logger;
2830

2931
public MongoDBDataStorage(
3032
IOptions<CapOptions> capOptions,
3133
IOptions<MongoDBOptions> options,
3234
IMongoClient client,
3335
ISerializer serializer,
34-
ISnowflakeId snowflakeId)
36+
ISnowflakeId snowflakeId,
37+
ILogger<MongoDBDataStorage> logger)
3538
{
3639
_capOptions = capOptions;
3740
_options = options;
3841
_client = client;
3942
_database = _client.GetDatabase(_options.Value.DatabaseName);
4043
_serializer = serializer;
4144
_snowflakeId = snowflakeId;
45+
_logger = logger;
4246
}
4347

4448
public async Task<bool> AcquireLockAsync(string key, TimeSpan ttl, string instance,
4549
CancellationToken token = default)
4650
{
51+
4752
var collection = _database.GetCollection<Lock>(_options.Value.LockCollection);
4853
using var session = await _client.StartSessionAsync(cancellationToken: token).ConfigureAwait(false);
4954
var transactionOptions =
5055
new TransactionOptions(ReadConcern.Majority, ReadPreference.Primary, WriteConcern.WMajority);
51-
session.StartTransaction(transactionOptions);
56+
5257
try
5358
{
54-
var opResult = await collection.UpdateOneAsync(session,
55-
model => model.Key == key && model.LastLockTime < DateTime.Now.Subtract(ttl),
56-
Builders<Lock>.Update.Set(model => model.Instance, instance)
57-
.Set(model => model.LastLockTime, DateTime.Now), null, token);
58-
var isAcquired = opResult.IsModifiedCountAvailable && opResult.ModifiedCount > 0;
59-
await session.CommitTransactionAsync(token).ConfigureAwait(false);
60-
return isAcquired;
59+
var result = await session.WithTransactionAsync(async (handle, cancellationToken) =>
60+
{
61+
var opResult = await collection.UpdateOneAsync(handle,
62+
model => model.Key == key && model.LastLockTime < DateTime.Now.Subtract(ttl),
63+
Builders<Lock>.Update.Set(model => model.Instance, instance)
64+
.Set(model => model.LastLockTime, DateTime.Now), null, cancellationToken);
65+
var isAcquired = opResult.IsModifiedCountAvailable && opResult.ModifiedCount > 0;
66+
return isAcquired;
67+
}, transactionOptions, token);
68+
69+
return result;
6170
}
62-
catch (Exception)
71+
catch (Exception ex)
6372
{
64-
await session.AbortTransactionAsync(token).ConfigureAwait(false);
73+
_logger.LogWarning(
74+
ex, "Failed to acquire lock for key '{Key}' with instance '{Instance}'.", key, instance);
6575
return false;
6676
}
6777
}

0 commit comments

Comments
 (0)