diff --git a/service/history/shard/context.go b/service/history/shard/context.go
index 9d0d2355d59..a493234c2bc 100644
--- a/service/history/shard/context.go
+++ b/service/history/shard/context.go
@@ -1295,6 +1295,7 @@ func (s *contextImpl) allocateTimerIDsLocked(
 	workflowID string,
 	timerTasks []persistence.Task,
 ) error {
+	now := s.GetTimeSource().Now().Truncate(persistence.DBTimestampMinPrecision)
 	// assign IDs for the timer tasks. They need to be assigned under shard lock.
 	cluster := s.GetClusterMetadata().GetCurrentClusterName()
 	for _, task := range timerTasks {
@@ -1324,6 +1325,11 @@ func (s *contextImpl) allocateTimerIDsLocked(
 		}
 
 		readCursorTS := s.scheduledTaskMaxReadLevelMap[cluster]
+		// make sure the scheduled task timestamp is higher than:
+		// 1. the max read level, so that the queue processor can read the task back.
+		// 2. the current time; otherwise the task timestamp is in the past and causes artificial load latency in queue processor metrics.
+		// The above cases can happen if the shard moves and the new host has clock skew,
+		// if there is a db write delay, or if we are simply (re-)generating tasks for an old workflow.
 		if ts.Before(readCursorTS) {
 			// This can happen if shard move and new host have a time SKU, or there is db write delay.
 			// We generate a new timer ID using timerMaxReadLevel.
@@ -1336,6 +1342,14 @@ func (s *contextImpl) allocateTimerIDsLocked(
 				tag.ValueShardAllocateTimerBeforeRead)
 			ts = readCursorTS.Add(persistence.DBTimestampMinPrecision)
 		}
+		if ts.Before(now) {
+			s.logger.Warn("New timer generated is in the past",
+				tag.WorkflowDomainID(domainEntry.GetInfo().ID),
+				tag.WorkflowID(workflowID),
+				tag.Timestamp(ts),
+				tag.ValueShardAllocateTimerBeforeRead)
+			ts = now.Add(persistence.DBTimestampMinPrecision)
+		}
 
 		task.SetVisibilityTimestamp(ts)
 		seqNum, err := s.generateTaskIDLocked()
diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go
index 4e107027a6f..725326ccdb6 100644
--- a/service/history/shard/context_test.go
+++ b/service/history/shard/context_test.go
@@ -1425,17 +1425,19 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenDomainIsActiveActiveUs
 }
 
 func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeReadCursorAdjustsTimestamp() {
+	s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
+	testTimeNow := s.mockResource.TimeSource.Now()
 	domainCacheEntry := s.setupAllocateTimerIDsTest()
 
 	// Set up scheduled task max read level map with read cursor ahead of task timestamp
 	// Use the actual current cluster name from cluster metadata
 	currentCluster := s.context.GetClusterMetadata().GetCurrentClusterName()
-	readCursor := time.Date(2025, 1, 1, 13, 0, 0, 0, time.UTC)
+	readCursor := testTimeNow.Add(time.Second)
 	s.context.scheduledTaskMaxReadLevelMap[currentCluster] = readCursor
 
 	task := s.createMockTimerTask(createMockTimerTaskParams{
 		Version:    constants.EmptyVersion,
-		Timestamp:  time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC), // before read cursor
+		Timestamp:  readCursor.Add(-time.Second), // before read cursor
 		DomainID:   testDomainID,
 		WorkflowID: testWorkflowID,
 		RunID:      "test-run-id",
@@ -1457,6 +1459,41 @@ func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeRea
 		"Adjusted timestamp should match expected adjusted time")
 }
 
+func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenTaskTimestampBeforeNow() {
+	s.mockResource.TimeSource = clock.NewMockedTimeSourceAt(time.Now())
+	testTimeNow := s.mockResource.TimeSource.Now()
+	domainCacheEntry := s.setupAllocateTimerIDsTest()
+
+	// Set up scheduled task max read level map with read cursor behind the current time
+	// Use the actual current cluster name from cluster metadata
+	currentCluster := s.context.GetClusterMetadata().GetCurrentClusterName()
+	readCursor := testTimeNow.Add(-2 * time.Second) // read cursor is in the past
+	s.context.scheduledTaskMaxReadLevelMap[currentCluster] = readCursor
+
+	task := s.createMockTimerTask(createMockTimerTaskParams{
+		Version:    constants.EmptyVersion,
+		Timestamp:  readCursor.Add(time.Second), // after read cursor but before now
+		DomainID:   testDomainID,
+		WorkflowID: testWorkflowID,
+		RunID:      "test-run-id",
+	})
+
+	err := s.context.allocateTimerIDsLocked(domainCacheEntry, testWorkflowID, []persistence.Task{task})
+
+	s.NoError(err)
+
+	// Verify timestamp was adjusted to be after the current time
+	s.True(task.GetVisibilityTimestamp().After(testTimeNow),
+		"Task timestamp should be adjusted to be after the current time")
+
+	// Verify it's the expected adjusted time (now + DBTimestampMinPrecision)
+	expectedTime := testTimeNow.Add(persistence.DBTimestampMinPrecision)
+	actualTime := task.GetVisibilityTimestamp()
+	s.Equal(expectedTime.Truncate(persistence.DBTimestampMinPrecision),
+		actualTime.Truncate(persistence.DBTimestampMinPrecision),
+		"Adjusted timestamp should match expected adjusted time")
+}
+
 func (s *contextTestSuite) TestAllocateTimerIDsLocked_WhenClusterManagerLookupFailsReturnsError() {
 	// Create active-active domain cache entry
 	domainInfo := &persistence.DomainInfo{ID: testDomainID}