Commit 2adeb63

Do not generate scheduled task with timestamp in the past
1 parent db5eaf8 commit 2adeb63

File tree: 2 files changed, +138 -0 lines changed

service/history/shard/context.go

14 additions, 0 deletions
@@ -1295,6 +1295,7 @@ func (s *contextImpl) allocateTimerIDsLocked(
 	workflowID string,
 	timerTasks []persistence.Task,
 ) error {
+	now := s.GetTimeSource().Now().Truncate(persistence.DBTimestampMinPrecision)
 	// assign IDs for the timer tasks. They need to be assigned under shard lock.
 	cluster := s.GetClusterMetadata().GetCurrentClusterName()
 	for _, task := range timerTasks {
@@ -1324,6 +1325,11 @@ func (s *contextImpl) allocateTimerIDsLocked(
 		}
 
 		readCursorTS := s.scheduledTaskMaxReadLevelMap[cluster]
+		// Make sure the scheduled task timestamp is higher than:
+		// 1. the max read level, so that the queue processor can read the task back;
+		// 2. the current time, otherwise the task timestamp is in the past and causes artificial load latency in queue processor metrics.
+		// These cases can happen if the shard moves and the new host has a time skew,
+		// if there is a db write delay, or if we are simply (re-)generating tasks for an old workflow.
 		if ts.Before(readCursorTS) {
 			// This can happen if the shard moves and the new host has a time skew, or there is a db write delay.
 			// We generate a new timer ID using timerMaxReadLevel.
@@ -1336,6 +1342,14 @@ func (s *contextImpl) allocateTimerIDsLocked(
 				tag.ValueShardAllocateTimerBeforeRead)
 			ts = readCursorTS.Add(persistence.DBTimestampMinPrecision)
 		}
+		if ts.Before(now) {
+			s.logger.Warn("New timer generated is in the past",
+				tag.WorkflowDomainID(domainEntry.GetInfo().ID),
+				tag.WorkflowID(workflowID),
+				tag.Timestamp(ts),
+				tag.ValueShardAllocateTimerBeforeRead)
+			ts = now
+		}
 		task.SetVisibilityTimestamp(ts)
 
 		seqNum, err := s.generateTaskIDLocked()
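
Taken together, the two checks above clamp a scheduled task's visibility timestamp so it is strictly after the shard's max read level and never behind the shard's current (truncated) time. The standalone Go sketch below restates that adjustment, as it appears in the hunks above, outside of contextImpl for illustration; the clampTimerTimestamp helper and the 1 ms precision constant are assumptions made for this example, not part of the commit.

    package main

    import (
    	"fmt"
    	"time"
    )

    // dbTimestampMinPrecision stands in for persistence.DBTimestampMinPrecision;
    // the 1ms value is an assumption for this sketch.
    const dbTimestampMinPrecision = time.Millisecond

    // clampTimerTimestamp is a hypothetical helper mirroring the adjustment in
    // allocateTimerIDsLocked: push the timestamp past the read cursor so the queue
    // processor can still read the task back, and never let it fall behind "now".
    func clampTimerTimestamp(ts, readCursor, now time.Time) time.Time {
    	if ts.Before(readCursor) {
    		ts = readCursor.Add(dbTimestampMinPrecision)
    	}
    	if ts.Before(now) {
    		ts = now
    	}
    	return ts
    }

    func main() {
    	now := time.Now().Truncate(dbTimestampMinPrecision)
    	readCursor := now.Add(-2 * time.Hour) // read cursor far in the past
    	stale := now.Add(-time.Hour)          // e.g. a task re-generated for an old workflow

    	// The stale timestamp is ahead of the read cursor but behind "now",
    	// so it gets clamped to "now".
    	fmt.Println(clampTimerTimestamp(stale, readCursor, now).Equal(now)) // true
    }
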

service/history/shard/context_test.go

124 additions, 0 deletions
@@ -1575,3 +1575,127 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenMultipleTasksProvidedA
 	s.True(task1.GetTaskID() > 0, "Task 1 ID should be positive")
 	s.True(task2.GetTaskID() > 0, "Task 2 ID should be positive")
 }
+
+func (s *contextTestSuite) TestAllocateTimerIDsLocked() {
+	s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
+	testTimeNow := s.mockResource.TimeSource.Now()
+	testVisibilityTime := testTimeNow.Add(-time.Hour).Truncate(persistence.DBTimestampMinPrecision)
+	testReadCursorTime := testTimeNow.Add(time.Minute).Truncate(persistence.DBTimestampMinPrecision)
+	testFutureTime := testTimeNow.Add(30 * time.Minute).Truncate(persistence.DBTimestampMinPrecision)
+
+	domainInfo := &persistence.DomainInfo{
+		ID:   testDomainID,
+		Name: testDomain,
+	}
+
+	tests := []struct {
+		name             string
+		setupContext     func(*contextImpl)
+		setupMocks       func()
+		domainEntry      *cache.DomainCacheEntry
+		workflowID       string
+		timerTasks       []persistence.Task
+		expectError      bool
+		expectedErrorMsg string
+		validateResults  func(*testing.T, *contextImpl, []persistence.Task)
+	}{
+		{
+			name: "Queue V2 enabled - simple case",
+			setupContext: func(ctx *contextImpl) {
+				ctx.config.EnableTimerQueueV2 = func(shardID int) bool { return true }
+				ctx.scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName] = testReadCursorTime
+			},
+			domainEntry: cache.NewLocalDomainCacheEntryForTest(domainInfo, &persistence.DomainConfig{}, cluster.TestCurrentClusterName),
+			workflowID:  testWorkflowID,
+			timerTasks: []persistence.Task{
+				&persistence.UserTimerTask{
+					TaskData: persistence.TaskData{
+						VisibilityTimestamp: testFutureTime,
+					},
+				},
+			},
+			expectError: false,
+			validateResults: func(t *testing.T, ctx *contextImpl, tasks []persistence.Task) {
+				require.Len(t, tasks, 1)
+				// Task should keep its future timestamp
+				assert.Equal(t, testFutureTime, tasks[0].GetVisibilityTimestamp())
+			},
+		},
+		{
+			name: "Task timestamp before read cursor - should adjust timestamp",
+			setupContext: func(ctx *contextImpl) {
+				ctx.config.EnableTimerQueueV2 = func(shardID int) bool { return true }
+				ctx.scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName] = testReadCursorTime
+				ctx.logger = testlogger.New(s.T())
+			},
+			domainEntry: cache.NewLocalDomainCacheEntryForTest(domainInfo, &persistence.DomainConfig{}, cluster.TestCurrentClusterName),
+			workflowID:  testWorkflowID,
+			timerTasks: []persistence.Task{
+				&persistence.UserTimerTask{
+					TaskData: persistence.TaskData{
+						VisibilityTimestamp: testVisibilityTime,
+					},
+				},
+			},
+			expectError: false,
+			validateResults: func(t *testing.T, ctx *contextImpl, tasks []persistence.Task) {
+				require.Len(t, tasks, 1)
+				// Timestamp should have been adjusted to read cursor + precision
+				expectedTime := testReadCursorTime.Add(persistence.DBTimestampMinPrecision)
+				assert.Equal(t, expectedTime, tasks[0].GetVisibilityTimestamp())
+			},
+		},
+		{
+			name: "Task timestamp before current time - should use current time as base",
+			setupContext: func(ctx *contextImpl) {
+				ctx.config.EnableTimerQueueV2 = func(shardID int) bool { return true }
+				// Set read cursor to past time
+				ctx.scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName] = testTimeNow.Add(-2 * time.Hour)
+			},
+			domainEntry: cache.NewLocalDomainCacheEntryForTest(domainInfo, &persistence.DomainConfig{}, cluster.TestCurrentClusterName),
+			workflowID:  testWorkflowID,
+			timerTasks: []persistence.Task{
+				&persistence.UserTimerTask{
+					TaskData: persistence.TaskData{
+						VisibilityTimestamp: testVisibilityTime,
+					},
+				},
+			},
+			expectError: false,
+			validateResults: func(t *testing.T, ctx *contextImpl, tasks []persistence.Task) {
+				require.Len(t, tasks, 1)
+				// Should use current time as base for adjustment
+				expectedTime := testTimeNow.Truncate(persistence.DBTimestampMinPrecision).Add(persistence.DBTimestampMinPrecision)
+				assert.Equal(t, expectedTime, tasks[0].GetVisibilityTimestamp())
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		s.Run(tt.name, func() {
+			// Reset context to clean state
+			context := s.newContext()
+
+			// Apply test-specific setup
+			if tt.setupContext != nil {
+				tt.setupContext(context)
+			}
+
+			// Execute the method under test
+			err := context.allocateTimerIDsLocked(tt.domainEntry, tt.workflowID, tt.timerTasks)
+
+			// Validate results
+			if tt.expectError {
+				s.Error(err)
+				if tt.expectedErrorMsg != "" {
+					s.Contains(err.Error(), tt.expectedErrorMsg)
+				}
+			} else {
+				s.NoError(err)
+				if tt.validateResults != nil {
+					tt.validateResults(s.T(), context, tt.timerTasks)
+				}
+			}
+		})
+	}
+}
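
The test fixtures above are truncated with persistence.DBTimestampMinPrecision, presumably to keep them aligned with the precision the production path truncates "now" to before comparing timestamps. The self-contained Go sketch below, which assumes a 1 ms precision, shows why an untruncated fixture would generally fail such an equality assertion; it is an illustration, not part of the commit.

    package main

    import (
    	"fmt"
    	"time"
    )

    // Assumed stand-in for persistence.DBTimestampMinPrecision (1ms).
    const dbTimestampMinPrecision = time.Millisecond

    func main() {
    	raw := time.Now()                               // typically carries sub-millisecond digits
    	truncated := raw.Truncate(dbTimestampMinPrecision)

    	// An untruncated fixture usually differs from a value that was
    	// truncated to DB precision; truncating both sides makes them equal.
    	fmt.Println("equal without truncation:", raw.Equal(truncated))                                      // usually false
    	fmt.Println("equal after truncation:  ", raw.Truncate(dbTimestampMinPrecision).Equal(truncated))    // true
    }
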
