Skip to content

Commit de1537a

Browse files
authored
SetRebalanceHandler implementation (#763)
* SetRebalanceHandler implementation * review changes * review changes II * move Set methods from consumer -> builder
1 parent 6c3df62 commit de1537a

File tree

17 files changed

+457
-329
lines changed

17 files changed

+457
-329
lines changed

examples/Consumer/Program.cs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -61,29 +61,22 @@ public static void Run_Consume(string brokerList, List<string> topics, Cancellat
6161
// Note: All handlers are called on the main .Consume thread.
6262
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
6363
.SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
64+
.SetRebalanceHandler((_, e) =>
65+
{
66+
if (e.IsAssignment)
67+
{
68+
Console.WriteLine($"Assigned partitions: [{string.Join(", ", e.Partitions)}]");
69+
// possibly override the default partition assignment behavior:
70+
// consumer.Assign(...)
71+
}
72+
else
73+
{
74+
Console.WriteLine($"Revoked partitions: [{string.Join(", ", e.Partitions)}]");
75+
// consumer.Unassign()
76+
}
77+
})
6478
.Build())
6579
{
66-
67-
// The partitions assigned handler is called when the consumer has been
68-
// notified of a new assignment set. You can use this callback to perform
69-
// actions such as retrieving offsets from an external source and manually
70-
// setting start offsets using the Assign method. You can even call Assign
71-
// with a different set of partitions than those in the assignment. If
72-
// you do not call Assign, the consumer will be automatically assigned to
73-
// the partitions of the assignment set and consumption will start from
74-
// last committed offsets or in accordance with the auto.offset.reset
75-
// configuration parameter for partitions where there is no committed offset.
76-
consumer.SetPartitionsAssignedHandler((_, tps) =>
77-
{
78-
Console.WriteLine($"Assigned partitions: [{string.Join(", ", tps)}], member id: {consumer.MemberId}");
79-
});
80-
81-
// Called when the consumer's current assignment set has been revoked.
82-
consumer.SetPartitionsRevokedHandler((_, tps) =>
83-
{
84-
Console.WriteLine($"Revoked partitions: [{string.Join(", ", tps)}]");
85-
});
86-
8780
consumer.Subscribe(topics);
8881

8982
while (!cancellationToken.IsCancellationRequested)

src/Confluent.Kafka/Consumer.cs

Lines changed: 48 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ internal class Config
4141
internal Action<Error> errorHandler;
4242
internal Action<LogMessage> logHandler;
4343
internal Action<string> statisticsHandler;
44+
internal Action<CommittedOffsets> offsetsCommittedHandler;
45+
internal Action<RebalanceEvent> rebalanceHandler;
4446
}
4547

4648
private IDeserializer<TKey> keyDeserializer;
@@ -70,6 +72,7 @@ internal class Config
7072
/// invocation of a rebalance callback event.
7173
/// </summary>
7274
private int assignCallCount = 0;
75+
private object assignCallCountLockObj = new object();
7376

7477
private bool enableHeaderMarshaling = true;
7578
private bool enableTimestampMarshaling = true;
@@ -107,24 +110,23 @@ private void LogCallback(IntPtr rk, SyslogLevel level, string fac, string buf)
107110
logHandler?.Invoke(new LogMessage(Util.Marshal.PtrToStringUTF8(Librdkafka.name(rk)), level, fac, buf));
108111
}
109112

110-
private Action<List<TopicPartition>> partitionsAssignedHandler;
111-
private Action<List<TopicPartition>> partitionsRevokedHandler;
113+
private Action<RebalanceEvent> rebalanceHandler;
112114
private Librdkafka.RebalanceDelegate rebalanceDelegate;
113115
private void RebalanceCallback(
114116
IntPtr rk,
115117
ErrorCode err,
116118
IntPtr partitions,
117119
IntPtr opaque)
118120
{
119-
var partitionList = SafeKafkaHandle.GetTopicPartitionOffsetErrorList(partitions).Select(p => p.TopicPartition).ToList();
121+
var partitionAssignment = SafeKafkaHandle.GetTopicPartitionOffsetErrorList(partitions).Select(p => p.TopicPartition).ToList();
120122

121123
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
122124
if (kafkaHandle.IsClosed)
123125
{
124126
// The RebalanceCallback should never be invoked as a side effect of Dispose.
125127
// If for some reason flow of execution gets here, something is badly wrong.
126128
// (and we have a closed librdkafka handle that is expecting an assign call...)
127-
throw new Exception("unexpected rebalance callback on disposed kafkaHandle");
129+
throw new Exception("Unexpected rebalance callback on disposed kafkaHandle");
128130
}
129131

130132
// Note: The contract with librdkafka requires the application to acknowledge rebalances by calling Assign.
@@ -133,30 +135,26 @@ private void RebalanceCallback(
133135

134136
if (err == ErrorCode.Local_AssignPartitions)
135137
{
136-
if (partitionsAssignedHandler != null)
138+
if (rebalanceHandler != null)
137139
{
138-
assignCallCount = 0;
139-
partitionsAssignedHandler(partitionList);
140-
if (assignCallCount == 1) { return; }
141-
if (assignCallCount > 1)
140+
lock (assignCallCountLockObj) { assignCallCount = 0; }
141+
rebalanceHandler(new RebalanceEvent(partitionAssignment, true));
142+
lock (assignCallCountLockObj)
142143
{
143-
throw new InvalidOperationException(
144-
$"Assign/Unassign was called {assignCallCount} times after OnPartitionsAssigned was raised. It must be called at most once.");
144+
if (assignCallCount > 0) { return; }
145145
}
146146
}
147-
Assign(partitionList.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
147+
Assign(partitionAssignment.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
148148
}
149149
else if (err == ErrorCode.Local_RevokePartitions)
150150
{
151-
if (partitionsRevokedHandler != null)
151+
if (rebalanceHandler != null)
152152
{
153-
assignCallCount = 0;
154-
partitionsRevokedHandler(partitionList);
155-
if (assignCallCount == 1) { return; }
156-
if (assignCallCount > 1)
153+
lock (assignCallCountLockObj) { assignCallCount = 0; }
154+
rebalanceHandler(new RebalanceEvent(partitionAssignment, false));
155+
lock (assignCallCountLockObj)
157156
{
158-
throw new InvalidOperationException(
159-
$"Assign/Unassign was called {assignCallCount} times after OnPartitionsAssigned was raised. It must be called at most once.");
157+
if (assignCallCount > 0) { return; }
160158
}
161159
}
162160
Unassign();
@@ -243,11 +241,13 @@ public List<string> Subscription
243241
/// in the group.
244242
/// </remarks>
245243
public void Subscribe(IEnumerable<string> topics)
246-
=> kafkaHandle.Subscribe(topics);
244+
{
245+
kafkaHandle.Subscribe(topics);
246+
}
247247

248248

249249
/// <summary>
250-
/// Update the subscription set to a single topic.
250+
/// Sets the subscription set to a single topic.
251251
///
252252
/// Any previous subscription will be unassigned and unsubscribed first.
253253
/// </summary>
@@ -269,10 +269,12 @@ public void Unsubscribe()
269269

270270

271271
/// <summary>
272-
/// Update the assignment set to a single <paramref name="partition" />.
272+
/// Sets the current set of assigned partitions (the set of partitions the
273+
/// consumer will consume from) to a single <paramref name="partition" />.
273274
///
274-
/// The assignment set is the complete set of partitions to consume
275-
/// from and will replace any previous assignment.
275+
/// Note: The newly specified set is the complete set of partitions to
276+
/// consume from. If the consumer is already assigned to a set of partitions,
277+
/// the previous set will be replaced.
276278
/// </summary>
277279
/// <param name="partition">
278280
/// The partition to consume from. Consumption will resume from the last
@@ -284,10 +286,12 @@ public void Assign(TopicPartition partition)
284286

285287

286288
/// <summary>
287-
/// Update the assignment set to a single <paramref name="partition" />.
289+
/// Sets the current set of assigned partitions (the set of partitions the
290+
/// consumer will consume from) to a single <paramref name="partition" />.
288291
///
289-
/// The assignment set is the complete set of partitions to consume
290-
/// from and will replace any previous assignment.
292+
/// Note: The newly specified set is the complete set of partitions to
293+
/// consume from. If the consumer is already assigned to a set of partitions,
294+
/// the previous set will be replaced.
291295
/// </summary>
292296
/// <param name="partition">
293297
/// The partition to consume from. If an offset value of Offset.Invalid
@@ -300,10 +304,12 @@ public void Assign(TopicPartitionOffset partition)
300304

301305

302306
/// <summary>
303-
/// Update the assignment set to <paramref name="partitions" />.
307+
/// Sets the current set of assigned partitions (the set of partitions the
308+
/// consumer will consume from) to <paramref name="partitions" />.
304309
///
305-
/// The assignment set is the complete set of partitions to consume
306-
/// from and will replace any previous assignment.
310+
/// Note: The newly specified set is the complete set of partitions to
311+
/// consume from. If the consumer is already assigned to a set of partitions,
312+
/// the previous set will be replaced.
307313
/// </summary>
308314
/// <param name="partitions">
309315
/// The set of partitions to consume from. If an offset value of
@@ -314,16 +320,18 @@ public void Assign(TopicPartitionOffset partition)
314320
/// </param>
315321
public void Assign(IEnumerable<TopicPartitionOffset> partitions)
316322
{
317-
assignCallCount += 1;
323+
lock (assignCallCountLockObj) { assignCallCount += 1; }
318324
kafkaHandle.Assign(partitions.ToList());
319325
}
320326

321327

322328
/// <summary>
323-
/// Update the assignment set to <paramref name="partitions" />.
329+
/// Sets the current set of assigned partitions (the set of partitions the
330+
/// consumer will consume from) to <paramref name="partitions" />.
324331
///
325-
/// The assignment set is the complete set of partitions to consume
326-
/// from and will replace any previous assignment.
332+
/// Note: The newly specified set is the complete set of partitions to
333+
/// consume from. If the consumer is already assigned to a set of partitions,
334+
/// the previous set will be replaced.
327335
/// </summary>
328336
/// <param name="partitions">
329337
/// The set of partitions to consume from. Consumption will resume
@@ -333,20 +341,21 @@ public void Assign(IEnumerable<TopicPartitionOffset> partitions)
333341
/// </param>
334342
public void Assign(IEnumerable<TopicPartition> partitions)
335343
{
336-
assignCallCount += 1;
344+
lock (assignCallCountLockObj) { assignCallCount += 1; }
337345
kafkaHandle.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)).ToList());
338346
}
339347

340348

341349
/// <summary>
342-
/// Stop consumption and remove the current assignment.
350+
/// Remove the current set of assigned partitions and stop consumption.
343351
/// </summary>
344352
public void Unassign()
345353
{
346-
assignCallCount += 1;
354+
lock (assignCallCountLockObj) { assignCallCount += 1; }
347355
kafkaHandle.Assign(null);
348356
}
349357

358+
350359
/// <summary>
351360
/// Store offsets for a single partition based on the topic/partition/offset
352361
/// of a consume result.
@@ -694,79 +703,15 @@ protected virtual void Dispose(bool disposing)
694703
}
695704

696705

697-
/// <summary>
698-
/// Set the partitions assigned handler.
699-
///
700-
/// If you do not call the <see cref="Confluent.Kafka.Consumer{TKey,TValue}.Assign(IEnumerable{TopicPartition})" />
701-
/// method (or another overload of this method) in this handler, or do not specify a partitions assigned handler,
702-
/// the consumer will be automatically assigned to the partition assignment set provided by the consumer group and
703-
/// consumption will resume from the last committed offset for each partition, or if there is no committed offset,
704-
/// in accordance with the `auto.offset.reset` configuration property. This default behavior will not occur if
705-
/// you call Assign yourself in the handler. The set of partitions you assign to is not required to match the
706-
/// assignment provided by the consumer group, but typically will.
707-
/// </summary>
708-
/// <remarks>
709-
/// Executes as a side-effect of the Consumer.Consume call (on the same thread).
710-
/// </remarks>
711-
public void SetPartitionsAssignedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartition>> value)
712-
{
713-
if (value == null)
714-
{
715-
partitionsAssignedHandler = null;
716-
return;
717-
}
718-
partitionsAssignedHandler = partitions => value(this, partitions);
719-
}
720-
721-
722-
/// <summary>
723-
/// Set the partitions revoked handler.
724-
///
725-
/// If you do not call the <see cref="Confluent.Kafka.Consumer{TKey,TValue}.Unassign" /> or
726-
/// <see cref="Confluent.Kafka.Consumer{TKey,TValue}.Assign(IEnumerable{TopicPartition})" />
727-
/// (or other overload) method in your handler, all partitions will be automatically
728-
/// unassigned. This default behavior will not occur if you call Unassign (or Assign)
729-
/// yourself.
730-
/// </summary>
731-
/// <remarks>
732-
/// Executes as a side-effect of the Consumer.Consume call (on the same thread).
733-
/// </remarks>
734-
public void SetPartitionsRevokedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartition>> value)
735-
{
736-
if (value == null)
737-
{
738-
partitionsRevokedHandler = null;
739-
return;
740-
}
741-
partitionsRevokedHandler = partitions => value(this, partitions);
742-
}
743-
744-
745-
/// <summary>
746-
/// A handler that is called to report the result of (automatic) offset
747-
/// commits. It is not called as a result of the use of the Commit method.
748-
/// </summary>
749-
/// <remarks>
750-
/// Executes as a side-effect of the Consumer.Consume call (on the same thread).
751-
/// </remarks>
752-
public void SetOffsetsCommittedHandler(Action<IConsumer<TKey, TValue>, CommittedOffsets> value)
753-
{
754-
if (value == null)
755-
{
756-
offsetsCommittedHandler = null;
757-
return;
758-
}
759-
offsetsCommittedHandler = offsets => value(this, offsets);
760-
}
761-
762-
763706
internal Consumer(ConsumerBuilder<TKey, TValue> builder)
764707
{
765708
var baseConfig = builder.ConstructBaseConfig(this);
766709

767710
this.statisticsHandler = baseConfig.statisticsHandler;
768711
this.logHandler = baseConfig.logHandler;
769712
this.errorHandler = baseConfig.errorHandler;
713+
this.rebalanceHandler = baseConfig.rebalanceHandler;
714+
this.offsetsCommittedHandler = baseConfig.offsetsCommittedHandler;
770715

771716
Librdkafka.Initialize(null);
772717

0 commit comments

Comments
 (0)