Skip to content

Commit e0bf18d

Browse files
authored
Implement TaskManager (#1498)
* add task manager implementation
* Add test for replica to stop background collect task
* add calls to resume suspend maintenance tasks
* separate primary only from generic background tasks
* misc fixes
* fix formatting
* loggerFactory null check
* simplify task manager implementation
* addressing comments
* no allocation lookup placement category
* add verbose log message
* update RegisterAndRun
* pass global cts in when waiting for task manager tasks to complete
* check if TaskManager disposed before using global cts
* add TaskManager tests
* fix test
* addressing comments and adding more tests
* add test cleanup with exception
* bump version
* add Allure tests for task manager
1 parent 6c8a81b commit e0bf18d

File tree

16 files changed

+720
-33
lines changed

16 files changed

+720
-33
lines changed

Version.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<!-- VersionPrefix property for builds and packages -->
33
<PropertyGroup>
4-
<VersionPrefix>1.0.92</VersionPrefix>
4+
<VersionPrefix>1.0.93</VersionPrefix>
55
</PropertyGroup>
66
</Project>

libs/cluster/Server/Failover/ReplicaFailoverSession.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ private bool TakeOverAsPrimary()
145145
if (!clusterProvider.replicationManager.InitializeCheckpointStore())
146146
logger?.LogWarning("Failed acquiring latest memory checkpoint metadata at {method}", nameof(TakeOverAsPrimary));
147147
_ = clusterProvider.BumpAndWaitForEpochTransition();
148+
149+
// Resume all background maintenance tasks that were possibly shut down when this node became a replica
150+
clusterProvider.storeWrapper.StartPrimaryTasks();
148151
}
149152
finally
150153
{

libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ async Task<string> TryBeginReplicaSync(bool downgradeLock)
8080
if (!disklessSync)
8181
storeWrapper.Reset();
8282

83+
// Suspend background tasks that may interfere with AOF
84+
await storeWrapper.SuspendPrimaryOnlyTasks();
85+
8386
// Send request to primary
8487
// Primary will initiate background task and start sending checkpoint data
8588
//

libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ async Task<string> ReplicaSyncAttachTask(bool downgradeLock)
122122
// Reset the database in preparation for connecting to primary
123123
storeWrapper.Reset();
124124

125+
// Suspend background tasks that may interfere with AOF
126+
await storeWrapper.SuspendPrimaryOnlyTasks();
127+
125128
// Send request to primary
126129
// Primary will initiate background task and start sending checkpoint data
127130
//

libs/cluster/Session/ReplicaOfCommand.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ private bool TryREPLICAOF(out bool invalidParameters)
4545
clusterProvider.replicationManager.TryUpdateForFailover();
4646
clusterProvider.replicationManager.ResetReplayIterator();
4747
UnsafeBumpAndWaitForEpochTransition();
48+
clusterProvider.storeWrapper.StartPrimaryTasks();
4849
}
4950
finally
5051
{

libs/server/Databases/MultiDatabaseManager.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,6 @@ public override bool GrowIndexesIfNeeded(CancellationToken token = default)
524524
{
525525
var activeDbIdsMapSize = activeDbIds.ActualSize;
526526
var activeDbIdsMapSnapshot = activeDbIds.Map;
527-
528527
var databasesMapSnapshot = databases.Map;
529528

530529
for (var i = 0; i < activeDbIdsMapSize; i++)

libs/server/Resp/RespServerSessionSlotVerify.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
2121
bool NetworkKeyArraySlotVerify(Span<ArgSlice> keys, bool readOnly, int count = -1)
2222
=> clusterSession != null && clusterSession.NetworkKeyArraySlotVerify(keys, readOnly, SessionAsking, ref dcurr, ref dend, count);
2323

24+
/// <summary>
25+
/// Validate if this command can be served based on the current slot assignment
26+
/// </summary>
27+
/// <param name="cmd"></param>
28+
/// <returns></returns>
2429
bool CanServeSlot(RespCommand cmd)
2530
{
2631
Debug.Assert(clusterSession != null);

libs/server/StoreWrapper.cs

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ public sealed class StoreWrapper
130130
/// </summary>
131131
internal readonly LuaTimeoutManager luaTimeoutManager;
132132

133+
/// <summary>
134+
/// Background task manager instance
135+
/// </summary>
136+
internal readonly TaskManager taskManager;
137+
133138
private IDatabaseManager databaseManager;
134139
SingleWriterMultiReaderLock databaseManagerLock;
135140

@@ -151,7 +156,9 @@ public sealed class StoreWrapper
151156

152157
internal readonly CancellationTokenSource ctsCommit;
153158

154-
// True if StoreWrapper instance is disposed
159+
/// <summary>
160+
/// True if StoreWrapper instance is disposed
161+
/// </summary>
155162
bool disposed;
156163

157164
/// <summary>
@@ -197,6 +204,7 @@ public StoreWrapper(
197204
this.sessionLogger = loggerFactory?.CreateLogger("Session");
198205
this.accessControlList = accessControlList;
199206
this.GarnetObjectSerializer = new GarnetObjectSerializer(this.customCommandManager);
207+
this.taskManager = new TaskManager(loggerFactory?.CreateLogger("TaskManager"));
200208
this.loggingFrequency = TimeSpan.FromSeconds(serverOptions.LoggingFrequency);
201209

202210
logger?.LogTrace("StoreWrapper logging frequency: {loggingFrequency} seconds.", this.loggingFrequency);
@@ -654,7 +662,7 @@ async Task AutoCheckpointBasedOnAofSizeLimit(long aofSizeLimit, CancellationToke
654662
}
655663
}
656664

657-
async Task CommitTask(int commitFrequencyMs, ILogger logger = null, CancellationToken token = default)
665+
async Task CommitTask(int commitFrequencyMs, CancellationToken token = default, ILogger logger = null)
658666
{
659667
try
660668
{
@@ -768,9 +776,12 @@ async Task ExpiredKeyDeletionScanTask(int expiredKeyDeletionScanFrequencySecs, C
768776
public (long numExpiredKeysFound, long totalRecordsScanned) ExpiredKeyDeletionScan(int dbId)
769777
=> databaseManager.ExpiredKeyDeletionScan(dbId);
770778

771-
/// <summary>Grows indexes of both main store and object store if current size is too small.</summary>
779+
/// <summary>
780+
/// Grows indexes of both main store and object store if current size is too small.
781+
/// </summary>
772782
/// <param name="token"></param>
773-
private async void IndexAutoGrowTask(CancellationToken token)
783+
/// <returns></returns>
784+
private async Task IndexAutoGrowTask(CancellationToken token)
774785
{
775786
try
776787
{
@@ -799,37 +810,16 @@ internal void Start()
799810
clusterProvider?.Start();
800811
luaTimeoutManager?.Start();
801812

802-
if (serverOptions.AofSizeLimit.Length > 0)
803-
{
804-
var aofSizeLimitBytes = 1L << serverOptions.AofSizeLimitSizeBits();
805-
Task.Run(async () => await AutoCheckpointBasedOnAofSizeLimit(aofSizeLimitBytes, ctsCommit.Token, logger));
806-
}
807-
808-
if (serverOptions.CommitFrequencyMs > 0 && serverOptions.EnableAOF)
813+
// Start background maintenance tasks that should run only on the primary
814+
if (clusterProvider == null || clusterProvider.IsPrimary())
809815
{
810-
Task.Run(async () => await CommitTask(serverOptions.CommitFrequencyMs, logger, ctsCommit.Token));
816+
StartPrimaryTasks();
811817
}
812818

813-
if (serverOptions.CompactionFrequencySecs > 0 && serverOptions.CompactionType != LogCompactionType.None)
814-
{
815-
Task.Run(async () => await CompactionTask(serverOptions.CompactionFrequencySecs, ctsCommit.Token));
816-
}
817-
818-
if (serverOptions.ExpiredObjectCollectionFrequencySecs > 0)
819-
{
820-
Task.Run(async () => await ObjectCollectTask(serverOptions.ExpiredObjectCollectionFrequencySecs, ctsCommit.Token));
821-
}
822-
823-
if (serverOptions.ExpiredKeyDeletionScanFrequencySecs > 0)
824-
{
825-
Task.Run(async () => await ExpiredKeyDeletionScanTask(serverOptions.ExpiredKeyDeletionScanFrequencySecs, ctsCommit.Token));
826-
}
827-
828-
if (serverOptions.AdjustedIndexMaxCacheLines > 0 || serverOptions.AdjustedObjectStoreIndexMaxCacheLines > 0)
829-
{
830-
Task.Run(() => IndexAutoGrowTask(ctsCommit.Token));
831-
}
819+
// Start generic node tasks
820+
StartGenericNodeTasks();
832821

822+
// Start object size trackers
833823
databaseManager.StartObjectSizeTrackers(ctsCommit.Token);
834824
}
835825

@@ -914,9 +904,63 @@ public void Dispose()
914904
monitor?.Dispose();
915905
luaTimeoutManager?.Dispose();
916906
ctsCommit?.Cancel();
907+
taskManager.Dispose();
917908
databaseManager.Dispose();
918909

919910
ctsCommit?.Dispose();
920911
}
912+
913+
/// <summary>
914+
/// Suspend background tasks that may interfere with the replica's AOF
915+
/// </summary>
916+
/// <returns></returns>
917+
public async Task SuspendPrimaryOnlyTasks()
918+
{
919+
await taskManager.Cancel(TaskPlacementCategory.Primary);
920+
}
921+
922+
/// <summary>
923+
/// Start background maintenance tasks that should only run when this node is a primary
924+
/// </summary>
925+
/// <returns></returns>
926+
public void StartPrimaryTasks()
927+
{
928+
if (serverOptions.AofSizeLimit.Length > 0)
929+
{
930+
var aofSizeLimitBytes = 1L << serverOptions.AofSizeLimitSizeBits();
931+
taskManager.RegisterAndRun(TaskType.AofSizeLimitTask, (token) => AutoCheckpointBasedOnAofSizeLimit(aofSizeLimitBytes, token, logger));
932+
}
933+
934+
if (serverOptions.CommitFrequencyMs > 0 && serverOptions.EnableAOF)
935+
{
936+
taskManager.RegisterAndRun(TaskType.CommitTask, (token) => CommitTask(serverOptions.CommitFrequencyMs, token, logger));
937+
}
938+
939+
if (serverOptions.CompactionFrequencySecs > 0 && serverOptions.CompactionType != LogCompactionType.None)
940+
{
941+
taskManager.RegisterAndRun(TaskType.CompactionTask, (token) => CompactionTask(serverOptions.CompactionFrequencySecs, token));
942+
}
943+
944+
if (serverOptions.ExpiredObjectCollectionFrequencySecs > 0)
945+
{
946+
taskManager.RegisterAndRun(TaskType.ObjectCollectTask, (token) => ObjectCollectTask(serverOptions.ExpiredObjectCollectionFrequencySecs, token));
947+
}
948+
949+
if (serverOptions.ExpiredKeyDeletionScanFrequencySecs > 0)
950+
{
951+
taskManager.RegisterAndRun(TaskType.ExpiredKeyDeletionTask, (token) => ExpiredKeyDeletionScanTask(serverOptions.ExpiredKeyDeletionScanFrequencySecs, token));
952+
}
953+
}
954+
955+
/// <summary>
956+
/// Start generic background maintenance tasks
957+
/// </summary>
958+
public void StartGenericNodeTasks()
959+
{
960+
if (serverOptions.AdjustedIndexMaxCacheLines > 0 || serverOptions.AdjustedObjectStoreIndexMaxCacheLines > 0)
961+
{
962+
taskManager.RegisterAndRun(TaskType.IndexAutoGrowTask, (token) => IndexAutoGrowTask(token));
963+
}
964+
}
921965
}
922966
}

0 commit comments

Comments
 (0)