Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/DtmCommon/Imp/TransBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;

namespace DtmCommon
Expand Down Expand Up @@ -76,6 +77,8 @@ public class TransBase
[JsonIgnore]
public string Dtm { get; set; }

public DateTime NextCronTime { get; set; }

public static TransBase NewTransBase(string gid, string transType, string dtm, string branchID)
{
return new TransBase
Expand Down
12 changes: 10 additions & 2 deletions src/Dtmcli/DtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Dtmcli
using System;

namespace Dtmcli
{
public class DtmTransFactory : IDtmTransFactory
{
Expand All @@ -16,7 +18,13 @@ public Msg NewMsg(string gid)
var msg = new Msg(_cient, _branchBarrierFactory, gid);
return msg;
}


public Msg NewMsg(string gid, DateTime nextCronTime)
{
var msg = new Msg(_cient, _branchBarrierFactory, gid, nextCronTime);
return msg;
}

public Saga NewSaga(string gid)
{
var saga = new Saga(_cient, gid);
Expand Down
6 changes: 5 additions & 1 deletion src/Dtmcli/IDtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
namespace Dtmcli
using System;

namespace Dtmcli
{
public interface IDtmTransFactory
{
Saga NewSaga(string gid);

Msg NewMsg(string gid);

Msg NewMsg(string gid, DateTime nextCronTime);
}
}
13 changes: 11 additions & 2 deletions src/Dtmcli/Msg/Msg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@ public class Msg
private readonly IDtmClient _dtmClient;
private readonly IBranchBarrierFactory _branchBarrierFactory;

public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid)
public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid):
this(dtmHttpClient, branchBarrierFactory, gid, default)
{
}

public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid, DateTime nextCronTime)
{
this._dtmClient = dtmHttpClient;
this._branchBarrierFactory = branchBarrierFactory;
this._transBase = TransBase.NewTransBase(gid, DtmCommon.Constant.TYPE_MSG, string.Empty, string.Empty);
if (nextCronTime != default(DateTime))
{
this._transBase.NextCronTime = nextCronTime;
}
}

public Msg Add(string action, object postData)
{
if (this._transBase.Steps == null) this._transBase.Steps = new List<Dictionary<string, string>>();
Expand Down
2 changes: 2 additions & 0 deletions src/Dtmgrpc/DtmGImp/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public static dtmgpb.DtmRequest BuildDtmRequest(TransBase transBase)
Steps = transBase.Steps == null ? string.Empty : Utils.ToJsonString(transBase.Steps),
RollbackReason = transBase.RollbackReason ?? string.Empty,
};
if (transBase.NextCronTime != default)
dtmRequest.NextCronTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(transBase.NextCronTime.ToUniversalTime());

foreach (var item in transBase.BinPayloads ?? new List<byte[]>())
{
Expand Down
18 changes: 15 additions & 3 deletions src/Dtmgrpc/DtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using DtmCommon;
using System;
using DtmCommon;
using Dtmgrpc.DtmGImp;
using Microsoft.Extensions.Options;

Expand All @@ -19,10 +20,21 @@ public DtmTransFactory(IOptions<DtmOptions> optionsAccs, IDtmgRPCClient rpcClien

public MsgGrpc NewMsgGrpc(string gid)
{
var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid);
return this.NewMsgGrpc(gid, default);
}

/// <summary>
///
/// </summary>
/// <param name="gid"></param>
/// <param name="nextCronTime">The desired execution time, which can be used to delay downstream consumption</param>
/// <returns></returns>
public MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime)
{
var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid, nextCronTime);
return msg;
}

public SagaGrpc NewSagaGrpc(string gid)
{
var saga = new SagaGrpc(_rpcClient, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid);
Expand Down
12 changes: 11 additions & 1 deletion src/Dtmgrpc/IDtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
namespace Dtmgrpc
using System;

namespace Dtmgrpc
{
public interface IDtmTransFactory
{
SagaGrpc NewSagaGrpc(string gid);

MsgGrpc NewMsgGrpc(string gid);

/// <summary>
///
/// </summary>
/// <param name="gid"></param>
/// <param name="nextCronTime">The desired execution time, which can be used to delay downstream consumption</param>
/// <returns></returns>
MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime);

TccGrpc NewTccGrpc(string gid);
}
Expand Down
9 changes: 9 additions & 0 deletions src/Dtmgrpc/Msg/MsgGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ public class MsgGrpc
private readonly IBranchBarrierFactory _branchBarrierFactory;

public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid)
: this(dtmHttpClient, branchBarrierFactory, server, gid, default)
{
}

public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid, DateTime nextCronTime)
{
this._dtmClient = dtmHttpClient;
this._branchBarrierFactory = branchBarrierFactory;
this._transBase = TransBase.NewTransBase(gid, Constant.TYPE_MSG, server, string.Empty);
if (nextCronTime != default(DateTime))
{
this._transBase.NextCronTime = nextCronTime;
}
}

public MsgGrpc Add(string action, IMessage payload)
Expand Down
2 changes: 2 additions & 0 deletions src/Dtmgrpc/dtmgpb/dtmgimp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
option csharp_namespace = "dtmgpb";
option go_package = "./dtmgpb";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package dtmgimp;

Expand Down Expand Up @@ -40,6 +41,7 @@ message DtmRequest {
string Steps = 7;
map<string, string> ReqExtra = 8;
string RollbackReason = 9;
google.protobuf.Timestamp NextCronTime = 10;
}

message DtmGidReply {
Expand Down
31 changes: 31 additions & 0 deletions tests/Dtmcli.IntegrationTests/MsgHttpTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,35 @@ public async Task Submit_With_EffectTime_Should_Succeed_Later()
status = await ITTestHelper.GetTranStatus(gid);
Assert.Equal("succeed", status);
}

[Fact]
public async Task Submit_With_NextCronTime_Should_Succeed_Later()
{
var provider = ITTestHelper.AddDtmHttp();
var transFactory = provider.GetRequiredService<Dtmcli.IDtmTransFactory>();

var gid = "msgTestGid" + Guid.NewGuid().ToString();
DateTime effectTime = DateTime.Now.AddSeconds(10);
var msg = transFactory.NewMsg(gid, effectTime);
var req = ITTestHelper.GenBusiReq(false, false);
var busiUrl = ITTestHelper.BuisHttpUrl;
msg.Add(busiUrl + "/busi.Busi/TransOut", req)
.Add(busiUrl + "/busi.Busi/TransIn", req);

await msg.Prepare(busiUrl + "/busi.Busi/QueryPrepared_404");
await msg.Submit();

// Since the downstream execution is delayed by 10 seconds, it will be 'submitted' after 2 seconds and 'succeed' after 15 seconds
await Task.Delay(TimeSpan.FromSeconds(0));
var status = await ITTestHelper.GetTranStatus(gid);
Assert.Equal("submitted", status);

await Task.Delay(TimeSpan.FromSeconds(2));
status = await ITTestHelper.GetTranStatus(gid);
Assert.Equal("submitted", status);

await Task.Delay(TimeSpan.FromSeconds(13));
status = await ITTestHelper.GetTranStatus(gid);
Assert.Equal("succeed", status);
}
}
26 changes: 26 additions & 0 deletions tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,36 @@
Assert.Equal("succeed", status);
}

[Fact]
public async Task Submit_With_NextCronTime_Should_Succeed_Later()
{
var provider = ITTestHelper.AddDtmGrpc();
var transFactory = provider.GetRequiredService<IDtmTransFactory>();

var gid = "msgTestGid" + Guid.NewGuid().ToString();
DateTime effectTime = DateTime.Now.AddSeconds(10);
var msg = transFactory.NewMsgGrpc(gid, effectTime);
var req = ITTestHelper.GenBusiReq(false, false);
var busiGrpc = ITTestHelper.BuisgRPCUrl;
msg.Add(busiGrpc + "/busi.Busi/TransOut", req)
.Add(busiGrpc + "/busi.Busi/TransIn", req);

await msg.Prepare(busiGrpc + "/busi.Busi/QueryPrepared");
await msg.Submit();

// Since the downstream execution is delayed by 10 seconds, it will be 'submitted' after 2 seconds and 'succeed' after 15 seconds
await Task.Delay(TimeSpan.FromSeconds(2));
var status = await ITTestHelper.GetTranStatus(gid);
Assert.Equal("submitted", status);

await Task.Delay(TimeSpan.FromSeconds(13));
status = await ITTestHelper.GetTranStatus(gid);
Assert.Equal("succeed", status);
}

private static readonly int TransOutUID = 1;

private static readonly int TransInUID = 2;

Check warning on line 124 in tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs

View workflow job for this annotation

GitHub Actions / build on ubuntu-latest

The field 'MsgGrpcTest.TransInUID' is assigned but its value is never used

Check warning on line 124 in tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs

View workflow job for this annotation

GitHub Actions / build on ubuntu-latest

The field 'MsgGrpcTest.TransInUID' is assigned but its value is never used

Check warning on line 124 in tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs

View workflow job for this annotation

GitHub Actions / build on windows-latest

The field 'MsgGrpcTest.TransInUID' is assigned but its value is never used

Check warning on line 124 in tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs

View workflow job for this annotation

GitHub Actions / build on windows-latest

The field 'MsgGrpcTest.TransInUID' is assigned but its value is never used

private MySqlConnection getBarrierMySqlConnection() => new("Server=localhost;port=3306;User ID=root;Password=;Database=dtm_barrier");

Expand Down
Loading