Skip to content

Commit aaa2f19

Browse files
authored
producer poll (#856)
* producer poll * + integration test, move check outside lock * Local -> _ * Review feedback changes
1 parent 8407393 commit aaa2f19

File tree

5 files changed

+241
-22
lines changed

5 files changed

+241
-22
lines changed

src/Confluent.Kafka/IProducer.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,7 @@ void BeginProduce(
179179

180180

181181
/// <summary>
182-
/// Poll for callback events. Typically, you should not
183-
/// call this method. Only call on producer instances
184-
/// where background polling has been disabled.
182+
/// Poll for callback events.
185183
/// </summary>
186184
/// <param name="timeout">
187185
/// The maximum period of time to block if
@@ -190,7 +188,10 @@ void BeginProduce(
190188
/// because this operation cannot be cancelled.
191189
/// </param>
192190
/// <returns>
193-
/// Returns the number of events served.
191+
/// Returns the number of events served since
192+
/// the last call to this method or if this
193+
/// method has not yet been called, over the
194+
/// lifetime of the producer.
194195
/// </returns>
195196
int Poll(TimeSpan timeout);
196197

src/Confluent.Kafka/Producer.cs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ private SafeKafkaHandle KafkaHandle
7878
private Task callbackTask;
7979
private CancellationTokenSource callbackCts;
8080

81+
private int eventsServedCount = 0;
82+
private object pollSyncObj = new object();
83+
8184
private Task StartPollTask(CancellationToken ct)
8285
=> Task.Factory.StartNew(() =>
8386
{
@@ -86,7 +89,17 @@ private Task StartPollTask(CancellationToken ct)
8689
while (true)
8790
{
8891
ct.ThrowIfCancellationRequested();
89-
ownedKafkaHandle.Poll((IntPtr)cancellationDelayMaxMs);
92+
int eventsServedCount_ = ownedKafkaHandle.Poll((IntPtr)cancellationDelayMaxMs);
93+
94+
// note: lock {} is equivalent to Monitor.Enter then Monitor.Exit
95+
if (eventsServedCount_ > 0)
96+
{
97+
lock (pollSyncObj)
98+
{
99+
this.eventsServedCount += eventsServedCount_;
100+
Monitor.Pulse(pollSyncObj);
101+
}
102+
}
90103
}
91104
}
92105
catch (OperationCanceledException) {}
@@ -218,6 +231,7 @@ private void ProduceImpl(
218231
}
219232
}
220233

234+
ErrorCode err;
221235
if (this.enableDeliveryReports && deliveryHandler != null)
222236
{
223237
// Passes the TaskCompletionSource to the delivery report callback via the msg_opaque pointer
@@ -229,7 +243,7 @@ private void ProduceImpl(
229243
var gch = GCHandle.Alloc(deliveryHandler);
230244
var ptr = GCHandle.ToIntPtr(gch);
231245

232-
var err = KafkaHandle.Produce(
246+
err = KafkaHandle.Produce(
233247
topic,
234248
val, valOffset, valLength,
235249
key, keyOffset, keyLength,
@@ -240,25 +254,25 @@ private void ProduceImpl(
240254

241255
if (err != ErrorCode.NoError)
242256
{
257+
// note: freed in the delivery handler callback otherwise.
243258
gch.Free();
244-
throw new KafkaException(KafkaHandle.CreatePossiblyFatalError(err, null));
245259
}
246260
}
247261
else
248262
{
249-
var err = KafkaHandle.Produce(
263+
err = KafkaHandle.Produce(
250264
topic,
251265
val, valOffset, valLength,
252266
key, keyOffset, keyLength,
253267
partition.Value,
254268
timestamp.UnixTimestampMs,
255269
headers,
256270
IntPtr.Zero);
271+
}
257272

258-
if (err != ErrorCode.NoError)
259-
{
260-
throw new KafkaException(KafkaHandle.CreatePossiblyFatalError(err, null));
261-
}
273+
if (err != ErrorCode.NoError)
274+
{
275+
throw new KafkaException(KafkaHandle.CreatePossiblyFatalError(err, null));
262276
}
263277
}
264278

@@ -268,12 +282,22 @@ private void ProduceImpl(
268282
/// </summary>
269283
public int Poll(TimeSpan timeout)
270284
{
271-
if (!manualPoll)
285+
if (manualPoll)
272286
{
273-
throw new InvalidOperationException("Poll method called, but manual polling is not enabled.");
287+
return this.KafkaHandle.Poll((IntPtr)timeout.TotalMillisecondsAsInt());
274288
}
275289

276-
return this.KafkaHandle.Poll((IntPtr)timeout.TotalMillisecondsAsInt());
290+
lock (pollSyncObj)
291+
{
292+
if (eventsServedCount == 0)
293+
{
294+
Monitor.Wait(pollSyncObj, timeout);
295+
}
296+
297+
var result = eventsServedCount;
298+
eventsServedCount = 0;
299+
return result;
300+
}
277301
}
278302

279303

test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
using System.Threading.Tasks;
1919
using System.Threading;
2020
using System.Linq;
21-
using System.Collections.Generic;
2221

2322

2423
namespace Confluent.Kafka.Benchmark
@@ -76,6 +75,13 @@ private static long BenchmarkProducerImpl(
7675
var msgCount = nMessages;
7776
Action<DeliveryReport<Null, byte[]>> deliveryHandler = (DeliveryReport<Null, byte[]> deliveryReport) =>
7877
{
78+
if (deliveryReport.Error.IsError)
79+
{
80+
// Not interested in benchmark results in the (unlikely) event there is an error.
81+
Console.WriteLine($"A error occured producing a message: {deliveryReport.Error.Reason}");
82+
Environment.Exit(1); // note: exceptions do not currently propagate to calling code from a deliveryHandler method.
83+
}
84+
7985
if (--msgCount == 0)
8086
{
8187
autoEvent.Set();
@@ -84,19 +90,62 @@ private static long BenchmarkProducerImpl(
8490

8591
for (int i = 0; i < nMessages; i += 1)
8692
{
87-
producer.BeginProduce(topic, new Message<Null, byte[]> { Value = val, Headers = headers }, deliveryHandler);
93+
try
94+
{
95+
producer.BeginProduce(topic, new Message<Null, byte[]> { Value = val, Headers = headers }, deliveryHandler);
96+
}
97+
catch (ProduceException<Null, byte[]> ex)
98+
{
99+
if (ex.Error.Code == ErrorCode.Local_QueueFull)
100+
{
101+
producer.Poll(TimeSpan.FromSeconds(1));
102+
i -= 1;
103+
}
104+
else
105+
{
106+
throw;
107+
}
108+
}
88109
}
89110

90-
autoEvent.WaitOne();
111+
while (true)
112+
{
113+
if (autoEvent.WaitOne(TimeSpan.FromSeconds(1)))
114+
{
115+
break;
116+
}
117+
Console.WriteLine(msgCount);
118+
}
91119
}
92120
else
93121
{
94-
var tasks = new Task[nMessages];
95-
for (int i = 0; i < nMessages; i += 1)
122+
try
123+
{
124+
var tasks = new Task[nMessages];
125+
for (int i = 0; i < nMessages; i += 1)
126+
{
127+
tasks[i] = producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = val, Headers = headers });
128+
if (tasks[i].IsFaulted)
129+
{
130+
if (((ProduceException<Null, byte[]>)tasks[i].Exception.InnerException).Error.Code == ErrorCode.Local_QueueFull)
131+
{
132+
producer.Poll(TimeSpan.FromSeconds(1));
133+
i -= 1;
134+
}
135+
else
136+
{
137+
// unexpected, abort benchmark test.
138+
throw tasks[i].Exception;
139+
}
140+
}
141+
}
142+
143+
Task.WaitAll(tasks);
144+
}
145+
catch (AggregateException ex)
96146
{
97-
tasks[i] = producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = val, Headers = headers });
147+
Console.WriteLine(ex.Message);
98148
}
99-
Task.WaitAll(tasks);
100149
}
101150

102151
var duration = DateTime.Now.Ticks - startTime;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2019 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
#pragma warning disable xUnit1026
18+
19+
using System;
20+
using System.Diagnostics;
21+
using System.Threading.Tasks;
22+
using Xunit;
23+
24+
25+
namespace Confluent.Kafka.IntegrationTests
26+
{
27+
/// <summary>
28+
/// Test Producer.Poll when producer in automatic poll mode.
29+
/// </summary>
30+
public partial class Tests
31+
{
32+
[Theory, MemberData(nameof(KafkaParameters))]
33+
public void Producer_Poll(string bootstrapServers)
34+
{
35+
LogToFile("start Producer_Poll");
36+
37+
using (var tempTopic = new TemporaryTopic(bootstrapServers, 1))
38+
using (var producer = new ProducerBuilder<Null, string>(new ProducerConfig { BootstrapServers = bootstrapServers }).Build())
39+
{
40+
var r = producer.ProduceAsync(tempTopic.Name, new Message<Null, string> { Value = "a message" }).Result;
41+
Assert.True(r.Status == PersistenceStatus.Persisted);
42+
43+
// should be no events to serve and this should block for 500ms.
44+
var sw = new Stopwatch();
45+
sw.Start();
46+
// Note: Poll returns the number of events served since the last
47+
// call to Poll (or if the poll method hasn't beeen called, over
48+
// the lifetime of the producer).
49+
Assert.True(producer.Poll(TimeSpan.FromMilliseconds(500)) >= 1);
50+
Assert.True(sw.ElapsedMilliseconds < 1);
51+
Assert.Equal(0, producer.Poll(TimeSpan.FromMilliseconds(500)));
52+
Assert.True(sw.ElapsedMilliseconds >= 500);
53+
54+
sw.Reset();
55+
sw.Start();
56+
producer.BeginProduce(tempTopic.Name, new Message<Null, string> { Value = "a message 2" }, dr => Assert.False(dr.Error.IsError));
57+
// should block until the callback for the BeginProduce call executes.
58+
Assert.Equal(1, producer.Poll(TimeSpan.FromSeconds(4)));
59+
Assert.True(sw.ElapsedMilliseconds > 0);
60+
Assert.True(sw.ElapsedMilliseconds < 3500);
61+
}
62+
63+
Assert.Equal(0, Library.HandleCount);
64+
LogToFile("end Producer_Poll");
65+
}
66+
}
67+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 2019 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
#pragma warning disable xUnit1026
18+
19+
using System;
20+
using System.Diagnostics;
21+
using System.Threading.Tasks;
22+
using Xunit;
23+
24+
25+
namespace Confluent.Kafka.IntegrationTests
26+
{
27+
/// <summary>
28+
///
29+
/// </summary>
30+
public partial class Tests
31+
{
32+
[Theory, MemberData(nameof(KafkaParameters))]
33+
public void Producer_Poll_Backoff(string bootstrapServers)
34+
{
35+
LogToFile("start Producer_Poll_Backoff");
36+
37+
var pConfig = new ProducerConfig
38+
{
39+
BootstrapServers = bootstrapServers,
40+
QueueBufferingMaxMessages = 10,
41+
LingerMs = 100
42+
};
43+
44+
using (var tempTopic = new TemporaryTopic(bootstrapServers, 1))
45+
using (var producer = new ProducerBuilder<Null, string>(pConfig).Build())
46+
{
47+
// test timing around producer.Poll.
48+
Stopwatch sw = new Stopwatch();
49+
sw.Start();
50+
var exceptionCount = 0;
51+
for (int i=0; i<11; ++i)
52+
{
53+
try
54+
{
55+
producer.BeginProduce(tempTopic.Name, new Message<Null, string> { Value = "a message" });
56+
}
57+
catch (ProduceException<Null, string> ex)
58+
{
59+
exceptionCount += 1;
60+
Assert.Equal(ErrorCode.Local_QueueFull, ex.Error.Code);
61+
var served = producer.Poll(TimeSpan.FromSeconds(4));
62+
Assert.True(served >= 1);
63+
var elapsed = sw.ElapsedMilliseconds;
64+
Assert.True(elapsed > 100); // linger.ms
65+
Assert.True(elapsed < 4000);
66+
}
67+
}
68+
Assert.Equal(1, exceptionCount);
69+
70+
producer.Flush();
71+
}
72+
73+
Assert.Equal(0, Library.HandleCount);
74+
LogToFile("end Producer_Poll_Backoff");
75+
}
76+
77+
}
78+
}

0 commit comments

Comments
 (0)