Skip to content

Commit 851044d

Browse files
authored
Replace blocking Result with awaits (#2261)
1 parent 339abeb commit 851044d

File tree

8 files changed

+78
-68
lines changed

8 files changed

+78
-68
lines changed

src/Proto.Cluster/DefaultClusterContext.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ public DefaultClusterContext(Cluster cluster)
131131

132132
if (task.IsCompleted)
133133
{
134-
var untypedResult = MessageEnvelope.UnwrapMessage(task.Result);
134+
var result = await task.ConfigureAwait(false);
135+
var untypedResult = MessageEnvelope.UnwrapMessage(result);
135136

136137
if (untypedResult is DeadLetterResponse)
137138
{
@@ -158,7 +159,7 @@ public DefaultClusterContext(Cluster cluster)
158159

159160
if (typeof(T) == typeof(MessageEnvelope))
160161
{
161-
return (T)(object)MessageEnvelope.Wrap(task.Result);
162+
return (T)(object)MessageEnvelope.Wrap(result);
162163
}
163164

164165
Logger.LogError("Unexpected message. Was type {Type} but expected {ExpectedType}",

src/Proto.Cluster/Identity/IdentityActivatorProxy.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,15 @@ private Task Activate(ClusterIdentity identity, IContext context)
5252
if (context.Sender is not null)
5353
{
5454
context.ReenterAfter(target,
55-
task =>
55+
async task =>
5656
{
57-
var pid = task.IsCompletedSuccessfully ? task.Result : null;
58-
Respond(context, pid);
57+
PID? pid = null;
58+
if (task.IsCompletedSuccessfully)
59+
{
60+
pid = await task.ConfigureAwait(false);
61+
}
5962

60-
return Task.CompletedTask;
63+
Respond(context, pid);
6164
}
6265
);
6366
}
@@ -98,9 +101,13 @@ private Task ReplaceActivation(ClusterIdentity identity, PID replacedPid, IConte
98101
}
99102

100103
context.ReenterAfter(GetPid(identity, context.CancellationToken),
101-
task =>
104+
async task =>
102105
{
103-
var activation = task.IsCompletedSuccessfully ? task.Result : null;
106+
PID? activation = null;
107+
if (task.IsCompletedSuccessfully)
108+
{
109+
activation = await task.ConfigureAwait(false);
110+
}
104111

105112
// Check if retrieved PID is stale. Replace should be called after the original activation has been stopped,
106113
// but the identity might not have been purged from IdentityLookup yet.
@@ -118,7 +125,7 @@ private Task ReplaceActivation(ClusterIdentity identity, PID replacedPid, IConte
118125
context.ReenterAfter(Task.Delay(50 * attempt),
119126
() => ReplaceActivation(identity, replacedPid, context, attempt + 1));
120127

121-
return Task.CompletedTask;
128+
return;
122129
}
123130

124131
Logger.LogWarning(
@@ -127,8 +134,6 @@ private Task ReplaceActivation(ClusterIdentity identity, PID replacedPid, IConte
127134
}
128135

129136
Respond(context, activation);
130-
131-
return Task.CompletedTask;
132137
}
133138
);
134139

src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,14 @@ private async Task OnActivationTerminating(IContext context, ActivationTerminati
104104
}
105105
}
106106

107-
private Task OnActivationRequest(IContext context, ActivationRequest msg)
107+
private async Task OnActivationRequest(IContext context, ActivationRequest msg)
108108
{
109109
if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
110110
{
111111
//this identity already exists
112112
context.Respond(new ActivationResponse { Pid = existing });
113113

114-
return Task.CompletedTask;
114+
return;
115115
}
116116

117117
var clusterKind = _cluster.TryGetClusterKind(msg.Kind);
@@ -121,23 +121,21 @@ private Task OnActivationRequest(IContext context, ActivationRequest msg)
121121
Logger.LogError("Failed to spawn {Kind}/{Identity}, kind not found for member", msg.Kind, msg.Identity);
122122
context.Respond(new ActivationResponse { Failed = true });
123123

124-
return Task.CompletedTask;
124+
return;
125125
}
126126

127127
if (clusterKind.CanSpawnIdentity is not null)
128128
{
129129
// Needs to check if the identity is allowed to spawn
130-
VerifyAndSpawn(msg, context, clusterKind);
130+
await VerifyAndSpawn(msg, context, clusterKind).ConfigureAwait(false);
131131
}
132132
else
133133
{
134134
Spawn(msg, context, clusterKind);
135135
}
136-
137-
return Task.CompletedTask;
138136
}
139137

140-
private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
138+
private async Task VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
141139
{
142140
var clusterIdentity = msg.ClusterIdentity;
143141

@@ -159,20 +157,22 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
159157

160158
if (canSpawn.IsCompleted)
161159
{
162-
OnSpawnDecided(msg, context, clusterKind, canSpawn.Result);
160+
var canSpawnIdentity = await canSpawn.AsTask().ConfigureAwait(false);
161+
OnSpawnDecided(msg, context, clusterKind, canSpawnIdentity);
163162

164163
return;
165164
}
166165

167166
_inFlightIdentityChecks.Add(clusterIdentity);
168167

169-
context.ReenterAfter(canSpawn.AsTask(), task =>
168+
context.ReenterAfter(canSpawn.AsTask(), async task =>
170169
{
171170
_inFlightIdentityChecks.Remove(clusterIdentity);
172171

173172
if (task.IsCompletedSuccessfully)
174173
{
175-
OnSpawnDecided(msg, context, clusterKind, task.Result);
174+
var canSpawnIdentity = await task.ConfigureAwait(false);
175+
OnSpawnDecided(msg, context, clusterKind, canSpawnIdentity);
176176
}
177177
else
178178
{
@@ -184,8 +184,6 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
184184
}
185185
);
186186
}
187-
188-
return Task.CompletedTask;
189187
}
190188
);
191189
}

src/Proto.Cluster/Identity/IdentityStorageWorker.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,13 @@ public Task ReceiveAsync(IContext context)
7171
{
7272
_inProgress.Add(clusterIdentity);
7373

74-
context.ReenterAfter(GetWithGlobalLock(context.Sender!, clusterIdentity), task =>
74+
context.ReenterAfter(GetWithGlobalLock(context.Sender!, clusterIdentity), async task =>
7575
{
7676
try
7777
{
78-
var response = task.Result;
78+
var response = await task.ConfigureAwait(false);
7979
context.Respond(response);
8080
RespondToWaitingRequests(context, clusterIdentity, response);
81-
82-
return Task.CompletedTask;
8381
}
8482
finally
8583
{

src/Proto.Cluster/Partition/PartitionIdentityActor.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -602,11 +602,9 @@ private Task OnActivationRequest(ActivationRequest msg, IContext context)
602602
res, msg.ClusterIdentity);
603603
}
604604
// Just waits for the already in-progress activation to complete (or fail)
605-
context.ReenterAfter(res.Response.Task, task =>
605+
context.ReenterAfter(res.Response.Task, async task =>
606606
{
607-
context.Respond(task.Result);
608-
609-
return Task.CompletedTask;
607+
context.Respond(await task.ConfigureAwait(false));
610608
}
611609
);
612610

src/Proto.Cluster/Partition/PartitionPlacementActor.cs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,26 @@ await Task
131131
}
132132

133133
// Ensure that we only update last rebalanced topology when all members have received the current activations
134-
if (waitingRequests.All(task
135-
=> task.IsCompletedSuccessfully &&
136-
task.Result?.ProcessingState == IdentityHandoverAck.Types.State.Processed
137-
))
134+
var allProcessed = true;
135+
136+
foreach (var task in waitingRequests)
137+
{
138+
if (!task.IsCompletedSuccessfully)
139+
{
140+
allProcessed = false;
141+
break;
142+
}
143+
144+
var ack = await task.ConfigureAwait(false);
145+
146+
if (ack?.ProcessingState != IdentityHandoverAck.Types.State.Processed)
147+
{
148+
allProcessed = false;
149+
break;
150+
}
151+
}
152+
153+
if (allProcessed)
138154
{
139155
Logger.LogInformation("[PartitionPlacementActor] Completed rebalance publish for topology {TopologyHash}",
140156
msg.TopologyHash);
@@ -318,7 +334,7 @@ private Props AbortOnDeadLetter(CancellationTokenSource cts) =>
318334
}
319335
);
320336

321-
private Task OnActivationRequest(IContext context, ActivationRequest msg)
337+
private async Task OnActivationRequest(IContext context, ActivationRequest msg)
322338
{
323339
if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
324340
{
@@ -336,7 +352,7 @@ private Task OnActivationRequest(IContext context, ActivationRequest msg)
336352

337353
context.Respond(response);
338354

339-
return Task.CompletedTask;
355+
return;
340356
}
341357

342358
var clusterKind = _cluster.TryGetClusterKind(msg.Kind);
@@ -350,23 +366,21 @@ private Task OnActivationRequest(IContext context, ActivationRequest msg)
350366
TopologyHash = msg.TopologyHash
351367
});
352368

353-
return Task.CompletedTask;
369+
return;
354370
}
355371

356372
if (clusterKind.CanSpawnIdentity is not null)
357373
{
358374
// Needs to check if the identity is allowed to spawn
359-
VerifyAndSpawn(msg, context, clusterKind);
375+
await VerifyAndSpawn(msg, context, clusterKind).ConfigureAwait(false);
360376
}
361377
else
362378
{
363379
Spawn(msg, context, clusterKind);
364380
}
365-
366-
return Task.CompletedTask;
367381
}
368382

369-
private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
383+
private async Task VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
370384
{
371385
var clusterIdentity = msg.ClusterIdentity;
372386

@@ -389,20 +403,22 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
389403

390404
if (canSpawn.IsCompleted)
391405
{
392-
OnSpawnDecided(msg, context, clusterKind, canSpawn.Result);
406+
var canSpawnIdentity = await canSpawn.AsTask().ConfigureAwait(false);
407+
OnSpawnDecided(msg, context, clusterKind, canSpawnIdentity);
393408

394409
return;
395410
}
396411

397412
_inFlightIdentityChecks.Add(clusterIdentity);
398413

399-
context.ReenterAfter(canSpawn.AsTask(), task =>
414+
context.ReenterAfter(canSpawn.AsTask(), async task =>
400415
{
401416
_inFlightIdentityChecks.Remove(clusterIdentity);
402417

403418
if (task.IsCompletedSuccessfully)
404419
{
405-
OnSpawnDecided(msg, context, clusterKind, task.Result);
420+
var canSpawnIdentity = await task.ConfigureAwait(false);
421+
OnSpawnDecided(msg, context, clusterKind, canSpawnIdentity);
406422
}
407423
else
408424
{
@@ -415,8 +431,6 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
415431
}
416432
);
417433
}
418-
419-
return Task.CompletedTask;
420434
}
421435
);
422436
}

src/Proto.Cluster/PartitionActivator/PartitionActivatorActor.cs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private Task OnActivationTerminating(ActivationTerminating msg)
149149
return Task.CompletedTask;
150150
}
151151

152-
private Task OnActivationRequest(ActivationRequest msg, IContext context)
152+
private async Task OnActivationRequest(ActivationRequest msg, IContext context)
153153
{
154154
//who owns this?
155155
var ownerAddress = _manager.Selector.GetOwnerAddress(msg.ClusterIdentity);
@@ -167,7 +167,7 @@ private Task OnActivationRequest(ActivationRequest msg, IContext context)
167167

168168
context.Forward(ownerPid);
169169

170-
return Task.CompletedTask;
170+
return;
171171
}
172172

173173
if (_actors.TryGetValue(msg.ClusterIdentity, out var existing))
@@ -185,18 +185,16 @@ private Task OnActivationRequest(ActivationRequest msg, IContext context)
185185
if (clusterKind.CanSpawnIdentity is not null)
186186
{
187187
// Needs to check if the identity is allowed to spawn
188-
VerifyAndSpawn(msg, context, clusterKind);
188+
await VerifyAndSpawn(msg, context, clusterKind).ConfigureAwait(false);
189189
}
190190
else
191191
{
192192
Spawn(msg, context, clusterKind);
193193
}
194194
}
195-
196-
return Task.CompletedTask;
197195
}
198196

199-
private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
197+
private async Task VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedClusterKind clusterKind)
200198
{
201199
var clusterIdentity = msg.ClusterIdentity;
202200

@@ -219,20 +217,22 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
219217

220218
if (canSpawn.IsCompleted)
221219
{
222-
OnSpawnDecided(msg, context, clusterKind, canSpawn.Result);
220+
var canSpawnIdentity = await canSpawn.AsTask().ConfigureAwait(false);
221+
OnSpawnDecided(msg, context, clusterKind, canSpawnIdentity);
223222

224223
return;
225224
}
226225

227226
_inFlightIdentityChecks.Add(clusterIdentity);
228227

229-
context.ReenterAfter(canSpawn.AsTask(), task =>
228+
context.ReenterAfter(canSpawn.AsTask(), async task =>
230229
{
231230
_inFlightIdentityChecks.Remove(clusterIdentity);
232231

233232
if (task.IsCompletedSuccessfully)
234233
{
235-
OnSpawnDecided(msg, context, clusterKind, task.Result);
234+
var canSpawnIdentity = await task.ConfigureAwait(false);
235+
OnSpawnDecided(msg, context, clusterKind, canSpawnIdentity);
236236
}
237237
else
238238
{
@@ -244,8 +244,6 @@ private void VerifyAndSpawn(ActivationRequest msg, IContext context, ActivatedCl
244244
}
245245
);
246246
}
247-
248-
return Task.CompletedTask;
249247
}
250248
);
251249
}

0 commit comments

Comments
 (0)