Skip to content

Commit 0ed9d90

Browse files
committed
Do not generate scheduled task with timestamp in the past
1 parent db5eaf8 commit 0ed9d90

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

service/history/shard/context.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1295,6 +1295,7 @@ func (s *contextImpl) allocateTimerIDsLocked(
12951295
workflowID string,
12961296
timerTasks []persistence.Task,
12971297
) error {
1298+
now := s.GetTimeSource().Now().Truncate(persistence.DBTimestampMinPrecision)
12981299
// assign IDs for the timer tasks. They need to be assigned under shard lock.
12991300
cluster := s.GetClusterMetadata().GetCurrentClusterName()
13001301
for _, task := range timerTasks {
@@ -1324,6 +1325,11 @@ func (s *contextImpl) allocateTimerIDsLocked(
13241325
}
13251326

13261327
readCursorTS := s.scheduledTaskMaxReadLevelMap[cluster]
1328+
// make sure scheduled task timestamp is higher than
1329+
// 1. max read level, so that queue processor can read the task back.
1330+
// 2. current time. Otherwise the task timestamp is in the past and causes aritical load latency in queue processor metrics.
1331+
// Above cases can happen if shard move and new host have a time SKU,
1332+
// or there is db write delay, or we are simply (re-)generating tasks for an old workflow.
13271333
if ts.Before(readCursorTS) {
13281334
// This can happen if shard move and new host have a time SKU, or there is db write delay.
13291335
// We generate a new timer ID using timerMaxReadLevel.
@@ -1336,6 +1342,14 @@ func (s *contextImpl) allocateTimerIDsLocked(
13361342
tag.ValueShardAllocateTimerBeforeRead)
13371343
ts = readCursorTS.Add(persistence.DBTimestampMinPrecision)
13381344
}
1345+
if ts.Before(now) {
1346+
s.logger.Warn("New timer generated is in the past",
1347+
tag.WorkflowDomainID(domainEntry.GetInfo().ID),
1348+
tag.WorkflowID(workflowID),
1349+
tag.Timestamp(ts),
1350+
tag.ValueShardAllocateTimerBeforeRead)
1351+
ts = now.Add(persistence.DBTimestampMinPrecision)
1352+
}
13391353
task.SetVisibilityTimestamp(ts)
13401354

13411355
seqNum, err := s.generateTaskIDLocked()

service/history/shard/context_test.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1425,17 +1425,19 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenDomainIsActiveActiveUs
14251425
}
14261426

14271427
func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeReadCursorAdjustsTimestamp() {
1428+
s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
1429+
testTimeNow := s.mockResource.TimeSource.Now()
14281430
domainCacheEntry := s.setupAllocateTimerIDsTest()
14291431

14301432
// Set up scheduled task max read level map with read cursor ahead of task timestamp
14311433
// Use the actual current cluster name from cluster metadata
14321434
currentCluster := s.context.GetClusterMetadata().GetCurrentClusterName()
1433-
readCursor := time.Date(2025, 1, 1, 13, 0, 0, 0, time.UTC)
1435+
readCursor := testTimeNow.Add(time.Second)
14341436
s.context.scheduledTaskMaxReadLevelMap[currentCluster] = readCursor
14351437

14361438
task := s.createMockTimerTask(createMockTimerTaskParams{
14371439
Version: constants.EmptyVersion,
1438-
Timestamp: time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC), // before read cursor
1440+
Timestamp: readCursor.Add(-time.Second), // before read cursor
14391441
DomainID: testDomainID,
14401442
WorkflowID: testWorkflowID,
14411443
RunID: "test-run-id",
@@ -1457,6 +1459,41 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeRea
14571459
"Adjusted timestamp should match expected adjusted time")
14581460
}
14591461

1462+
func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeNow() {
1463+
s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
1464+
testTimeNow := s.mockResource.TimeSource.Now()
1465+
domainCacheEntry := s.setupAllocateTimerIDsTest()
1466+
1467+
// Set up scheduled task max read level map with read cursor ahead of task timestamp
1468+
// Use the actual current cluster name from cluster metadata
1469+
currentCluster := s.context.GetClusterMetadata().GetCurrentClusterName()
1470+
readCursor := testTimeNow.Add(-2 * time.Second) // read cursor is in the past
1471+
s.context.scheduledTaskMaxReadLevelMap[currentCluster] = readCursor
1472+
1473+
task := s.createMockTimerTask(createMockTimerTaskParams{
1474+
Version: constants.EmptyVersion,
1475+
Timestamp: readCursor.Add(-time.Second), // before now but after read cursor
1476+
DomainID: testDomainID,
1477+
WorkflowID: testWorkflowID,
1478+
RunID: "test-run-id",
1479+
})
1480+
1481+
err := s.context.allocateTimerIDsLocked(domainCacheEntry, testWorkflowID, []persistence.Task{task})
1482+
1483+
s.NoError(err)
1484+
1485+
// Verify timestamp was adjusted to be after read cursor
1486+
s.True(task.GetVisibilityTimestamp().After(readCursor),
1487+
"Task timestamp should be adjusted to be after read cursor")
1488+
1489+
// Verify it's the expected adjusted time (readCursor + DBTimestampMinPrecision)
1490+
expectedTime := testTimeNow.Add(persistence.DBTimestampMinPrecision)
1491+
actualTime := task.GetVisibilityTimestamp()
1492+
s.Equal(expectedTime.Truncate(persistence.DBTimestampMinPrecision),
1493+
actualTime.Truncate(persistence.DBTimestampMinPrecision),
1494+
"Adjusted timestamp should match expected adjusted time")
1495+
}
1496+
14601497
func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenClusterManagerLookupFailsReturnsError() {
14611498
// Create active-active domain cache entry
14621499
domainInfo := &persistence.DomainInfo{ID: testDomainID}

0 commit comments

Comments
 (0)