@@ -254,79 +254,6 @@ func TestCleanupStaleShardStats(t *testing.T) {
254254
255255}
256256
257- func TestCleanupStaleShardStats (t * testing.T ) {
258- t .Run ("stale shard stats are deleted" , func (t * testing.T ) {
259- mocks := setupProcessorTest (t , config .NamespaceTypeFixed )
260- defer mocks .ctrl .Finish ()
261- processor := mocks .factory .CreateProcessor (mocks .cfg , mocks .store , mocks .election ).(* namespaceProcessor )
262-
263- now := mocks .timeSource .Now ()
264-
265- heartbeats := map [string ]store.HeartbeatState {
266- "exec-active" : {LastHeartbeat : now .Unix (), Status : types .ExecutorStatusACTIVE },
267- "exec-stale" : {LastHeartbeat : now .Add (- 2 * time .Second ).Unix ()},
268- }
269-
270- assignments := map [string ]store.AssignedState {
271- "exec-active" : {
272- AssignedShards : map [string ]* types.ShardAssignment {
273- "shard-1" : {Status : types .AssignmentStatusREADY },
274- "shard-2" : {Status : types .AssignmentStatusREADY },
275- },
276- },
277- "exec-stale" : {
278- AssignedShards : map [string ]* types.ShardAssignment {
279- "shard-3" : {Status : types .AssignmentStatusREADY },
280- },
281- },
282- }
283-
284- shardStats := map [string ]store.ShardStatistics {
285- "shard-1" : {SmoothedLoad : 1.0 , LastUpdateTime : now .Unix (), LastMoveTime : now .Unix ()},
286- "shard-2" : {SmoothedLoad : 2.0 , LastUpdateTime : now .Unix (), LastMoveTime : now .Unix ()},
287- "shard-3" : {SmoothedLoad : 3.0 , LastUpdateTime : now .Add (- 2 * time .Second ).Unix (), LastMoveTime : now .Add (- 2 * time .Second ).Unix ()},
288- }
289-
290- namespaceState := & store.NamespaceState {
291- Executors : heartbeats ,
292- ShardAssignments : assignments ,
293- ShardStats : shardStats ,
294- }
295-
296- gomock .InOrder (
297- mocks .store .EXPECT ().GetState (gomock .Any (), mocks .cfg .Name ).Return (namespaceState , nil ),
298- mocks .election .EXPECT ().Guard ().Return (store .NopGuard ()),
299- mocks .store .EXPECT ().DeleteShardStats (gomock .Any (), mocks .cfg .Name , []string {"shard-3" }, gomock .Any ()).Return (nil ),
300- )
301- processor .cleanupStaleShardStats (context .Background ())
302- })
303-
304- t .Run ("recent shard stats are preserved" , func (t * testing.T ) {
305- mocks := setupProcessorTest (t , config .NamespaceTypeFixed )
306- defer mocks .ctrl .Finish ()
307- processor := mocks .factory .CreateProcessor (mocks .cfg , mocks .store , mocks .election ).(* namespaceProcessor )
308-
309- now := mocks .timeSource .Now ()
310-
311- expiredExecutor := now .Add (- 2 * time .Second ).Unix ()
312- state := & store.NamespaceState {
313- Executors : map [string ]store.HeartbeatState {
314- "exec-stale" : {LastHeartbeat : expiredExecutor },
315- },
316- ShardAssignments : map [string ]store.AssignedState {},
317- ShardStats : map [string ]store.ShardStatistics {
318- "shard-1" : {SmoothedLoad : 5.0 , LastUpdateTime : now .Unix (), LastMoveTime : now .Unix ()},
319- },
320- }
321-
322- mocks .store .EXPECT ().GetState (gomock .Any (), mocks .cfg .Name ).Return (state , nil )
323- processor .cleanupStaleShardStats (context .Background ())
324-
325- // No delete expected since stats are recent.
326- })
327-
328- }
329-
330257func TestRebalance_StoreErrors (t * testing.T ) {
331258 mocks := setupProcessorTest (t , config .NamespaceTypeFixed )
332259 defer mocks .ctrl .Finish ()
0 commit comments