diff --git a/admin/src/api/api_dtm.ts b/admin/src/api/api_dtm.ts index 349cc3be..05ccb6b7 100644 --- a/admin/src/api/api_dtm.ts +++ b/admin/src/api/api_dtm.ts @@ -100,6 +100,14 @@ export function resetNextCronTime(gid: string): Promise { }) } +export function setNextCronTime(gid: string, time: Date): Promise { + return request({ + url: '/api/dtmsvr/setNextCronTime', + method: 'post', + data: { gid, next_cron_time: time} + }) +} + export function getDtmVersion(): Promise> { return request({ url: '/api/dtmsvr/version', diff --git a/admin/src/components.d.ts b/admin/src/components.d.ts index 9bb4eebf..b2f698c9 100644 --- a/admin/src/components.d.ts +++ b/admin/src/components.d.ts @@ -11,6 +11,7 @@ declare module 'vue' { ABreadcrumb: typeof import('ant-design-vue/es')['Breadcrumb'] ABreadcrumbItem: typeof import('ant-design-vue/es')['BreadcrumbItem'] AButton: typeof import('ant-design-vue/es')['Button'] + ADatePicker: typeof import('ant-design-vue/es')['DatePicker'] ADescriptions: typeof import('ant-design-vue/es')['Descriptions'] ADescriptionsItem: typeof import('ant-design-vue/es')['DescriptionsItem'] ADivider: typeof import('ant-design-vue/es')['Divider'] diff --git a/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue b/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue index e27f7638..44e5e388 100644 --- a/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue +++ b/admin/src/views/Dashboard/GlobalTransactions/DialogTransactionDetail.vue @@ -26,6 +26,21 @@ class="action-button" @confirm="handleSetNextCronTimeToNow(transaction?.gid)" > Reset next cron time + + + + Set next cron time @@ -61,7 +76,7 @@ import { getTransaction } from '/@/api/api_dtm' import screenfull from '/@/components/Screenfull/index.vue' import { useRoute } from 'vue-router'; import { string } from 'vue-types'; -import { forceStopTransaction, resetNextCronTime } from '/@/api/api_dtm' +import { forceStopTransaction, resetNextCronTime, setNextCronTime } from '/@/api/api_dtm' // import VueJsonPretty from 'vue-json-pretty'; // import 'vue-json-pretty/lib/styles.css' const route = useRoute(); @@ -72,6 +87,7 @@ const transaction = ref() const visible = ref(false) const textVal = ref('') const closeable = ref(true) +const nextCronTimeInput = ref() let _gid = route.params.gid; @@ -137,6 +153,11 @@ const handleSetNextCronTimeToNow = async(gid: string) => { refresh(); } +const handleSetNextCronTime = async(gid: string) => { + await setNextCronTime(gid, nextCronTimeInput.value); + refresh(); +} + type Data = { branches: { gid: string diff --git a/dtmsvr/api.go b/dtmsvr/api.go index fb9b09ba..cae77bc4 100644 --- a/dtmsvr/api.go +++ b/dtmsvr/api.go @@ -93,6 +93,12 @@ func svcResetNextCronTime(t *TransGlobal) error { return dbt.resetNextCronTime() } +func svcSetNextCronTime(t *TransGlobal, dt *time.Time) error { + dbt := GetTransGlobal(t.Gid) + dbt.NextCronTime = dt + return dbt.setNextCronTime() +} + func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) error { branches := []TransBranch{*branch, *branch} if transType == "tcc" { diff --git a/dtmsvr/api_http.go b/dtmsvr/api_http.go index 3d8d09d7..fceecac3 100644 --- a/dtmsvr/api_http.go +++ b/dtmsvr/api_http.go @@ -41,6 +41,7 @@ func addRoute(engine *gin.Engine) { engine.GET("/api/dtmsvr/scanKV", dtmutil.WrapHandler2(scanKV)) engine.GET("/api/dtmsvr/queryKV", dtmutil.WrapHandler2(queryKV)) engine.POST("/api/dtmsvr/resetNextCronTime", dtmutil.WrapHandler2(resetNextCronTime)) // one global trans only + engine.POST("/api/dtmsvr/setNextCronTime", dtmutil.WrapHandler2(setNextCronTime)) // one global trans only // add prometheus exporter h := promhttp.Handler() @@ -74,6 +75,11 @@ func resetNextCronTime(c *gin.Context) interface{} { return svcResetNextCronTime(TransFromContext(c)) } +func setNextCronTime(c *gin.Context) interface{} { + trans := TransFromContext(c) + return svcSetNextCronTime(trans, trans.NextCronTime) +} + func registerBranch(c *gin.Context) interface{} { data := map[string]string{} err := c.BindJSON(&data) diff --git a/dtmsvr/storage/boltdb/boltdb.go b/dtmsvr/storage/boltdb/boltdb.go index bf3fb31b..8dde7abd 100644 --- a/dtmsvr/storage/boltdb/boltdb.go +++ b/dtmsvr/storage/boltdb/boltdb.go @@ -501,6 +501,24 @@ func (s *Store) ResetTransGlobalCronTime(g *storage.TransGlobalStore) error { return err } +// SetTransGlobalCronTime set nextCronTime of one global trans. +func (s *Store) SetTransGlobalCronTime(g *storage.TransGlobalStore) error { + err := s.boltDb.Update(func(t *bolt.Tx) error { + dt := g.NextCronTime + g := tGetGlobal(t, g.Gid) + if g == nil { + return storage.ErrNotFound + } + now := dtmutil.GetNextTime(0) + g.NextCronTime = dt + g.UpdateTime = now + tPutGlobal(t, g) + return nil + }) + dtmimp.E2P(err) + return err +} + // ScanKV lists KV pairs func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore { kvs := []storage.KVStore{} diff --git a/dtmsvr/storage/redis/redis.go b/dtmsvr/storage/redis/redis.go index 036e2bbd..78c07f57 100644 --- a/dtmsvr/storage/redis/redis.go +++ b/dtmsvr/storage/redis/redis.go @@ -344,6 +344,15 @@ func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error return err } +// SetTransGlobalCronTime set nextCronTime of one global trans. +func (s *Store) SetTransGlobalCronTime(global *storage.TransGlobalStore) error { + now := dtmutil.GetNextTime(0) + global.UpdateTime = now + key := conf.Store.RedisPrefix + "_g_" + global.Gid + _, err := redisGet().Set(ctx, key, dtmimp.MustMarshalString(global), time.Duration(conf.Store.DataExpire)*time.Second).Result() + return err +} + // TouchCronTime updates cronTime func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval int64, nextCronTime *time.Time) { global.UpdateTime = dtmutil.GetNextTime(0) diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index 569c966e..ce71ba21 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -227,6 +227,18 @@ func (s *Store) ResetTransGlobalCronTime(global *storage.TransGlobalStore) error return err } +// ResetTransGlobalCronTime set nextCronTime of one global trans. +func (s *Store) SetTransGlobalCronTime(global *storage.TransGlobalStore) error { + timeStr := getTimeSqlStr(global.NextCronTime) + now := getTimeStr(0) + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s' WHERE gid = '%s'`, + now, + timeStr, + global.Gid) + _, err := dtmimp.DBExec(conf.Store.Driver, dbGet().ToSQLDB(), sql) + return err +} + // ScanKV lists KV pairs func (s *Store) ScanKV(cat string, position *string, limit int64) []storage.KVStore { kvs := []storage.KVStore{} @@ -328,3 +340,10 @@ func getTimeStr(afterSecond int64) string { } return dtmutil.GetNextTime(afterSecond).Format("2006-01-02 15:04:05") } + +func getTimeSqlStr(dt *time.Time) string { + if conf.Store.Driver == config.SQLServer { + return dt.Format(time.RFC3339) + } + return dt.Format("2006-01-02 15:04:05") +} diff --git a/dtmsvr/storage/store.go b/dtmsvr/storage/store.go index 32456c1a..f13c9c5f 100644 --- a/dtmsvr/storage/store.go +++ b/dtmsvr/storage/store.go @@ -32,6 +32,7 @@ type Store interface { LockOneGlobalTrans(expireIn time.Duration) *TransGlobalStore ResetCronTime(after time.Duration, limit int64) (succeedCount int64, hasRemaining bool, err error) ResetTransGlobalCronTime(global *TransGlobalStore) error + SetTransGlobalCronTime(global *TransGlobalStore) error ScanKV(cat string, position *string, limit int64) []KVStore FindKV(cat, key string) []KVStore UpdateKV(kv *KVStore) error diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 742a0949..7dd11e50 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -95,6 +95,15 @@ func (t *TransGlobal) resetNextCronTime() error { return nil } +func (t *TransGlobal) setNextCronTime() error { + err := GetStore().SetTransGlobalCronTime(&t.TransGlobalStore) + if err != nil { + return err + } + logger.Infof("SetTransGlobalCronTime to %s for %s", t.NextCronTime, t.TransGlobalStore.String()) + return nil +} + func (t *TransGlobal) changeBranchStatus(b *TransBranch, status string, branchPos int) { now := time.Now() b.Status = status diff --git a/test/api_test.go b/test/api_test.go index 9e75eb33..cb032056 100644 --- a/test/api_test.go +++ b/test/api_test.go @@ -275,3 +275,31 @@ func TestAPIResetNextCronTime(t *testing.T) { assert.Greater(t, time.Now().Add(3*time.Second), *g2.NextCronTime) assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) } + +func TestAPISetNextCronTime(t *testing.T) { + saga := genSaga(dtmimp.GetFuncName(), false, false) + saga.Submit() + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) + gid := saga.Gid + + s := registry.GetStore() + g := s.FindTransGlobalStore(saga.Gid) + + nextCronTime := time.Now().Add(30 * time.Second).UTC() + // set + resp, err := dtmcli.GetRestyClient().R().SetBody(map[string]string{ + "gid": saga.Gid, + "next_cron_time": nextCronTime.Format(time.RFC3339), + }).Post(dtmutil.DefaultHTTPServer + "/setNextCronTime") + assert.Nil(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode()) + + // after set assert + g2 := s.FindTransGlobalStore(gid) + assert.NotNil(t, g2) + assert.Equal(t, gid, g2.Gid) + assert.Equal(t, nextCronTime.Truncate(time.Second).UTC(), g2.NextCronTime.Truncate(time.Second).UTC()) + assert.NotEqual(t, g.NextCronTime, g2.NextCronTime) +}