Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
604 changes: 215 additions & 389 deletions client/dtmgrpc/dtmgpb/dtmgimp.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions client/dtmgrpc/dtmgpb/dtmgimp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

option go_package = "./dtmgpb";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package dtmgimp;

Expand Down Expand Up @@ -39,6 +40,7 @@ message DtmRequest {
string Steps = 7;
map<string, string> ReqExtra = 8;
string RollbackReason = 9;
google.protobuf.Timestamp NextCronTime = 10;
}

message DtmGidReply {
Expand Down
52 changes: 32 additions & 20 deletions client/dtmgrpc/dtmgpb/dtmgimp_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions client/dtmgrpc/dtmgpb/generat.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# 步骤 1:先获取并配置 GOPATH(临时生效)
# 获取 GOPATH 并赋值给变量(自动适配你的路径)
GOPATH=$(go env GOPATH)
# 将 GOPATH/bin 加入 PATH(解决插件找不到的问题)
export PATH=$PATH:$GOPATH/bin

# 步骤 2:验证插件是否存在(可选,但建议执行)
# 检查 protoc-gen-go 是否存在
if [ -f "$GOPATH/bin/protoc-gen-go" ]; then
echo "protoc-gen-go 插件存在"
else
echo "protoc-gen-go 插件不存在,正在安装..."
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
fi

# 检查 protoc-gen-go-grpc 是否存在
if [ -f "$GOPATH/bin/protoc-gen-go-grpc" ]; then
echo "protoc-gen-go-grpc 插件存在"
else
echo "protoc-gen-go-grpc 插件不存在,正在安装兼容版本..."
# 使用与项目gRPC v1.56.3兼容的版本
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0
fi


# 步骤 3:生成 Go 代码(核心命令)
# 生成基础 pb.go 文件
protoc --go_out=. --go_opt=paths=source_relative dtmgimp.proto
# 生成 grpc 相关 pb.go 文件
protoc --go-grpc_out=. --go-grpc_opt=paths=source_relative dtmgimp.proto
7 changes: 7 additions & 0 deletions dtmsvr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func svcSubmit(t *TransGlobal) interface{} {
} else if err != nil {
return err
}

// In msg mode, allow delaying consumption by using a custom NextCronTime
// WaitResult=true and NextCronTime are mutually exclusive, WaitResult takes priority
if !t.WaitResult && t.TransType == "msg" && !t.NextCronTime.IsZero() && t.NextCronTime.After(time.Now().Add(3*time.Second)) {
return nil
}

return t.Process(branches)
}

Expand Down
2 changes: 2 additions & 0 deletions dtmsvr/trans_class.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
if c.TransOptions != nil {
o = c.TransOptions
}
nextCronTime := c.NextCronTime.AsTime()
r := TransGlobal{TransGlobalStore: storage.TransGlobalStore{
Gid: c.Gid,
TransType: c.TransType,
Expand All @@ -93,6 +94,7 @@ func TransFromDtmRequest(ctx context.Context, c *dtmgpb.DtmRequest) *TransGlobal
BinPayloads: c.BinPayloads,
CustomData: c.CustomedData,
RollbackReason: c.RollbackReason,
NextCronTime: &nextCronTime,
TransOptions: dtmcli.TransOptions{
WaitResult: o.WaitResult,
TimeoutToFail: o.TimeoutToFail,
Expand Down
4 changes: 3 additions & 1 deletion dtmsvr/trans_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (t *TransGlobal) processInner(ctx context.Context, branches []TransBranch)

func (t *TransGlobal) saveNew() ([]TransBranch, error) {
t.NextCronInterval = t.getNextCronInterval(cronReset)
t.NextCronTime = dtmutil.GetNextTime(t.NextCronInterval)
if t.NextCronTime == nil || t.NextCronTime.IsZero() {
t.NextCronTime = dtmutil.GetNextTime(t.NextCronInterval)
}
t.ExtData = dtmimp.MustMarshalString(t.Ext)
if t.ExtData == "{}" {
t.ExtData = ""
Expand Down
Loading