Skip to content

Commit 71d4ee0

Browse files
committed
Merged PR 712962: Add a gRPC ClusterState implementation
This PR adds a ClusterState storage implementation that is updated via gRPC using protobuf-net
1 parent efa369e commit 71d4ee0

File tree

18 files changed

+538
-211
lines changed

18 files changed

+538
-211
lines changed

Public/Src/Cache/ContentStore/Distributed/ContentLocations/MachineLocation.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ public readonly record struct MachineLocation
1818
{
1919
public const string GrpcUriSchemePrefix = "grpc://";
2020

21+
public static MachineLocation Invalid { get; } = new(string.Empty);
22+
2123
/// <summary>
2224
/// Gets whether the current machine location represents valid data
2325
/// </summary>
2426
[JsonIgnore]
25-
public bool IsValid => Path != null;
27+
public bool IsValid => !string.IsNullOrEmpty(Path);
2628

2729
/// <summary>
2830
/// Gets the path representation of the machine location
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
7+
using BuildXL.Cache.ContentStore.Distributed.NuCache;
8+
using BuildXL.Cache.ContentStore.Distributed.NuCache.ClusterStateManagement;
9+
using BuildXL.Cache.ContentStore.Interfaces.Results;
10+
using BuildXL.Cache.ContentStore.Interfaces.Time;
11+
using BuildXL.Cache.ContentStore.Tracing;
12+
using BuildXL.Cache.ContentStore.Tracing.Internal;
13+
using BuildXL.Cache.ContentStore.Utils;
14+
using BuildXL.Cache.Host.Configuration;
15+
16+
#nullable enable
17+
18+
namespace BuildXL.Cache.ContentStore.Distributed.Ephemeral;
19+
20+
/// <summary>
/// Client-side <see cref="IClusterStateStorage"/> implementation that forwards every operation to a remote
/// <see cref="IGrpcClusterStateStorage"/> service. Each call goes through the <c>ExecuteAsync</c> helper, which is
/// not defined in this class (presumably inherited from <c>GrpcCodeFirstClient</c>).
/// </summary>
public class GrpcClusterStateStorageClient : GrpcCodeFirstClient<IGrpcClusterStateStorage>, IClusterStateStorage
21+
{
22+
/// <summary>
/// Client settings: the operation timeout and the retry policy configuration. Both are handed to the base class
/// constructor (the retry configuration after conversion via <c>CreateRetryPolicy</c>).
/// </summary>
public record Configuration(TimeSpan OperationTimeout, RetryPolicyConfiguration RetryPolicy);
23+
24+
/// <inheritdoc />
protected override Tracer Tracer { get; } = new(nameof(GrpcClusterStateStorageClient));
25+
26+
/// <summary>
/// Creates a client that reaches the remote service through <paramref name="accessor"/>.
/// </summary>
/// <param name="configuration">Supplies the retry policy configuration and the operation timeout.</param>
/// <param name="accessor">Accessor for the <see cref="IGrpcClusterStateStorage"/> service; passed to the base class.</param>
public GrpcClusterStateStorageClient(Configuration configuration, IClientAccessor<IGrpcClusterStateStorage> accessor)
27+
: base(accessor, CreateRetryPolicy(configuration.RetryPolicy), SystemClock.Instance, configuration.OperationTimeout)
28+
{
29+
}
30+
31+
/// <summary>
/// Converts the retry policy configuration into an <see cref="IRetryPolicy"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the <c>_ =&gt; true</c> argument is presumably the "should retry" predicate, i.e. every failure is
/// retried - confirm against <c>AsRetryPolicy</c>.
/// </remarks>
private static IRetryPolicy CreateRetryPolicy(RetryPolicyConfiguration configurationRetryPolicy)
32+
{
33+
return configurationRetryPolicy.AsRetryPolicy(_ => true,
34+
// We use an absurdly high retry count because the actual operation timeout is controlled through
35+
// PerformOperationAsync in ExecuteAsync.
36+
1_000_000);
37+
}
38+
39+
/// <summary>
/// Sends <paramref name="request"/> to the remote service's <c>HeartbeatAsync</c> and wraps the response in a
/// successful <see cref="Result{T}"/>. No extra end message is supplied (empty string).
/// </summary>
public Task<Result<IClusterStateStorage.HeartbeatOutput>> HeartbeatAsync(OperationContext context, IClusterStateStorage.HeartbeatInput request)
40+
{
41+
return ExecuteAsync(
42+
context,
43+
async (context, options, service) => Result.Success(await service.HeartbeatAsync(request, options)),
44+
extraEndMessage: _ => string.Empty);
45+
}
46+
47+
/// <summary>
/// Calls the remote service's <c>ReadStateAsync</c> and wraps the returned <see cref="ClusterStateMachine"/> in a
/// successful <see cref="Result{T}"/>. No extra end message is supplied (empty string).
/// </summary>
public Task<Result<ClusterStateMachine>> ReadStateAsync(OperationContext context)
48+
{
49+
return ExecuteAsync(
50+
context,
51+
async (context, options, service) => Result.Success(await service.ReadStateAsync(options)),
52+
extraEndMessage: _ => string.Empty);
53+
}
54+
55+
/// <summary>
/// Sends <paramref name="request"/> to the remote service's <c>RegisterMachinesAsync</c> and wraps the response in
/// a successful <see cref="Result{T}"/>. No extra end message is supplied (empty string).
/// </summary>
public Task<Result<IClusterStateStorage.RegisterMachineOutput>> RegisterMachinesAsync(OperationContext context, IClusterStateStorage.RegisterMachineInput request)
56+
{
57+
return ExecuteAsync(
58+
context,
59+
async (context, options, service) => Result.Success(await service.RegisterMachinesAsync(request, options)),
60+
extraEndMessage: _ => string.Empty);
61+
}
62+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Threading.Tasks;
5+
using BuildXL.Cache.ContentStore.Distributed.MetadataService;
6+
using BuildXL.Cache.ContentStore.Distributed.NuCache;
7+
using BuildXL.Cache.ContentStore.Distributed.NuCache.ClusterStateManagement;
8+
using BuildXL.Cache.ContentStore.Interfaces.Logging;
9+
using BuildXL.Cache.ContentStore.Interfaces.Results;
10+
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
11+
using BuildXL.Cache.ContentStore.Logging;
12+
using BuildXL.Cache.ContentStore.Tracing;
13+
using BuildXL.Cache.ContentStore.Tracing.Internal;
14+
using BuildXL.Cache.ContentStore.Utils;
15+
using ProtoBuf.Grpc;
16+
17+
#nullable enable
18+
19+
namespace BuildXL.Cache.ContentStore.Distributed.Ephemeral;
20+
21+
/// <summary>
/// Service-side implementation of <see cref="IGrpcClusterStateStorage"/>. Every call builds an
/// <see cref="OperationContext"/> from the incoming <see cref="CallContext"/> and delegates to the wrapped
/// <see cref="IClusterStateStorage"/>.
/// </summary>
/// <remarks>
/// NOTE(review): results are passed through <c>ThrowIfFailureAsync</c>, which presumably unwraps the value and
/// throws when the underlying result is a failure - confirm against that helper.
/// </remarks>
public class GrpcClusterStateStorageService : StartupShutdownComponentBase, IGrpcClusterStateStorage
22+
{
23+
/// <inheritdoc />
protected override Tracer Tracer { get; } = new(nameof(GrpcClusterStateStorageService));
24+
25+
// The storage that every service call is forwarded to.
private readonly IClusterStateStorage _clusterStateStorage;
26+
27+
// Logger used when creating per-call tracing contexts. Starts as the null logger and is replaced in
// StartupComponentAsync with the logger of the startup context.
private ILogger _logger = NullLogger.Instance;
28+
29+
/// <summary>
/// Wraps <paramref name="clusterStateStorage"/> and links it via <c>LinkLifetime</c>
/// (presumably so it starts up and shuts down together with this component - TODO confirm).
/// </summary>
public GrpcClusterStateStorageService(IClusterStateStorage clusterStateStorage)
30+
{
31+
_clusterStateStorage = clusterStateStorage;
32+
LinkLifetime(_clusterStateStorage);
33+
}
34+
35+
/// <summary>
/// Captures the logger from the startup context before running the base startup logic.
/// </summary>
protected override Task<BoolResult> StartupComponentAsync(OperationContext context)
36+
{
37+
_logger = context.TracingContext.Logger;
38+
return base.StartupComponentAsync(context);
39+
}
40+
41+
/// <summary>
/// Forwards a machine registration request to the wrapped storage.
/// </summary>
public Task<IClusterStateStorage.RegisterMachineOutput> RegisterMachinesAsync(IClusterStateStorage.RegisterMachineInput request, CallContext callContext = default)
42+
{
43+
var operationContext = CreateOperationContext(callContext);
44+
return _clusterStateStorage.RegisterMachinesAsync(operationContext, request).ThrowIfFailureAsync();
45+
}
46+
47+
/// <summary>
/// Forwards a heartbeat request to the wrapped storage.
/// </summary>
public Task<IClusterStateStorage.HeartbeatOutput> HeartbeatAsync(IClusterStateStorage.HeartbeatInput request, CallContext callContext = default)
48+
{
49+
var operationContext = CreateOperationContext(callContext);
50+
return _clusterStateStorage.HeartbeatAsync(operationContext, request).ThrowIfFailureAsync();
51+
}
52+
53+
/// <summary>
/// Reads the cluster state from the wrapped storage.
/// </summary>
public Task<ClusterStateMachine> ReadStateAsync(CallContext callContext = default)
54+
{
55+
var operationContext = CreateOperationContext(callContext);
56+
return _clusterStateStorage.ReadStateAsync(operationContext).ThrowIfFailureAsync();
57+
}
58+
59+
/// <summary>
/// Builds the <see cref="OperationContext"/> for an incoming call. When the request headers carry a context id it
/// is used for the tracing context; otherwise a new tracing context is created from the logger alone. The call's
/// cancellation token is carried into the operation context.
/// </summary>
private OperationContext CreateOperationContext(CallContext callContext)
60+
{
61+
var contextId = MetadataServiceSerializer.TryGetContextId(callContext.RequestHeaders);
62+
var tracingContext = contextId != null
63+
? new Context(contextId, _logger)
64+
: new Context(_logger);
65+
66+
var operationContext = new OperationContext(tracingContext, callContext.CancellationToken);
67+
return operationContext;
68+
}
69+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Threading.Tasks;
5+
using BuildXL.Cache.ContentStore.Distributed.NuCache;
6+
using BuildXL.Cache.ContentStore.Distributed.NuCache.ClusterStateManagement;
7+
using ProtoBuf.Grpc;
8+
using ProtoBuf.Grpc.Configuration;
9+
10+
#nullable enable
11+
12+
namespace BuildXL.Cache.ContentStore.Distributed.Ephemeral;
13+
14+
/// <summary>
15+
/// This interface serves as a code-first service declaration for protobuf-net gRPC. Clients may talk to a service
16+
/// implementing this interface, which behind it serves an <see cref="IClusterStateStorage"/>.
17+
/// </summary>
18+
[Service("Cache.ClusterState")]
19+
public interface IGrpcClusterStateStorage
20+
{
21+
/// <summary>
/// Machine registration call; takes the same input and returns the same output type as
/// <see cref="IClusterStateStorage"/>'s registration operation, without the result wrapper.
/// </summary>
Task<IClusterStateStorage.RegisterMachineOutput> RegisterMachinesAsync(IClusterStateStorage.RegisterMachineInput request, CallContext callContext = default);
22+
23+
/// <summary>
/// Heartbeat call; takes the same input and returns the same output type as
/// <see cref="IClusterStateStorage"/>'s heartbeat operation, without the result wrapper.
/// </summary>
Task<IClusterStateStorage.HeartbeatOutput> HeartbeatAsync(IClusterStateStorage.HeartbeatInput request, CallContext callContext = default);
24+
25+
/// <summary>
/// Returns the current <see cref="ClusterStateMachine"/>.
/// </summary>
Task<ClusterStateMachine> ReadStateAsync(CallContext callContext = default);
26+
}

Public/Src/Cache/ContentStore/Distributed/MetadataService/ClientGlobalCacheStore.cs

Lines changed: 10 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
using System;
54
using System.Collections.Generic;
65
using System.Linq;
7-
using System.Runtime.CompilerServices;
86
using System.Threading.Tasks;
97
using BuildXL.Cache.ContentStore.Distributed.NuCache;
108
using BuildXL.Cache.ContentStore.Hashing;
@@ -15,8 +13,6 @@
1513
using BuildXL.Cache.ContentStore.Utils;
1614
using BuildXL.Cache.MemoizationStore.Interfaces.Results;
1715
using BuildXL.Cache.MemoizationStore.Interfaces.Sessions;
18-
using BuildXL.Utilities.Core.Tracing;
19-
using Grpc.Core;
2016

2117
#nullable enable
2218

@@ -25,104 +21,31 @@ namespace BuildXL.Cache.ContentStore.Distributed.MetadataService
2521
/// <summary>
2622
/// A global content metadata store client which routes requests to a remote machine.
2723
/// </summary>
28-
public class ClientGlobalCacheStore : StartupShutdownComponentBase, IGlobalCacheStore
24+
public class ClientGlobalCacheStore : GrpcCodeFirstClient<IGlobalCacheService>, IGlobalCacheStore
2925
{
3026
/// <inheritdoc />
3127
public override bool AllowMultipleStartupAndShutdowns => true;
3228

3329
/// <inheritdoc />
34-
protected override Tracer Tracer { get; } = new Tracer(nameof(ClientGlobalCacheStore));
35-
36-
private readonly IClientAccessor<IGlobalCacheService> _serviceClientFactory;
37-
38-
private readonly ClientContentMetadataStoreConfiguration _configuration;
39-
40-
private readonly IClock _clock;
41-
42-
private readonly IRetryPolicy _retryPolicy;
30+
protected override Tracer Tracer { get; } = new(nameof(ClientGlobalCacheStore));
4331

4432
public ClientGlobalCacheStore(
4533
IClientAccessor<IGlobalCacheService> metadataServiceClientFactory,
4634
ClientContentMetadataStoreConfiguration configuration)
35+
: base(metadataServiceClientFactory, CreateRetryPolicy(configuration), SystemClock.Instance, configuration.OperationTimeout)
4736
{
48-
_serviceClientFactory = metadataServiceClientFactory;
49-
_configuration = configuration;
50-
_clock = SystemClock.Instance;
37+
}
5138

52-
_retryPolicy = RetryPolicyFactory.GetExponentialPolicy(
39+
private static IRetryPolicy CreateRetryPolicy(ClientContentMetadataStoreConfiguration configuration)
40+
{
41+
return RetryPolicyFactory.GetExponentialPolicy(
5342
_ => true,
5443
// We use an absurdly high retry count because the actual operation timeout is controlled through
5544
// PerformOperationAsync in ExecuteAsync.
5645
1_000_000,
57-
_configuration.RetryMinimumWaitTime,
58-
_configuration.RetryMaximumWaitTime,
59-
_configuration.RetryDelta);
60-
61-
LinkLifetime(_serviceClientFactory);
62-
}
63-
64-
private async Task<TResult> ExecuteAsync<TResult>(
65-
OperationContext originalContext,
66-
Func<OperationContext, CallOptions, IGlobalCacheService, Task<TResult>> executeAsync,
67-
Func<TResult, string?> extraEndMessage,
68-
string? extraStartMessage = null,
69-
[CallerMemberName] string caller = null!)
70-
where TResult : ResultBase
71-
{
72-
var attempt = -1;
73-
using var contextWithShutdown = TrackShutdown(originalContext);
74-
var context = contextWithShutdown.Context;
75-
var callerAttempt = $"{caller}_Attempt";
76-
77-
return await context.PerformOperationWithTimeoutAsync(
78-
Tracer,
79-
context =>
80-
{
81-
var callOptions = new CallOptions(
82-
headers: new Metadata()
83-
{
84-
MetadataServiceSerializer.CreateContextIdHeaderEntry(context.TracingContext.TraceId)
85-
},
86-
deadline: _clock.UtcNow + _configuration.OperationTimeout,
87-
cancellationToken: context.Token);
88-
89-
return _retryPolicy.ExecuteAsync(async () =>
90-
{
91-
await Task.Yield();
92-
93-
attempt++;
94-
95-
var stopwatch = StopwatchSlim.Start();
96-
var clientCreationTime = TimeSpan.Zero;
97-
98-
var result = await context.PerformOperationAsync(Tracer, () =>
99-
{
100-
return _serviceClientFactory.UseAsync(context, service =>
101-
{
102-
clientCreationTime = stopwatch.Elapsed;
103-
104-
return executeAsync(context, callOptions, service);
105-
});
106-
},
107-
extraStartMessage: extraStartMessage,
108-
extraEndMessage: r => $"Attempt=[{attempt}] ClientCreationTimeMs=[{clientCreationTime.TotalMilliseconds}] {extraEndMessage(r)}",
109-
caller: callerAttempt,
110-
traceErrorsOnly: true);
111-
112-
await Task.Yield();
113-
114-
// Because we capture exceptions inside the PerformOperation, we need to make sure that they
115-
// get propagated for the retry policy to kick in.
116-
result.RethrowIfFailure();
117-
118-
return result;
119-
}, context.Token);
120-
},
121-
caller: caller,
122-
traceErrorsOnly: true,
123-
extraStartMessage: extraStartMessage,
124-
extraEndMessage: r => $"Attempts=[{attempt + 1}] {extraEndMessage(r)}",
125-
timeout: _configuration.OperationTimeout);
46+
configuration.RetryMinimumWaitTime,
47+
configuration.RetryMaximumWaitTime,
48+
configuration.RetryDelta);
12649
}
12750

12851
public Task<Result<IReadOnlyList<ContentLocationEntry>>> GetBulkAsync(OperationContext context, IReadOnlyList<ShortHash> contentHashes)
@@ -243,15 +166,5 @@ public Task<Result<SerializedMetadataEntry>> GetContentHashListAsync(OperationCo
243166
// TODO: What to log here?
244167
extraEndMessage: r => r.GetValueOrDefault()?.ToString());
245168
}
246-
247-
public Task<Result<MachineMapping>> RegisterMachineAsync(OperationContext context, MachineLocation machineLocation)
248-
{
249-
throw new NotImplementedException($"Attempt to use {nameof(ClientGlobalCacheStore)} for machine registration is unsupported");
250-
}
251-
252-
public Task<BoolResult> ForceRegisterMachineAsync(OperationContext context, MachineMapping mapping)
253-
{
254-
throw new NotImplementedException($"Attempt to use {nameof(ClientGlobalCacheStore)} for machine registration is unsupported");
255-
}
256169
}
257170
}

0 commit comments

Comments
 (0)