Skip to content

Commit b001e02

Browse files
authored
Add ISubchannelCallTracker (#1621)
1 parent 41b8e2e commit b001e02

File tree

9 files changed

+81
-52
lines changed

9 files changed

+81
-52
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
#if SUPPORT_LOAD_BALANCING
20+
21+
namespace Grpc.Net.Client.Balancer
22+
{
23+
/// <summary>
24+
/// An interface for tracking subchannel calls.
25+
/// </summary>
26+
public interface ISubchannelCallTracker
27+
{
28+
/// <summary>
29+
/// Called when a subchannel call is started after a load balance pick.
30+
/// </summary>
31+
void Start();
32+
33+
/// <summary>
34+
/// Called when a subchannel call is completed.
35+
/// </summary>
36+
/// <param name="context">The complete context.</param>
37+
void Complete(CompletionContext context);
38+
}
39+
}
40+
#endif

src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,17 @@ protected override async Task<HttpResponseMessage> SendAsync(
106106
request.SetOption(CurrentAddressKey, address);
107107
#endif
108108

109+
var responseMessageTask = base.SendAsync(request, cancellationToken);
110+
result.SubchannelCallTracker?.Start();
111+
109112
try
110113
{
111-
var responseMessage = await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
112-
114+
var responseMessage = await responseMessageTask.ConfigureAwait(false);
115+
113116
// TODO(JamesNK): This doesn't take into account long running streams.
114117
// If there is response content then we need to wait until it is read to the end
115118
// or the request is disposed.
116-
result.OnComplete(new CompletionContext
119+
result.SubchannelCallTracker?.Complete(new CompletionContext
117120
{
118121
Address = address
119122
});
@@ -122,7 +125,7 @@ protected override async Task<HttpResponseMessage> SendAsync(
122125
}
123126
catch (Exception ex)
124127
{
125-
result.OnComplete(new CompletionContext
128+
result.SubchannelCallTracker?.Complete(new CompletionContext
126129
{
127130
Address = address,
128131
Error = ex

src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ public void UpdateState(BalancerState state)
302302
}
303303
}
304304

305-
public async ValueTask<(Subchannel Subchannel, BalancerAddress Address, Action<CompletionContext> OnComplete)> PickAsync(PickContext context, bool waitForReady, CancellationToken cancellationToken)
305+
public async ValueTask<(Subchannel Subchannel, BalancerAddress Address, ISubchannelCallTracker? SubchannelCallTracker)> PickAsync(PickContext context, bool waitForReady, CancellationToken cancellationToken)
306306
{
307307
SubchannelPicker? previousPicker = null;
308308

@@ -325,7 +325,7 @@ public void UpdateState(BalancerState state)
325325
if (address != null)
326326
{
327327
ConnectionManagerLog.PickResultSuccessful(Logger, subchannel.Id, address);
328-
return (subchannel, address, result.Complete);
328+
return (subchannel, address, result.SubchannelCallTracker);
329329
}
330330
else
331331
{

src/Grpc.Net.Client/Balancer/Internal/ISubchannelTransport.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ namespace Grpc.Net.Client.Balancer.Internal
3232
/// </summary>
3333
internal interface ISubchannelTransport : IDisposable
3434
{
35-
void OnRequestComplete(CompletionContext context);
3635
BalancerAddress? CurrentAddress { get; }
3736

3837
#if NET5_0_OR_GREATER

src/Grpc.Net.Client/Balancer/Internal/PassiveSubchannelTransport.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,6 @@ public PassiveSubchannelTransport(Subchannel subchannel)
4444

4545
public BalancerAddress? CurrentAddress => _currentAddress;
4646

47-
public void OnRequestComplete(CompletionContext context)
48-
{
49-
}
50-
5147
public void Disconnect()
5248
{
5349
_currentAddress = null;

src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,6 @@ public void Dispose()
343343
_disposed = true;
344344
}
345345
}
346-
347-
public void OnRequestComplete(CompletionContext context)
348-
{
349-
}
350346
}
351347

352348
internal static class SocketConnectivitySubchannelTransportLog

src/Grpc.Net.Client/Balancer/PickResult.cs

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,13 @@ namespace Grpc.Net.Client.Balancer
3232
/// </summary>
3333
public sealed class PickResult
3434
{
35-
private readonly Action<CompletionContext>? _onComplete;
36-
3735
[DebuggerStepThrough]
38-
private PickResult(PickResultType pickResultType, Subchannel? subchannel, Status status, Action<CompletionContext>? onComplete)
36+
private PickResult(PickResultType pickResultType, Subchannel? subchannel, Status status, ISubchannelCallTracker? subchannelCallTracker)
3937
{
4038
Type = pickResultType;
4139
Subchannel = subchannel;
4240
Status = status;
43-
_onComplete = onComplete;
41+
SubchannelCallTracker = subchannelCallTracker;
4442
}
4543

4644
/// <summary>
@@ -49,7 +47,7 @@ private PickResult(PickResultType pickResultType, Subchannel? subchannel, Status
4947
public PickResultType Type { get; }
5048

5149
/// <summary>
52-
/// The <see cref="Subchannel"/> provided by <see cref="ForSubchannel(Subchannel, Action{CompletionContext}?)"/>.
50+
/// The <see cref="Subchannel"/> provided by <see cref="ForSubchannel(Subchannel, ISubchannelCallTracker?)"/>.
5351
/// </summary>
5452
public Subchannel? Subchannel { get; }
5553

@@ -59,23 +57,9 @@ private PickResult(PickResultType pickResultType, Subchannel? subchannel, Status
5957
public Status Status { get; }
6058

6159
/// <summary>
62-
/// Called to notify the load balancer that a call is complete.
60+
/// The optional <see cref="SubchannelCallTracker"/> provided by <see cref="ForSubchannel(Subchannel, ISubchannelCallTracker?)"/>.
6361
/// </summary>
64-
/// <param name="context">The complete context.</param>
65-
public void Complete(CompletionContext context)
66-
{
67-
if (context == null)
68-
{
69-
throw new ArgumentNullException(nameof(context));
70-
}
71-
if (context.Address == null)
72-
{
73-
throw new ArgumentException($"Required {nameof(CompletionContext.Address)} value isn't set on the context.");
74-
}
75-
76-
_onComplete?.Invoke(context);
77-
Subchannel?.Transport.OnRequestComplete(context);
78-
}
62+
public ISubchannelCallTracker? SubchannelCallTracker { get; }
7963

8064
/// <summary>
8165
/// Create a <see cref="PickResult"/> that provides a <see cref="Balancer.Subchannel"/> to gRPC calls.
@@ -89,12 +73,12 @@ public void Complete(CompletionContext context)
8973
/// </para>
9074
/// </summary>
9175
/// <param name="subchannel">The picked subchannel.</param>
92-
/// <param name="onComplete">An optional callback to be notified of a call being completed.</param>
76+
/// <param name="subchannelCallTracker">An optional interface to track the subchannel call.</param>
9377
/// <returns>The pick result.</returns>
9478
[DebuggerStepThrough]
95-
public static PickResult ForSubchannel(Subchannel subchannel, Action<CompletionContext>? onComplete = null)
79+
public static PickResult ForSubchannel(Subchannel subchannel, ISubchannelCallTracker? subchannelCallTracker = null)
9680
{
97-
return new PickResult(PickResultType.Complete, subchannel, Status.DefaultSuccess, onComplete);
81+
return new PickResult(PickResultType.Complete, subchannel, Status.DefaultSuccess, subchannelCallTracker);
9882
}
9983

10084
/// <summary>
@@ -111,7 +95,7 @@ public static PickResult ForFailure(Status status)
11195
throw new ArgumentException("Error status code must not be OK.", nameof(status));
11296
}
11397

114-
return new PickResult(PickResultType.Fail, subchannel: null, status, onComplete: null);
98+
return new PickResult(PickResultType.Fail, subchannel: null, status, subchannelCallTracker: null);
11599
}
116100

117101
/// <summary>
@@ -128,7 +112,7 @@ public static PickResult ForDrop(Status status)
128112
throw new ArgumentException("Error status code must not be OK.", nameof(status));
129113
}
130114

131-
return new PickResult(PickResultType.Drop, subchannel: null, status, onComplete: null);
115+
return new PickResult(PickResultType.Drop, subchannel: null, status, subchannelCallTracker: null);
132116
}
133117

134118
/// <summary>
@@ -138,7 +122,7 @@ public static PickResult ForDrop(Status status)
138122
[DebuggerStepThrough]
139123
public static PickResult ForQueue()
140124
{
141-
return new PickResult(PickResultType.Queue, subchannel: null, Status.DefaultSuccess, onComplete: null);
125+
return new PickResult(PickResultType.Queue, subchannel: null, Status.DefaultSuccess, subchannelCallTracker: null);
142126
}
143127
}
144128

test/FunctionalTests/Balancer/LeastUsedBalancer.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,34 @@ public override PickResult Pick(PickContext context)
8888
Debug.Assert(leastUsedSubchannel != null);
8989
Debug.Assert(leastUsedCounter != null);
9090

91-
leastUsedCounter.Increment();
92-
93-
return PickResult.ForSubchannel(leastUsedSubchannel, c =>
94-
{
95-
leastUsedCounter.Decrement();
96-
});
91+
return PickResult.ForSubchannel(leastUsedSubchannel, new LeastUsedSubchannelTracker(leastUsedCounter));
9792
}
9893

9994
public override string ToString()
10095
{
10196
return string.Join(", ", _subchannels.Select(s => s.ToString()));
10297
}
10398

99+
private sealed class LeastUsedSubchannelTracker : ISubchannelCallTracker
100+
{
101+
private readonly AtomicCounter _counter;
102+
103+
public LeastUsedSubchannelTracker(AtomicCounter counter)
104+
{
105+
_counter = counter;
106+
}
107+
108+
public void Complete(CompletionContext context)
109+
{
110+
_counter.Decrement();
111+
}
112+
113+
public void Start()
114+
{
115+
_counter.Increment();
116+
}
117+
}
118+
104119
private sealed class AtomicCounter
105120
{
106121
private int _value;

test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransport.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,6 @@ public ValueTask<Stream> GetStreamAsync(BalancerAddress address, CancellationTok
7070
return new ValueTask<Stream>(new MemoryStream());
7171
}
7272

73-
public void OnRequestComplete(CompletionContext context)
74-
{
75-
}
76-
7773
public void Disconnect()
7874
{
7975
CurrentAddress = null;

0 commit comments

Comments
 (0)