Skip to content

Commit de5b77d

Browse files
AlexeyRagabrmagadutra
authored andcommitted
chore: simplify and nano-optimise batch producer
1 parent b4519d4 commit de5b77d

File tree

4 files changed

+22
-86
lines changed

4 files changed

+22
-86
lines changed

src/KafkaFlow/Producers/BatchProduceContext.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,11 @@ namespace KafkaFlow.Producers;
1212
/// </summary>
1313
internal sealed class BatchProduceContext
1414
{
15-
private readonly ConcurrentBag<BatchMessageEntry> _entries = new();
15+
private readonly ConcurrentQueue<BatchMessageEntry> _entries = new();
1616

1717
public IReadOnlyList<BatchMessageEntry> GetEntries() =>
1818
_entries.OrderBy(x => x.BatchIndex).ToList();
1919

20-
public IReadOnlyList<BatchMessageEntry> GetSuccessfulEntries() =>
21-
_entries.Where(e => e.IsSuccess).OrderBy(e => e.BatchIndex).ToList();
22-
23-
public IReadOnlyList<BatchMessageEntry> GetFailedEntries() =>
24-
_entries.Where(e => !e.IsSuccess).OrderBy(e => e.BatchIndex).ToList();
25-
2620
/// <summary>
2721
/// Registers a message that completed middleware processing successfully.
2822
/// </summary>
@@ -32,7 +26,7 @@ public void Register(
3226
Message<byte[], byte[]> messageToProduce,
3327
IMessageContext context)
3428
{
35-
_entries.Add(new BatchMessageEntry(batchIndex, item, true, messageToProduce, context, null));
29+
_entries.Enqueue(new BatchMessageEntry(batchIndex, item, true, messageToProduce, context, null));
3630
}
3731

3832
/// <summary>
@@ -46,7 +40,7 @@ public void RegisterFailure(int batchIndex, BatchProduceItem item, Exception exc
4640
Error = new Error(ErrorCode.Local_Fail, exception.Message),
4741
Status = PersistenceStatus.NotPersisted,
4842
};
49-
_entries.Add(new BatchMessageEntry(batchIndex, item, false, null, null, exception));
43+
_entries.Enqueue(new BatchMessageEntry(batchIndex, item, false, null, null, exception));
5044
}
5145
}
5246

src/KafkaFlow/Producers/BatchProduceException.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace KafkaFlow.Producers;
55

66
/// <summary>
7-
/// Exception thrown by <see cref="M:BatchProduceExtension.BatchProduceAsync"/>
7+
/// Exception thrown by <see cref="M:MessageProducer.BatchProduceAsync"/>
88
/// </summary>
99
public class BatchProduceException : Exception
1010
{

src/KafkaFlow/Producers/MessageProducer.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -211,17 +211,17 @@ public async Task<IReadOnlyCollection<BatchProduceItem>> BatchProduceAsync(
211211
// All items are now registered (either success with message, or failure with DeliveryReport set)
212212
await Task.WhenAll(middlewareTasks).ConfigureAwait(false);
213213

214-
// If instructed, throw if any message failed during middleware processing
215-
var unsuccessful = batchContext.GetFailedEntries();
216-
if (unsuccessful.Count > 0 && throwIfAnyProduceFail)
214+
var allEntries = batchContext.GetEntries();
215+
var successfulEntries = allEntries.Where(e => e.IsSuccess).ToList();
216+
217+
var errors = allEntries.Where(e => !e.IsSuccess).Select(e => e.Error).ToList();
218+
219+
if (errors.Count > 0 && throwIfAnyProduceFail)
217220
{
218-
var exceptions = unsuccessful.Select(e => e.Error).ToList();
219-
var aggregateException = new AggregateException("One or more messages failed during middleware processing", exceptions);
221+
var aggregateException = new AggregateException("One or more messages failed during middleware processing", errors);
220222
throw new BatchProduceException(items, aggregateException);
221223
}
222224

223-
// Produce the messages that passed middleware
224-
var successfulEntries = batchContext.GetSuccessfulEntries();
225225
if (successfulEntries.Count > 0)
226226
{
227227
try
@@ -247,7 +247,7 @@ public async Task<IReadOnlyCollection<BatchProduceItem>> BatchProduceAsync(
247247
}
248248
}
249249

250-
return batchContext.GetEntries().Select(e => e.Item).ToList();
250+
return allEntries.Select(e => e.Item).ToList();
251251
}
252252

253253
private async Task ExecuteMessageWithBatchContextAsync(
@@ -483,7 +483,13 @@ void DeliveryHandler(DeliveryReport<byte[], byte[]> report)
483483
{
484484
FillContextWithResultMetadata(entry.Context, report);
485485
}
486-
486+
}
487+
catch (Exception ex)
488+
{
489+
_logHandler.Error("Batch Produce Delivery Handler Error", ex, new { Report = report });
490+
}
491+
finally
492+
{
487493
// Set the DeliveryReport directly on the item
488494
entry.Item.DeliveryReport = report;
489495

@@ -492,10 +498,6 @@ void DeliveryHandler(DeliveryReport<byte[], byte[]> report)
492498
completionSource.TrySetResult(true);
493499
}
494500
}
495-
catch (Exception ex)
496-
{
497-
_logHandler.Error("Batch Produce Delivery Handler Error", ex, new { Report = report });
498-
}
499501
}
500502

501503
try

tests/KafkaFlow.UnitTests/BatchProduce/BatchProduceContextTests.cs

Lines changed: 3 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -88,64 +88,6 @@ public void GetEntries_ShouldReturnAllEntriesSortedByBatchIndex()
8888
entries[2].BatchIndex.Should().Be(2);
8989
}
9090

91-
[TestMethod]
92-
public void GetSuccessfulEntries_ShouldReturnOnlySuccessfulEntriesSorted()
93-
{
94-
// Arrange
95-
var context = new BatchProduceContext();
96-
var item = CreateBatchProduceItem("test-topic");
97-
var message = CreateMessage();
98-
var messageContext = Mock.Of<IMessageContext>();
99-
100-
context.Register(0, item, message, messageContext);
101-
context.RegisterFailure(1, item, new Exception("Error 1"));
102-
context.Register(2, item, message, messageContext);
103-
context.RegisterFailure(3, item, new Exception("Error 2"));
104-
context.Register(4, item, message, messageContext);
105-
106-
// Act
107-
var successful = context.GetSuccessfulEntries();
108-
109-
// Assert
110-
successful.Should().HaveCount(3);
111-
successful[0].BatchIndex.Should().Be(0);
112-
successful[0].IsSuccess.Should().BeTrue();
113-
successful[1].BatchIndex.Should().Be(2);
114-
successful[1].IsSuccess.Should().BeTrue();
115-
successful[2].BatchIndex.Should().Be(4);
116-
successful[2].IsSuccess.Should().BeTrue();
117-
}
118-
119-
[TestMethod]
120-
public void GetFailedEntries_ShouldReturnOnlyFailedEntriesSorted()
121-
{
122-
// Arrange
123-
var context = new BatchProduceContext();
124-
var item = CreateBatchProduceItem("test-topic");
125-
var message = CreateMessage();
126-
var messageContext = Mock.Of<IMessageContext>();
127-
var error1 = new InvalidOperationException("Error 1");
128-
var error2 = new ArgumentException("Error 2");
129-
130-
context.Register(0, item, message, messageContext);
131-
context.RegisterFailure(1, item, error1);
132-
context.Register(2, item, message, messageContext);
133-
context.RegisterFailure(3, item, error2);
134-
context.Register(4, item, message, messageContext);
135-
136-
// Act
137-
var failed = context.GetFailedEntries();
138-
139-
// Assert
140-
failed.Should().HaveCount(2);
141-
failed[0].BatchIndex.Should().Be(1);
142-
failed[0].IsSuccess.Should().BeFalse();
143-
failed[0].Error.Should().BeSameAs(error1);
144-
failed[1].BatchIndex.Should().Be(3);
145-
failed[1].IsSuccess.Should().BeFalse();
146-
failed[1].Error.Should().BeSameAs(error2);
147-
}
148-
14991
[TestMethod]
15092
public void Register_ShouldBeThreadSafe()
15193
{
@@ -189,7 +131,7 @@ public void RegisterFailure_ShouldBeThreadSafe()
189131
Task.WaitAll(tasks);
190132

191133
// Assert
192-
var entries = context.GetFailedEntries();
134+
var entries = context.GetEntries().Where(e => !e.IsSuccess).ToList();
193135
entries.Should().HaveCount(100);
194136
entries.Select(e => e.BatchIndex).Should().BeEquivalentTo(Enumerable.Range(0, 100));
195137
}
@@ -222,14 +164,12 @@ public void MixedOperations_ShouldBeThreadSafe()
222164

223165
// Assert
224166
var allEntries = context.GetEntries();
225-
var successful = context.GetSuccessfulEntries();
226-
var failed = context.GetFailedEntries();
167+
var successful = allEntries.Where(e => e.IsSuccess).ToList();
168+
var failed = allEntries.Where(e => !e.IsSuccess).ToList();
227169

228170
allEntries.Should().HaveCount(100);
229171
successful.Should().HaveCount(50);
230172
failed.Should().HaveCount(50);
231-
successful.All(e => e.IsSuccess).Should().BeTrue();
232-
failed.All(e => !e.IsSuccess).Should().BeTrue();
233173
}
234174

235175
private static Message<byte[], byte[]> CreateMessage()

0 commit comments

Comments
 (0)