File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change @@ -1301,7 +1301,9 @@ func (s *contextImpl) allocateTimerIDsLocked(
1301
1301
cluster := s .GetClusterMetadata ().GetCurrentClusterName ()
1302
1302
for _ , task := range timerTasks {
1303
1303
ts := task .GetVisibilityTimestamp ().Truncate (persistence .DBTimestampMinPrecision )
1304
- if task .GetVersion () != constants .EmptyVersion {
1304
+ // always use current cluster's max read level for queue v2, and this is safe for rollback,
1305
+ // because if we go back to queue v1, the standby queue and active queue will start from the same ack level to read tasks
1306
+ if task .GetVersion () != constants .EmptyVersion && ! s .GetConfig ().EnableTimerQueueV2 (s .shardID ) {
1305
1307
// cannot use version to determine the corresponding cluster for timer task
1306
1308
// this is because during failover, timer task should be created as active
1307
1309
// or otherwise, failover + active processing logic may not pick up the task.
@@ -1559,6 +1561,7 @@ func acquireShard(
1559
1561
1560
1562
// initialize the cluster current time to be the same as ack level
1561
1563
remoteClusterCurrentTime := make (map [string ]time.Time )
1564
+ // TODO: get this information from QueueState once TimerAckLevel field is deprecated
1562
1565
scheduledTaskMaxReadLevelMap := make (map [string ]time.Time )
1563
1566
for clusterName := range shardItem .GetClusterMetadata ().GetEnabledClusterInfo () {
1564
1567
if clusterName != shardItem .GetClusterMetadata ().GetCurrentClusterName () {
You can’t perform that action at this time.
0 commit comments