Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions admin/src/api/api_dtm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ export function resetNextCronTime(gid: string): Promise<AxiosResponse> {
})
}

export function setNextCronTime(gid: string, time: Date): Promise<AxiosResponse> {
return request({
url: '/api/dtmsvr/setNextCronTime',
method: 'post',
data: { gid, next_cron_time: time}
})
}

export function getDtmVersion(): Promise<AxiosResponse<any>> {
return request({
url: '/api/dtmsvr/version',
Expand Down
1 change: 1 addition & 0 deletions admin/src/components.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ declare module 'vue' {
ABreadcrumb: typeof import('ant-design-vue/es')['Breadcrumb']
ABreadcrumbItem: typeof import('ant-design-vue/es')['BreadcrumbItem']
AButton: typeof import('ant-design-vue/es')['Button']
ADatePicker: typeof import('ant-design-vue/es')['DatePicker']
ADescriptions: typeof import('ant-design-vue/es')['Descriptions']
ADescriptionsItem: typeof import('ant-design-vue/es')['DescriptionsItem']
ADivider: typeof import('ant-design-vue/es')['Divider']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@
class="action-button"
@confirm="handleSetNextCronTimeToNow(<string>transaction?.gid)" >
<a-button type="default">Reset next cron time</a-button>
</a-popconfirm>
<a-date-picker
v-model:value="nextCronTimeInput"
format="YYYY-MM-DD HH:mm:ss"
placeholder="Set NextCronTime"
show-time
/>
<a-popconfirm
title="Set next cron time?"
ok-text="Yes, reset"
cancel-text="No"
class="action-button"
:disabled="!nextCronTimeInput"
@confirm="handleSetNextCronTime(<string>transaction?.gid)" >
<a-button type="default" :disabled="!nextCronTimeInput">Set next cron time</a-button>
</a-popconfirm>
<a-descriptions bordered size="small" :column="{ xxl: 4, xl: 3, lg: 3, md: 3, sm: 2, xs: 1 }">
<a-descriptions-item label="Status">
Expand Down Expand Up @@ -61,7 +76,7 @@ import { getTransaction } from '/@/api/api_dtm'
import screenfull from '/@/components/Screenfull/index.vue'
import { useRoute } from 'vue-router';
import { string } from 'vue-types';
import { forceStopTransaction, resetNextCronTime } from '/@/api/api_dtm'
import { forceStopTransaction, resetNextCronTime, setNextCronTime } from '/@/api/api_dtm'
// import VueJsonPretty from 'vue-json-pretty';
// import 'vue-json-pretty/lib/styles.css'
const route = useRoute();
Expand All @@ -72,6 +87,7 @@ const transaction = ref<Transaction>()
const visible = ref(false)
const textVal = ref('')
const closeable = ref(true)
const nextCronTimeInput = ref()


let _gid = <string>route.params.gid;
Expand Down Expand Up @@ -137,6 +153,11 @@ const handleSetNextCronTimeToNow = async(gid: string) => {
refresh();
}

const handleSetNextCronTime = async(gid: string) => {
await setNextCronTime(gid, nextCronTimeInput.value);
refresh();
}

type Data = {
branches: {
gid: string
Expand Down
6 changes: 6 additions & 0 deletions dtmsvr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func svcResetNextCronTime(t *TransGlobal) error {
return dbt.resetNextCronTime()
}

func svcSetNextCronTime(t *TransGlobal, dt *time.Time) error {
dbt := GetTransGlobal(t.Gid)
dbt.NextCronTime = dt
return dbt.setNextCronTime()
}

func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) error {
branches := []TransBranch{*branch, *branch}
if transType == "tcc" {
Expand Down
6 changes: 6 additions & 0 deletions dtmsvr/api_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func addRoute(engine *gin.Engine) {
engine.GET("/api/dtmsvr/scanKV", dtmutil.WrapHandler2(scanKV))
engine.GET("/api/dtmsvr/queryKV", dtmutil.WrapHandler2(queryKV))
engine.POST("/api/dtmsvr/resetNextCronTime", dtmutil.WrapHandler2(resetNextCronTime)) // one global trans only
engine.POST("/api/dtmsvr/setNextCronTime", dtmutil.WrapHandler2(setNextCronTime)) // one global trans only

// add prometheus exporter
h := promhttp.Handler()
Expand Down Expand Up @@ -74,6 +75,11 @@ func resetNextCronTime(c *gin.Context) interface{} {
return svcResetNextCronTime(TransFromContext(c))
}

func setNextCronTime(c *gin.Context) interface{} {
trans := TransFromContext(c)
return svcSetNextCronTime(trans, trans.NextCronTime)
}

func registerBranch(c *gin.Context) interface{} {
data := map[string]string{}
err := c.BindJSON(&data)
Expand Down
18 changes: 18 additions & 0 deletions dtmsvr/storage/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,24 @@ func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error {
return err
}

// SetTransGlobalCronTime set nextCronTime of one global trans.
func (s *Store) SetTransGlobalCronTime(g *storage.TransGlobalStore) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
dt := g.NextCronTime
g := tGetGlobal(t, g.Gid)
if g == nil {
return storage.ErrNotFound
}
now := dtmutil.GetNextTime(0)
g.NextCronTime = dt
g.UpdateTime = now
tPutGlobal(t, g)
return nil
})
dtmimp.E2P(err)
return err
}

// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}
Expand Down
9 changes: 9 additions & 0 deletions dtmsvr/storage/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error
return err
}

// SetTransGlobalCronTime set nextCronTime of one global trans.
func (s *Store) SetTransGlobalCronTime(global *storage.TransGlobalStore) error {
now := dtmutil.GetNextTime(0)
global.UpdateTime = now
key := conf.Store.RedisPrefix + "_g_" + global.Gid
_, err := redisGet().Set(ctx, key, dtmimp.MustMarshalString(global), time.Duration(conf.Store.DataExpire)*time.Second).Result()
return err
}

// TouchCronTime updates cronTime
func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) {
global.UpdateTime = dtmutil.GetNextTime(0)
Expand Down
19 changes: 19 additions & 0 deletions dtmsvr/storage/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@
return err
}

// ResetTransGlobalCronTime set nextCronTime of one global trans.

Check failure on line 230 in dtmsvr/storage/sql/sql.go

View workflow job for this annotation

GitHub Actions / CI

comment on exported method Store.SetTransGlobalCronTime should be of the form "SetTransGlobalCronTime ..."
func (s *Store) SetTransGlobalCronTime(global *storage.TransGlobalStore) error {
timeStr := getTimeSqlStr(global.NextCronTime)
now := getTimeStr(0)
sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE gid = '%s'`,
now,
timeStr,
global.Gid)
_, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql)
return err
}

// ScanKV lists KV pairs
func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore {
kvs := []storage.KVStore{}
Expand Down Expand Up @@ -328,3 +340,10 @@
}
return dtmutil.GetNextTime(afterSecond).Format("2006-01-02 15:04:05")
}

func getTimeSqlStr(dt *time.Time) string {

Check failure on line 344 in dtmsvr/storage/sql/sql.go

View workflow job for this annotation

GitHub Actions / CI

func getTimeSqlStr should be getTimeSQLStr
if conf.Store.Driver == config.SQLServer {
return dt.Format(time.RFC3339)
}
return dt.Format("2006-01-02 15:04:05")
}
1 change: 1 addition & 0 deletions dtmsvr/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Store interface {
LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore
ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error)
ResetTransGlobalCronTime(global *TransGlobalStore) error
SetTransGlobalCronTime(global *TransGlobalStore) error
ScanKV(cat string, position *string, limit int64) []KVStore
FindKV(cat, key string) []KVStore
UpdateKV(kv *KVStore) error
Expand Down
9 changes: 9 additions & 0 deletions dtmsvr/trans_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ func (t *TransGlobal) resetNextCronTime() error {
return nil
}

func (t *TransGlobal) setNextCronTime() error {
err := GetStore().SetTransGlobalCronTime(&t.TransGlobalStore)
if err != nil {
return err
}
logger.Infof("SetTransGlobalCronTime to %s for %s", t.NextCronTime, t.TransGlobalStore.String())
return nil
}

func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPos int) {
now := time.Now()
b.Status = status
Expand Down
28 changes: 28 additions & 0 deletions test/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,31 @@ func TestAPIResetNextCronTime(t *testing.T) {
assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime)
assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
}

func TestAPISetNextCronTime(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
saga.Submit()
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
gid := saga.Gid

s := registry.GetStore()
g := s.FindTransGlobalStore(saga.Gid)

nextCronTime := time.Now().Add(30 * time.Second).UTC()
// set
resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{
"gid": saga.Gid,
"next_cron_time": nextCronTime.Format(time.RFC3339),
}).Post(dtmutil.DefaultHTTPServer + "/setNextCronTime")
assert.Nil(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode())

// after set assert
g2 := s.FindTransGlobalStore(gid)
assert.NotNil(t, g2)
assert.Equal(t, gid, g2.Gid)
assert.Equal(t, nextCronTime.Truncate(time.Second).UTC(), g2.NextCronTime.Truncate(time.Second).UTC())
assert.NotEqual(t, g.NextCronTime, g2.NextCronTime)
}
Loading