Commit 14a3fcd

Do not generate scheduled task with timestamp in the past
1 parent 9dbbad2 commit 14a3fcd
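
Before allocating timer task IDs, the shard context now clamps the per-cluster scheduled-task read cursor to the current time (truncated to the database timestamp precision). Any timer task stamped earlier than the clamped cursor is bumped just past it, so newly generated scheduled tasks never carry a timestamp in the past.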

2 files changed: 133 additions & 1 deletion


service/history/shard/context.go

Lines changed: 9 additions & 1 deletion
@@ -1296,7 +1296,7 @@ func (s *contextImpl) allocateTimerIDsLocked(
 	workflowID string,
 	timerTasks []persistence.Task,
 ) error {
-
+	now := s.GetTimeSource().Now().Truncate(persistence.DBTimestampMinPrecision)
 	// assign IDs for the timer tasks. They need to be assigned under shard lock.
 	cluster := s.GetClusterMetadata().GetCurrentClusterName()
 	for _, task := range timerTasks {
@@ -1324,6 +1324,14 @@ func (s *contextImpl) allocateTimerIDsLocked(
 	}
 
 	readCursorTS := s.scheduledTaskMaxReadLevelMap[cluster]
+	if readCursorTS.Before(now) {
+		readCursorTS = now
+	}
+	// Make sure the scheduled task timestamp is higher than:
+	// 1. the max read level, so that the queue processor can read the task back.
+	// 2. the current time; otherwise the task timestamp is in the past and causes artificial load latency in queue processor metrics.
+	// These cases can happen if the shard moves and the new host has a time skew,
+	// or there is a db write delay, or we are simply (re-)generating tasks for an old workflow.
 	if ts.Before(readCursorTS) {
 		// This can happen if the shard moves and the new host has a time skew, or there is a db write delay.
 		// We generate a new timer ID using timerMaxReadLevel.
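
For illustration, here is a minimal, self-contained sketch of the clamping rule the diff enforces. clampTaskTimestamp is a hypothetical helper written for this example (it is not part of Cadence's API), and the one-millisecond stand-in for persistence.DBTimestampMinPrecision is an assumption:

package main

import (
	"fmt"
	"time"
)

// dbTimestampMinPrecision stands in for persistence.DBTimestampMinPrecision;
// the one-millisecond value is an assumption for illustration only.
const dbTimestampMinPrecision = time.Millisecond

// clampTaskTimestamp is a hypothetical helper that mirrors the rule the diff
// enforces for a single scheduled task timestamp.
func clampTaskTimestamp(ts, readCursor, now time.Time) time.Time {
	// Never let the read cursor sit behind the wall clock; otherwise a task
	// stamped between the cursor and "now" would already be late when the
	// queue processor picks it up, inflating its load latency metrics.
	if readCursor.Before(now) {
		readCursor = now
	}
	if ts.Before(readCursor) {
		// Bump the task just past the cursor so the processor can read it back.
		return readCursor.Truncate(dbTimestampMinPrecision).Add(dbTimestampMinPrecision)
	}
	return ts
}

func main() {
	now := time.Now()
	staleTask := now.Add(-time.Hour)        // e.g. a (re-)generated task for an old workflow
	laggedCursor := now.Add(-2 * time.Hour) // read cursor behind the wall clock
	fmt.Println(clampTaskTimestamp(staleTask, laggedCursor, now)) // lands just after "now"
}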

service/history/shard/context_test.go

Lines changed: 124 additions & 0 deletions
@@ -1148,3 +1148,127 @@ func TestShardClosedGuard(t *testing.T) {
 		})
 	}
 }
+
+func (s *contextTestSuite) TestAllocateTimerIDsLocked() {
+	s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
+	testTimeNow := s.mockResource.TimeSource.Now()
+	testVisibilityTime := testTimeNow.Add(-time.Hour).Truncate(persistence.DBTimestampMinPrecision)
+	testReadCursorTime := testTimeNow.Add(time.Minute).Truncate(persistence.DBTimestampMinPrecision)
+	testFutureTime := testTimeNow.Add(30 * time.Minute).Truncate(persistence.DBTimestampMinPrecision)
+
+	domainInfo := &persistence.DomainInfo{
+		ID:   testDomainID,
+		Name: testDomain,
+	}
+
+	tests := []struct {
+		name             string
+		setupContext     func(*contextImpl)
+		setupMocks       func()
+		domainEntry      *cache.DomainCacheEntry
+		workflowID       string
+		timerTasks       []persistence.Task
+		expectError      bool
+		expectedErrorMsg string
+		validateResults  func(*testing.T, *contextImpl, []persistence.Task)
+	}{
+		{
+			name: "Queue V2 enabled - simple case",
+			setupContext: func(ctx *contextImpl) {
+				ctx.config.EnableTimerQueueV2 = func(shardID int) bool { return true }
+				ctx.scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName] = testReadCursorTime
+			},
+			domainEntry: cache.NewLocalDomainCacheEntryForTest(domainInfo, &persistence.DomainConfig{}, cluster.TestCurrentClusterName),
+			workflowID:  testWorkflowID,
+			timerTasks: []persistence.Task{
+				&persistence.UserTimerTask{
+					TaskData: persistence.TaskData{
+						VisibilityTimestamp: testFutureTime,
+					},
+				},
+			},
+			expectError: false,
+			validateResults: func(t *testing.T, ctx *contextImpl, tasks []persistence.Task) {
+				require.Len(t, tasks, 1)
+				// Task should keep its future timestamp
+				assert.Equal(t, testFutureTime, tasks[0].GetVisibilityTimestamp())
+			},
+		},
+		{
+			name: "Task timestamp before read cursor - should adjust timestamp",
+			setupContext: func(ctx *contextImpl) {
+				ctx.config.EnableTimerQueueV2 = func(shardID int) bool { return true }
+				ctx.scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName] = testReadCursorTime
+				ctx.logger = testlogger.New(s.T())
+			},
+			domainEntry: cache.NewLocalDomainCacheEntryForTest(domainInfo, &persistence.DomainConfig{}, cluster.TestCurrentClusterName),
+			workflowID:  testWorkflowID,
+			timerTasks: []persistence.Task{
+				&persistence.UserTimerTask{
+					TaskData: persistence.TaskData{
+						VisibilityTimestamp: testVisibilityTime,
+					},
+				},
+			},
+			expectError: false,
+			validateResults: func(t *testing.T, ctx *contextImpl, tasks []persistence.Task) {
+				require.Len(t, tasks, 1)
+				// Timestamp should have been adjusted to read cursor + precision
+				expectedTime := testReadCursorTime.Add(persistence.DBTimestampMinPrecision)
+				assert.Equal(t, expectedTime, tasks[0].GetVisibilityTimestamp())
+			},
+		},
+		{
+			name: "Task timestamp before current time - should use current time as base",
+			setupContext: func(ctx *contextImpl) {
+				ctx.config.EnableTimerQueueV2 = func(shardID int) bool { return true }
+				// Set read cursor to past time
+				ctx.scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName] = testTimeNow.Add(-2 * time.Hour)
+			},
+			domainEntry: cache.NewLocalDomainCacheEntryForTest(domainInfo, &persistence.DomainConfig{}, cluster.TestCurrentClusterName),
+			workflowID:  testWorkflowID,
+			timerTasks: []persistence.Task{
+				&persistence.UserTimerTask{
+					TaskData: persistence.TaskData{
+						VisibilityTimestamp: testVisibilityTime,
+					},
+				},
+			},
+			expectError: false,
+			validateResults: func(t *testing.T, ctx *contextImpl, tasks []persistence.Task) {
+				require.Len(t, tasks, 1)
+				// Should use current time as base for adjustment
+				expectedTime := testTimeNow.Truncate(persistence.DBTimestampMinPrecision).Add(persistence.DBTimestampMinPrecision)
+				assert.Equal(t, expectedTime, tasks[0].GetVisibilityTimestamp())
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		s.Run(tt.name, func() {
+			// Reset context to clean state
+			context := s.newContext()
+
+			// Apply test-specific setup
+			if tt.setupContext != nil {
+				tt.setupContext(context)
+			}
+
+			// Execute the method under test
+			err := context.allocateTimerIDsLocked(tt.domainEntry, tt.workflowID, tt.timerTasks)
+
+			// Validate results
+			if tt.expectError {
+				s.Error(err)
+				if tt.expectedErrorMsg != "" {
+					s.Contains(err.Error(), tt.expectedErrorMsg)
+				}
+			} else {
+				s.NoError(err)
+				if tt.validateResults != nil {
+					tt.validateResults(s.T(), context, tt.timerTasks)
+				}
+			}
+		})
+	}
+}
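
A note on the test design: pinning the clock with clock.NewMockedTimeSourceAt makes the expected timestamps exact, so the second and third cases can assert precisely testReadCursorTime.Add(persistence.DBTimestampMinPrecision) and the truncated mocked "now" plus persistence.DBTimestampMinPrecision, rather than relying on approximate time-window assertions.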
