Skip to content

Commit 1bc8d2c

Browse files
gnjackjoelfoliveira
authored andcommitted
fix: use ConfigureAwait(false) on all awaits
1 parent 16f7851 commit 1bc8d2c

File tree

15 files changed

+61
-55
lines changed

15 files changed

+61
-55
lines changed

src/KafkaFlow/Batching/BatchConsumeMiddleware.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public BatchConsumeMiddleware(
3838

3939
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
4040
{
41-
await _dispatchSemaphore.WaitAsync();
41+
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);
4242

4343
try
4444
{
@@ -59,7 +59,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
5959

6060
if (_batch.Count >= _batchSize)
6161
{
62-
await this.TriggerDispatchAndWaitAsync();
62+
await this.TriggerDispatchAndWaitAsync().ConfigureAwait(false);
6363
}
6464
}
6565

@@ -72,11 +72,11 @@ public void Dispose()
7272

7373
private async Task TriggerDispatchAndWaitAsync()
7474
{
75-
await _dispatchSemaphore.WaitAsync();
75+
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);
7676
_dispatchTokenSource?.Cancel();
7777
_dispatchSemaphore.Release();
7878

79-
await (_dispatchTask ?? Task.CompletedTask);
79+
await (_dispatchTask ?? Task.CompletedTask).ConfigureAwait(false);
8080
}
8181

8282
private void ScheduleExecution(IMessageContext context, MiddlewareDelegate next)
@@ -92,7 +92,7 @@ private void ScheduleExecution(IMessageContext context, MiddlewareDelegate next)
9292

9393
private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate next)
9494
{
95-
await _dispatchSemaphore.WaitAsync();
95+
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);
9696

9797
_dispatchTokenSource.Dispose();
9898
_dispatchTokenSource = null;

src/KafkaFlow/Clusters/ClusterManager.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public async Task<IEnumerable<TopicPartitionOffset>> GetConsumerGroupOffsetsAsyn
8585

8686
foreach (var name in topicsName)
8787
{
88-
topicsMetadata.Add((name, await this.GetTopicMetadataAsync(name)));
88+
topicsMetadata.Add((name, await this.GetTopicMetadataAsync(name).ConfigureAwait(false)));
8989
}
9090

9191
var topics =
@@ -98,7 +98,7 @@ public async Task<IEnumerable<TopicPartitionOffset>> GetConsumerGroupOffsetsAsyn
9898
.ToList();
9999

100100
var result = await _lazyAdminClient.Value.ListConsumerGroupOffsetsAsync(
101-
new[] { new ConsumerGroupTopicPartitions(consumerGroup, topics) });
101+
new[] { new ConsumerGroupTopicPartitions(consumerGroup, topics) }).ConfigureAwait(false);
102102

103103
if (!result.Any())
104104
{
@@ -125,7 +125,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
125125
})
126126
.ToArray();
127127

128-
await _lazyAdminClient.Value.CreateTopicsAsync(topics);
128+
await _lazyAdminClient.Value.CreateTopicsAsync(topics).ConfigureAwait(false);
129129
}
130130
catch (CreateTopicsException exception)
131131
{

src/KafkaFlow/Consumers/Consumer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
163163
try
164164
{
165165
this.EnsureConsumer();
166-
await _flowManager.BlockHeartbeat(cancellationToken);
166+
await _flowManager.BlockHeartbeat(cancellationToken).ConfigureAwait(false);
167167
return _consumer.Consume(cancellationToken);
168168
}
169169
catch (OperationCanceledException)
@@ -176,7 +176,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
176176
"Max Poll Interval Exceeded",
177177
new { this.Configuration.ConsumerName });
178178

179-
await _maxPollIntervalExceeded.FireAsync();
179+
await _maxPollIntervalExceeded.FireAsync().ConfigureAwait(false);
180180
}
181181
catch (KafkaException ex) when (ex.Error.IsFatal)
182182
{
@@ -187,7 +187,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
187187

188188
this.InvalidateConsumer();
189189

190-
await Task.Delay(5000, cancellationToken);
190+
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
191191
}
192192
catch (Exception ex)
193193
{

src/KafkaFlow/Consumers/ConsumerManager.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,14 @@ private void StartEvaluateWorkerCountTimer() => _evaluateWorkersCountTimer?.Chan
7070

7171
private async Task EvaluateWorkersCountAsync()
7272
{
73-
var newWorkersCount = await this.CalculateWorkersCount(this.Consumer.Assignment);
73+
var newWorkersCount = await this.CalculateWorkersCount(this.Consumer.Assignment).ConfigureAwait(false);
7474

7575
if (newWorkersCount == this.WorkerPool.CurrentWorkersCount)
7676
{
7777
return;
7878
}
7979

80-
await this.ChangeWorkersCountAsync(newWorkersCount);
80+
await this.ChangeWorkersCountAsync(newWorkersCount).ConfigureAwait(false);
8181
}
8282

8383
private async Task ChangeWorkersCountAsync(int workersCount)
@@ -86,10 +86,10 @@ private async Task ChangeWorkersCountAsync(int workersCount)
8686
{
8787
this.StopEvaluateWorkerCountTimer();
8888

89-
await this.Feeder.StopAsync();
90-
await this.WorkerPool.StopAsync();
89+
await this.Feeder.StopAsync().ConfigureAwait(false);
90+
await this.WorkerPool.StopAsync().ConfigureAwait(false);
9191

92-
await this.WorkerPool.StartAsync(this.Consumer.Assignment, workersCount);
92+
await this.WorkerPool.StartAsync(this.Consumer.Assignment, workersCount).ConfigureAwait(false);
9393
this.Feeder.Start();
9494

9595
this.StartEvaluateWorkerCountTimer();
@@ -155,7 +155,8 @@ private async Task<int> CalculateWorkersCount(IEnumerable<Confluent.Kafka.TopicP
155155
.Select(x => x.Partition.Value)
156156
.ToList()))
157157
.ToList()),
158-
_dependencyResolver);
158+
_dependencyResolver)
159+
.ConfigureAwait(false);
159160
}
160161
catch (Exception e)
161162
{

src/KafkaFlow/Consumers/ConsumerWorker.cs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,20 @@ public Task StartAsync(CancellationToken stopCancellationToken)
7575

7676
try
7777
{
78-
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(stopCancellationToken))
78+
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(stopCancellationToken).ConfigureAwait(false))
7979
{
8080
currentContext = context;
8181

8282
await this
8383
.ProcessMessageAsync(context, stopCancellationToken)
84-
.WithCancellation(stopCancellationToken, true);
84+
.WithCancellation(stopCancellationToken, true)
85+
.ConfigureAwait(false);
8586
}
8687
}
8788
catch (OperationCanceledException)
8889
{
8990
currentContext?.ConsumerContext.Discard();
90-
await this.DiscardBufferedContextsAsync();
91+
await this.DiscardBufferedContextsAsync().ConfigureAwait(false);
9192
}
9293
catch (Exception ex)
9394
{
@@ -101,13 +102,13 @@ await this
101102

102103
public async Task StopAsync()
103104
{
104-
await _workerStoppingEvent.FireAsync();
105+
await _workerStoppingEvent.FireAsync().ConfigureAwait(false);
105106

106107
_messagesBuffer.Writer.TryComplete();
107108

108-
await _backgroundTask;
109+
await _backgroundTask.ConfigureAwait(false);
109110

110-
await _workerStoppedEvent.FireAsync();
111+
await _workerStoppedEvent.FireAsync().ConfigureAwait(false);
111112
}
112113

113114
public void Dispose()
@@ -118,7 +119,7 @@ public void Dispose()
118119

119120
private async Task DiscardBufferedContextsAsync()
120121
{
121-
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(CancellationToken.None))
122+
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(CancellationToken.None).ConfigureAwait(false))
122123
{
123124
context.ConsumerContext.Discard();
124125
}
@@ -130,29 +131,29 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke
130131
{
131132
try
132133
{
133-
await _globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context));
134+
await _globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context)).ConfigureAwait(false);
134135

135136
_ = context.ConsumerContext.Completion.ContinueWith(
136137
async task =>
137138
{
138139
if (task.IsFaulted)
139140
{
140-
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, task.Exception));
141+
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, task.Exception)).ConfigureAwait(false);
141142
}
142143

143-
await _globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context));
144+
await _globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context)).ConfigureAwait(false);
144145
},
145146
CancellationToken.None);
146147

147-
await _middlewareExecutor.Execute(context, _ => Task.CompletedTask);
148+
await _middlewareExecutor.Execute(context, _ => Task.CompletedTask).ConfigureAwait(false);
148149
}
149150
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
150151
{
151152
context.ConsumerContext.ShouldStoreOffset = false;
152153
}
153154
catch (Exception ex)
154155
{
155-
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, ex));
156+
await _globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, ex)).ConfigureAwait(false);
156157

157158
_logHandler.Error(
158159
"Error processing message",
@@ -172,7 +173,7 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke
172173
context.ConsumerContext.Complete();
173174
}
174175

175-
await _workerProcessingEnded.FireAsync(context);
176+
await _workerProcessingEnded.FireAsync(context).ConfigureAwait(false);
176177
}
177178
}
178179
catch (Exception ex)

src/KafkaFlow/Consumers/ConsumerWorkerPool.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public async Task StartAsync(IReadOnlyCollection<TopicPartition> partitions, int
6363
new NullOffsetManager() :
6464
new OffsetManager(_offsetCommitter, partitions);
6565

66-
await _offsetCommitter.StartAsync();
66+
await _offsetCommitter.StartAsync().ConfigureAwait(false);
6767

6868
this.CurrentWorkersCount = workersCount;
6969

@@ -94,7 +94,8 @@ await Task.WhenAll(
9494
_workers.Add(worker);
9595

9696
return worker.StartAsync(_stopCancellationTokenSource.Token);
97-
}));
97+
}))
98+
.ConfigureAwait(false);
9899

99100
_distributionStrategy = _distributionStrategyFactory(_consumerDependencyResolver);
100101
_distributionStrategy.Initialize(_workers.AsReadOnly());
@@ -129,24 +130,25 @@ public async Task StopAsync()
129130
_stopCancellationTokenSource.CancelAfter(_consumer.Configuration.WorkerStopTimeout);
130131
}
131132

132-
await Task.WhenAll(currentWorkers.Select(x => x.StopAsync()));
133+
await Task.WhenAll(currentWorkers.Select(x => x.StopAsync())).ConfigureAwait(false);
133134
await _offsetManager
134135
.WaitContextsCompletionAsync()
135-
.WithCancellation(_stopCancellationTokenSource.Token, false);
136+
.WithCancellation(_stopCancellationTokenSource.Token, false)
137+
.ConfigureAwait(false);
136138

137139
currentWorkers.ForEach(worker => worker.Dispose());
138140
_stopCancellationTokenSource?.Dispose();
139141

140142
_offsetManager = null;
141143

142-
await _workerPoolStoppedSubject.FireAsync();
144+
await _workerPoolStoppedSubject.FireAsync().ConfigureAwait(false);
143145

144-
await _offsetCommitter.StopAsync();
146+
await _offsetCommitter.StopAsync().ConfigureAwait(false);
145147
}
146148

147149
public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, CancellationToken stopCancellationToken)
148150
{
149-
await _startedTaskSource.Task;
151+
await _startedTaskSource.Task.ConfigureAwait(false);
150152

151153
var worker = (IConsumerWorker)await _distributionStrategy
152154
.GetWorkerAsync(
@@ -155,7 +157,7 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati
155157
message.Topic,
156158
message.Partition.Value,
157159
message.Message.Key,
158-
stopCancellationToken));
160+
stopCancellationToken)).ConfigureAwait(false);
159161

160162
if (worker is null)
161163
{
@@ -166,7 +168,7 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati
166168

167169
_offsetManager.Enqueue(context.ConsumerContext);
168170

169-
await worker.EnqueueAsync(context);
171+
await worker.EnqueueAsync(context).ConfigureAwait(false);
170172
}
171173

172174
private MessageContext CreateMessageContext(ConsumeResult<byte[], byte[]> message, IConsumerWorker worker)

src/KafkaFlow/Consumers/WorkerPoolFeeder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,6 @@ public async Task StopAsync()
7272
_stopTokenSource.Dispose();
7373
}
7474

75-
await (_feederTask ?? Task.CompletedTask);
75+
await (_feederTask ?? Task.CompletedTask).ConfigureAwait(false);
7676
}
7777
}

src/KafkaFlow/Consumers/WorkersBalancers/ConsumerLagWorkerBalancer.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public ConsumerLagWorkerBalancer(
4141

4242
public async Task<int> GetWorkersCountAsync(WorkersCountContext context)
4343
{
44-
var workers = await this.CalculateAsync(context);
44+
var workers = await this.CalculateAsync(context).ConfigureAwait(false);
4545

4646
_logHandler.Info(
4747
"New workers count calculated",
@@ -97,13 +97,14 @@ private async Task<int> CalculateAsync(WorkersCountContext context)
9797
return DefaultWorkersCount;
9898
}
9999

100-
var topicsMetadata = await this.GetTopicsMetadataAsync(context);
100+
var topicsMetadata = await this.GetTopicsMetadataAsync(context).ConfigureAwait(false);
101101

102102
var lastOffsets = this.GetPartitionsLastOffset(context.ConsumerName, topicsMetadata);
103103

104104
var partitionsOffset = await _clusterManager.GetConsumerGroupOffsetsAsync(
105105
context.ConsumerGroupId,
106-
context.AssignedTopicsPartitions.Select(t => t.Name));
106+
context.AssignedTopicsPartitions.Select(t => t.Name))
107+
.ConfigureAwait(false);
107108

108109
var partitionsLag = CalculatePartitionsLag(lastOffsets, partitionsOffset);
109110
var instanceLag = CalculateMyPartitionsLag(context, partitionsLag);
@@ -156,7 +157,7 @@ private async Task<int> CalculateAsync(WorkersCountContext context)
156157

157158
foreach (var topic in context.AssignedTopicsPartitions)
158159
{
159-
topicsMetadata.Add((topic.Name, await _clusterManager.GetTopicMetadataAsync(topic.Name)));
160+
topicsMetadata.Add((topic.Name, await _clusterManager.GetTopicMetadataAsync(topic.Name).ConfigureAwait(false)));
160161
}
161162

162163
return topicsMetadata;

src/KafkaFlow/Extensions/ChannelExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public static async IAsyncEnumerable<T> ReadAllItemsAsync<T>(
1111
this ChannelReader<T> reader,
1212
[EnumeratorCancellation] CancellationToken cancellationToken = default)
1313
{
14-
while (await reader.WaitToReadAsync(cancellationToken))
14+
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
1515
{
1616
while (reader.TryRead(out var item))
1717
{

src/KafkaFlow/Extensions/TaskExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public static async Task WithCancellation(
2121
return;
2222
}
2323

24-
await Task.WhenAny(task, tcs.Task);
24+
await Task.WhenAny(task, tcs.Task).ConfigureAwait(false);
2525

2626
void TrySetResult()
2727
{

0 commit comments

Comments
 (0)