Merged
14 changes: 14 additions & 0 deletions service/history/shard/context.go
@@ -1295,6 +1295,7 @@ func (s *contextImpl) allocateTimerIDsLocked(
workflowID string,
timerTasks []persistence.Task,
) error {
now := s.GetTimeSource().Now().Truncate(persistence.DBTimestampMinPrecision)
// assign IDs for the timer tasks. They need to be assigned under shard lock.
cluster := s.GetClusterMetadata().GetCurrentClusterName()
for _, task := range timerTasks {
@@ -1324,6 +1325,11 @@ func (s *contextImpl) allocateTimerIDsLocked(
}

readCursorTS := s.scheduledTaskMaxReadLevelMap[cluster]
// Make sure the scheduled task timestamp is higher than:
// 1. the max read level, so that the queue processor can read the task back.
// 2. the current time; otherwise the task timestamp is in the past and causes artificial load latency in queue processor metrics.
// These cases can happen if the shard moves and the new host has clock skew,
// if there is a db write delay, or if we are simply (re-)generating tasks for an old workflow.
if ts.Before(readCursorTS) {
// This can happen if the shard moves and the new host has clock skew, or there is a db write delay.
// We generate a new timer ID using timerMaxReadLevel.
@@ -1336,6 +1342,14 @@ func (s *contextImpl) allocateTimerIDsLocked(
tag.ValueShardAllocateTimerBeforeRead)
ts = readCursorTS.Add(persistence.DBTimestampMinPrecision)
}
if ts.Before(now) {
s.logger.Warn("New timer generated is in the past",
tag.WorkflowDomainID(domainEntry.GetInfo().ID),
tag.WorkflowID(workflowID),
tag.Timestamp(ts),
tag.ValueShardAllocateTimerBeforeRead)
ts = now.Add(persistence.DBTimestampMinPrecision)
}
task.SetVisibilityTimestamp(ts)

seqNum, err := s.generateTaskIDLocked()
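In prose: `allocateTimerIDsLocked` now clamps each scheduled task's visibility timestamp so it lands strictly after both the shard's max read level and the shard clock's current time, nudging it forward by one `DBTimestampMinPrecision` step in either case. Below is a minimal, self-contained sketch of that clamping rule — not the Cadence implementation; `clampTimerTimestamp` and `dbTimestampMinPrecision` are hypothetical stand-ins for the logic and `persistence.DBTimestampMinPrecision` shown in the diff above.

```go
package main

import (
	"fmt"
	"time"
)

// dbTimestampMinPrecision stands in for persistence.DBTimestampMinPrecision
// (assumed here to be one millisecond).
const dbTimestampMinPrecision = time.Millisecond

// clampTimerTimestamp returns ts adjusted so that it is strictly after
// both readCursorTS (the queue's max read level) and now.
func clampTimerTimestamp(ts, readCursorTS, now time.Time) time.Time {
	if ts.Before(readCursorTS) {
		// The task would sit behind the read cursor and never be read back.
		ts = readCursorTS.Add(dbTimestampMinPrecision)
	}
	if ts.Before(now) {
		// The task would look overdue the moment it is created,
		// inflating latency in queue processor metrics.
		ts = now.Add(dbTimestampMinPrecision)
	}
	return ts
}

func main() {
	now := time.Now().Truncate(dbTimestampMinPrecision)
	readCursor := now.Add(-2 * time.Second)
	stale := readCursor.Add(-time.Second)
	fmt.Println(clampTimerTimestamp(stale, readCursor, now).After(now)) // true
}
```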
41 changes: 39 additions & 2 deletions service/history/shard/context_test.go
@@ -1425,17 +1425,19 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenDomainIsActiveActiveUs
}

func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeReadCursorAdjustsTimestamp() {
s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
testTimeNow := s.mockResource.TimeSource.Now()
domainCacheEntry := s.setupAllocateTimerIDsTest()

// Set up scheduled task max read level map with read cursor ahead of task timestamp
// Use the actual current cluster name from cluster metadata
currentCluster := s.context.GetClusterMetadata().GetCurrentClusterName()
- readCursor := time.Date(2025, 1, 1, 13, 0, 0, 0, time.UTC)
+ readCursor := testTimeNow.Add(time.Second)
s.context.scheduledTaskMaxReadLevelMap[currentCluster] = readCursor

task := s.createMockTimerTask(createMockTimerTaskParams{
Version: constants.EmptyVersion,
- Timestamp: time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC), // before read cursor
+ Timestamp: readCursor.Add(-time.Second), // before read cursor
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: "test-run-id",
@@ -1457,6 +1459,41 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeRea
"Adjusted timestamp should match expected adjusted time")
}

func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeNow() {
s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
testTimeNow := s.mockResource.TimeSource.Now()
domainCacheEntry := s.setupAllocateTimerIDsTest()

// Set up scheduled task max read level map with read cursor behind the current time
// Use the actual current cluster name from cluster metadata
currentCluster := s.context.GetClusterMetadata().GetCurrentClusterName()
readCursor := testTimeNow.Add(-2 * time.Second) // read cursor is in the past
s.context.scheduledTaskMaxReadLevelMap[currentCluster] = readCursor

task := s.createMockTimerTask(createMockTimerTaskParams{
Version: constants.EmptyVersion,
Timestamp: readCursor.Add(time.Second), // after read cursor but before now
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: "test-run-id",
})

err := s.context.allocateTimerIDsLocked(domainCacheEntry, testWorkflowID, []persistence.Task{task})

s.NoError(err)

// Verify timestamp was adjusted to be after read cursor
s.True(task.GetVisibilityTimestamp().After(readCursor),
"Task timestamp should be adjusted to be after read cursor")

// Verify it's the expected adjusted time (now + DBTimestampMinPrecision)
expectedTime := testTimeNow.Add(persistence.DBTimestampMinPrecision)
actualTime := task.GetVisibilityTimestamp()
s.Equal(expectedTime.Truncate(persistence.DBTimestampMinPrecision),
actualTime.Truncate(persistence.DBTimestampMinPrecision),
"Adjusted timestamp should match expected adjusted time")
}

func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenClusterManagerLookupFailsReturnsError() {
// Create active-active domain cache entry
domainInfo := &persistence.DomainInfo{ID: testDomainID}
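For readers adapting these tests, here is a condensed, table-driven sketch of the same two adjustment branches the suite tests exercise. It assumes the hypothetical `clampTimerTimestamp` helper and `dbTimestampMinPrecision` constant from the sketch above live in the same package; the real suite instead drives `allocateTimerIDsLocked` through a mocked time source.

```go
package main

import (
	"testing"
	"time"
)

func TestClampTimerTimestamp(t *testing.T) {
	now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	readCursor := now.Add(-2 * time.Second)

	cases := []struct {
		name string
		ts   time.Time
		want time.Time
	}{
		// Behind the read cursor and behind now: both branches fire,
		// and the clamp to now wins.
		{"before read cursor", readCursor.Add(-time.Second), now.Add(dbTimestampMinPrecision)},
		// After the read cursor but behind now: only the second branch fires.
		{"before now only", readCursor.Add(time.Second), now.Add(dbTimestampMinPrecision)},
		// In the future: left untouched.
		{"future", now.Add(time.Minute), now.Add(time.Minute)},
	}
	for _, c := range cases {
		if got := clampTimerTimestamp(c.ts, readCursor, now); !got.Equal(c.want) {
			t.Errorf("%s: got %v, want %v", c.name, got, c.want)
		}
	}
}
```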