@@ -1148,3 +1148,127 @@ func TestShardClosedGuard(t *testing.T) {
1148
1148
})
1149
1149
}
1150
1150
}
1151
+
1152
+ func (s * contextTestSuite ) TestAllocateTimerIDsLocked () {
1153
+ s .mockResource .TimeSource = clock .NewMockedTimeSourceAt (time .Now ())
1154
+ testTimeNow := s .mockResource .TimeSource .Now ()
1155
+ testVisibilityTime := testTimeNow .Add (- time .Hour ).Truncate (persistence .DBTimestampMinPrecision )
1156
+ testReadCursorTime := testTimeNow .Add (time .Minute ).Truncate (persistence .DBTimestampMinPrecision )
1157
+ testFutureTime := testTimeNow .Add (30 * time .Minute ).Truncate (persistence .DBTimestampMinPrecision )
1158
+
1159
+ domainInfo := & persistence.DomainInfo {
1160
+ ID : testDomainID ,
1161
+ Name : testDomain ,
1162
+ }
1163
+
1164
+ tests := []struct {
1165
+ name string
1166
+ setupContext func (* contextImpl )
1167
+ setupMocks func ()
1168
+ domainEntry * cache.DomainCacheEntry
1169
+ workflowID string
1170
+ timerTasks []persistence.Task
1171
+ expectError bool
1172
+ expectedErrorMsg string
1173
+ validateResults func (* testing.T , * contextImpl , []persistence.Task )
1174
+ }{
1175
+ {
1176
+ name : "Queue V2 enabled - simple case" ,
1177
+ setupContext : func (ctx * contextImpl ) {
1178
+ ctx .config .EnableTimerQueueV2 = func (shardID int ) bool { return true }
1179
+ ctx .scheduledTaskMaxReadLevelMap [cluster .TestCurrentClusterName ] = testReadCursorTime
1180
+ },
1181
+ domainEntry : cache .NewLocalDomainCacheEntryForTest (domainInfo , & persistence.DomainConfig {}, cluster .TestCurrentClusterName ),
1182
+ workflowID : testWorkflowID ,
1183
+ timerTasks : []persistence.Task {
1184
+ & persistence.UserTimerTask {
1185
+ TaskData : persistence.TaskData {
1186
+ VisibilityTimestamp : testFutureTime ,
1187
+ },
1188
+ },
1189
+ },
1190
+ expectError : false ,
1191
+ validateResults : func (t * testing.T , ctx * contextImpl , tasks []persistence.Task ) {
1192
+ require .Len (t , tasks , 1 )
1193
+ // Task should keep its future timestamp
1194
+ assert .Equal (t , testFutureTime , tasks [0 ].GetVisibilityTimestamp ())
1195
+ },
1196
+ },
1197
+ {
1198
+ name : "Task timestamp before read cursor - should adjust timestamp" ,
1199
+ setupContext : func (ctx * contextImpl ) {
1200
+ ctx .config .EnableTimerQueueV2 = func (shardID int ) bool { return true }
1201
+ ctx .scheduledTaskMaxReadLevelMap [cluster .TestCurrentClusterName ] = testReadCursorTime
1202
+ ctx .logger = testlogger .New (s .T ())
1203
+ },
1204
+ domainEntry : cache .NewLocalDomainCacheEntryForTest (domainInfo , & persistence.DomainConfig {}, cluster .TestCurrentClusterName ),
1205
+ workflowID : testWorkflowID ,
1206
+ timerTasks : []persistence.Task {
1207
+ & persistence.UserTimerTask {
1208
+ TaskData : persistence.TaskData {
1209
+ VisibilityTimestamp : testVisibilityTime ,
1210
+ },
1211
+ },
1212
+ },
1213
+ expectError : false ,
1214
+ validateResults : func (t * testing.T , ctx * contextImpl , tasks []persistence.Task ) {
1215
+ require .Len (t , tasks , 1 )
1216
+ // Timestamp should have been adjusted to read cursor + precision
1217
+ expectedTime := testReadCursorTime .Add (persistence .DBTimestampMinPrecision )
1218
+ assert .Equal (t , expectedTime , tasks [0 ].GetVisibilityTimestamp ())
1219
+ },
1220
+ },
1221
+ {
1222
+ name : "Task timestamp before current time - should use current time as base" ,
1223
+ setupContext : func (ctx * contextImpl ) {
1224
+ ctx .config .EnableTimerQueueV2 = func (shardID int ) bool { return true }
1225
+ // Set read cursor to past time
1226
+ ctx .scheduledTaskMaxReadLevelMap [cluster .TestCurrentClusterName ] = testTimeNow .Add (- 2 * time .Hour )
1227
+ },
1228
+ domainEntry : cache .NewLocalDomainCacheEntryForTest (domainInfo , & persistence.DomainConfig {}, cluster .TestCurrentClusterName ),
1229
+ workflowID : testWorkflowID ,
1230
+ timerTasks : []persistence.Task {
1231
+ & persistence.UserTimerTask {
1232
+ TaskData : persistence.TaskData {
1233
+ VisibilityTimestamp : testVisibilityTime ,
1234
+ },
1235
+ },
1236
+ },
1237
+ expectError : false ,
1238
+ validateResults : func (t * testing.T , ctx * contextImpl , tasks []persistence.Task ) {
1239
+ require .Len (t , tasks , 1 )
1240
+ // Should use current time as base for adjustment
1241
+ expectedTime := testTimeNow .Add (persistence .DBTimestampMinPrecision )
1242
+ assert .Equal (t , expectedTime , tasks [0 ].GetVisibilityTimestamp ())
1243
+ },
1244
+ },
1245
+ }
1246
+
1247
+ for _ , tt := range tests {
1248
+ s .Run (tt .name , func () {
1249
+ // Reset context to clean state
1250
+ context := s .newContext ()
1251
+
1252
+ // Apply test-specific setup
1253
+ if tt .setupContext != nil {
1254
+ tt .setupContext (context )
1255
+ }
1256
+
1257
+ // Execute the method under test
1258
+ err := context .allocateTimerIDsLocked (tt .domainEntry , tt .workflowID , tt .timerTasks )
1259
+
1260
+ // Validate results
1261
+ if tt .expectError {
1262
+ s .Error (err )
1263
+ if tt .expectedErrorMsg != "" {
1264
+ s .Contains (err .Error (), tt .expectedErrorMsg )
1265
+ }
1266
+ } else {
1267
+ s .NoError (err )
1268
+ if tt .validateResults != nil {
1269
+ tt .validateResults (s .T (), context , tt .timerTasks )
1270
+ }
1271
+ }
1272
+ })
1273
+ }
1274
+ }
0 commit comments