Skip to content

Commit 1f04822

Browse files
committed
Merge branch '1.0.x'
2 parents 6611a4f + 887926d commit 1f04822

32 files changed

+405
-409
lines changed

examples/Consumer/Program.cs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,17 @@ public static void Run_Consume(string brokerList, List<string> topics, Cancellat
6161
// Note: All handlers are called on the main .Consume thread.
6262
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
6363
.SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
64-
.SetRebalanceHandler((_, e) =>
64+
.SetPartitionsAssignedHandler((c, partitions) =>
6565
{
66-
if (e.IsAssignment)
67-
{
68-
Console.WriteLine($"Assigned partitions: [{string.Join(", ", e.Partitions)}]");
69-
// possibly override the default partition assignment behavior:
70-
// consumer.Assign(...)
71-
}
72-
else
73-
{
74-
Console.WriteLine($"Revoked partitions: [{string.Join(", ", e.Partitions)}]");
75-
// consumer.Unassign()
76-
}
66+
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
67+
// possibly manually specify start offsets or override the partition assignment provided by
68+
// the consumer group by returning a list of topic/partition/offsets to assign to, e.g.:
69+
//
70+
// return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
71+
})
72+
.SetPartitionsRevokedHandler((c, partitions) =>
73+
{
74+
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
7775
})
7876
.Build())
7977
{

src/Confluent.Kafka/Consumer.cs

Lines changed: 90 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ internal class Config
4242
internal Action<LogMessage> logHandler;
4343
internal Action<string> statisticsHandler;
4444
internal Action<CommittedOffsets> offsetsCommittedHandler;
45-
internal Action<RebalanceEvent> rebalanceHandler;
45+
internal Func<List<TopicPartition>, IEnumerable<TopicPartitionOffset>> partitionsAssignedHandler;
46+
internal Func<List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> partitionsRevokedHandler;
4647
}
4748

4849
private IDeserializer<TKey> keyDeserializer;
@@ -110,7 +111,8 @@ private void LogCallback(IntPtr rk, SyslogLevel level, string fac, string buf)
110111
logHandler?.Invoke(new LogMessage(Util.Marshal.PtrToStringUTF8(Librdkafka.name(rk)), level, fac, buf));
111112
}
112113

113-
private Action<RebalanceEvent> rebalanceHandler;
114+
private Func<List<TopicPartition>, IEnumerable<TopicPartitionOffset>> partitionsAssignedHandler;
115+
private Func<List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> partitionsRevokedHandler;
114116
private Librdkafka.RebalanceDelegate rebalanceDelegate;
115117
private void RebalanceCallback(
116118
IntPtr rk,
@@ -129,40 +131,66 @@ private void RebalanceCallback(
129131
throw new Exception("Unexpected rebalance callback on disposed kafkaHandle");
130132
}
131133

132-
// Note: The contract with librdkafka requires the application to acknowledge rebalances by calling Assign.
133-
// To make the API less error prone, this is done automatically by the C# binding if required - the number
134-
// of times assign is called by user code is tracked, and if this is zero, then assign is called automatically.
135-
136134
if (err == ErrorCode.Local_AssignPartitions)
137135
{
138-
if (rebalanceHandler != null)
136+
if (partitionsAssignedHandler == null)
137+
{
138+
Assign(partitionAssignment.Select(p => new TopicPartitionOffset(p, Offset.Unset)));
139+
return;
140+
}
141+
142+
lock (assignCallCountLockObj) { assignCallCount = 0; }
143+
var assignTo = partitionsAssignedHandler(partitionAssignment);
144+
lock (assignCallCountLockObj)
139145
{
140-
lock (assignCallCountLockObj) { assignCallCount = 0; }
141-
rebalanceHandler(new RebalanceEvent(partitionAssignment, true));
142-
lock (assignCallCountLockObj)
146+
if (assignCallCount > 0)
143147
{
144-
if (assignCallCount > 0) { return; }
148+
throw new InvalidOperationException("Assign/Unassign must not be called in the partitions assigned handler.");
145149
}
146150
}
147-
Assign(partitionAssignment.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
151+
Assign(assignTo);
152+
return;
148153
}
149-
else if (err == ErrorCode.Local_RevokePartitions)
154+
155+
if (err == ErrorCode.Local_RevokePartitions)
150156
{
151-
if (rebalanceHandler != null)
157+
if (partitionsRevokedHandler == null)
158+
{
159+
Unassign();
160+
return;
161+
}
162+
163+
var assignmentWithPositions = new List<TopicPartitionOffset>();
164+
foreach (var tp in partitionAssignment)
152165
{
153-
lock (assignCallCountLockObj) { assignCallCount = 0; }
154-
rebalanceHandler(new RebalanceEvent(partitionAssignment, false));
155-
lock (assignCallCountLockObj)
166+
try
167+
{
168+
assignmentWithPositions.Add(new TopicPartitionOffset(tp, Position(tp)));
169+
}
170+
catch
156171
{
157-
if (assignCallCount > 0) { return; }
172+
assignmentWithPositions.Add(new TopicPartitionOffset(tp, Offset.Unset));
158173
}
159174
}
160-
Unassign();
161-
}
162-
else
163-
{
164-
throw new KafkaException(kafkaHandle.CreatePossiblyFatalError(err, null));
175+
176+
lock (assignCallCountLockObj) { assignCallCount = 0; }
177+
var assignTo = partitionsRevokedHandler(assignmentWithPositions);
178+
lock (assignCallCountLockObj)
179+
{
180+
if (assignCallCount > 0)
181+
{
182+
throw new InvalidOperationException("Assign/Unassign must not be called in the partitions revoked handler.");
183+
}
184+
}
185+
186+
// This distinction is important because calling Assign whilst the consumer is being
187+
// closed (which will generally trigger this callback) is disallowed.
188+
if (assignTo.Count() > 0) { Assign(assignTo); }
189+
else { Unassign(); }
190+
return;
165191
}
192+
193+
throw new KafkaException(kafkaHandle.CreatePossiblyFatalError(err, null));
166194
}
167195

168196
private Action<CommittedOffsets> offsetsCommittedHandler;
@@ -294,7 +322,7 @@ public void Assign(TopicPartition partition)
294322
/// the previous set will be replaced.
295323
/// </summary>
296324
/// <param name="partition">
297-
/// The partition to consume from. If an offset value of Offset.Invalid
325+
/// The partition to consume from. If an offset value of Offset.Unset
298326
/// (-1001) is specified, consumption will resume from the last committed
299327
/// offset, or according to the 'auto.offset.reset' configuration parameter
300328
/// if no offsets have been committed yet.
@@ -313,7 +341,7 @@ public void Assign(TopicPartitionOffset partition)
313341
/// </summary>
314342
/// <param name="partitions">
315343
/// The set of partitions to consume from. If an offset value of
316-
/// Offset.Invalid (-1001) is specified for a partition, consumption
344+
/// Offset.Unset (-1001) is specified for a partition, consumption
317345
/// will resume from the last committed offset on that partition, or
318346
/// according to the 'auto.offset.reset' configuration parameter if
319347
/// no offsets have been committed yet.
@@ -342,7 +370,7 @@ public void Assign(IEnumerable<TopicPartitionOffset> partitions)
342370
public void Assign(IEnumerable<TopicPartition> partitions)
343371
{
344372
lock (assignCallCountLockObj) { assignCallCount += 1; }
345-
kafkaHandle.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)).ToList());
373+
kafkaHandle.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Unset)).ToList());
346374
}
347375

348376

@@ -379,32 +407,37 @@ public void Unassign()
379407
/// Thrown if result is in error.
380408
/// </exception>
381409
public void StoreOffset(ConsumeResult<TKey, TValue> result)
382-
=> StoreOffsets(new[] { new TopicPartitionOffset(result.TopicPartition, result.Offset + 1) });
410+
=> StoreOffset(new TopicPartitionOffset(result.TopicPartition, result.Offset + 1));
383411

384412

385413
/// <summary>
386-
/// Store offsets for one or more partitions.
414+
/// Store offsets for a single partition.
387415
///
388416
/// The offset will be committed (written) to the offset store according
389-
/// to `auto.commit.interval.ms` or manual offset-less commit().
417+
/// to `auto.commit.interval.ms` or manual offset-less commit(). Calling
418+
/// this method in itself does not commit offsets, only store them for
419+
/// future commit.
390420
/// </summary>
391421
/// <remarks>
392422
/// `enable.auto.offset.store` must be set to "false" when using this API.
393423
/// </remarks>
394-
/// <param name="offsets">
395-
/// List of offsets to be commited.
424+
/// <param name="offset">
425+
/// The offset to be commited.
396426
/// </param>
397427
/// <exception cref="Confluent.Kafka.KafkaException">
398428
/// Thrown if the request failed.
399429
/// </exception>
400-
/// <exception cref="Confluent.Kafka.TopicPartitionOffsetException">
401-
/// Thrown if any of the constituent results is in error. The entire result
402-
/// (which may contain constituent results that are not in error) is available
403-
/// via the <see cref="Confluent.Kafka.TopicPartitionOffsetException.Results" />
404-
/// property of the exception.
405-
/// </exception>
406-
public void StoreOffsets(IEnumerable<TopicPartitionOffset> offsets)
407-
=> kafkaHandle.StoreOffsets(offsets);
430+
public void StoreOffset(TopicPartitionOffset offset)
431+
{
432+
try
433+
{
434+
kafkaHandle.StoreOffsets(new [] { offset });
435+
}
436+
catch (TopicPartitionOffsetException e)
437+
{
438+
throw new KafkaException(e.Results[0].Error);
439+
}
440+
}
408441

409442

410443
/// <summary>
@@ -523,10 +556,10 @@ public void Resume(IEnumerable<TopicPartition> partitions)
523556

524557

525558
/// <summary>
526-
/// Retrieve current committed offsets for the specified topic/partitions.
559+
/// Retrieve current committed offsets for the specified topic partitions.
527560
///
528561
/// The offset field of each requested partition will be set to the offset
529-
/// of the last consumed message, or Offset.Invalid in case there was
562+
/// of the last consumed message, or Offset.Unset in case there was
530563
/// no previous message, or, alternately a partition specific error may also
531564
/// be returned.
532565
/// </summary>
@@ -551,25 +584,26 @@ public List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitio
551584

552585

553586
/// <summary>
554-
/// Gets the current positions (offsets) for the specified topics + partitions.
587+
/// Gets the current position (offset) for the specified topic / partition.
555588
///
556589
/// The offset field of each requested partition will be set to the offset
557-
/// of the last consumed message + 1, or Offset.Invalid in case there was
590+
/// of the last consumed message + 1, or Offset.Unset in case there was
558591
/// no previous message consumed by this consumer.
559592
/// </summary>
560593
/// <exception cref="Confluent.Kafka.KafkaException">
561594
/// Thrown if the request failed.
562595
/// </exception>
563-
/// <exception cref="Confluent.Kafka.TopicPartitionOffsetException">
564-
/// Thrown if any of the constituent results is in error. The entire result
565-
/// (which may contain constituent results that are not in error) is available
566-
/// via the <see cref="Confluent.Kafka.TopicPartitionOffsetException.Results" />
567-
/// property of the exception.
568-
/// </exception>
569-
public List<TopicPartitionOffset> Position(IEnumerable<TopicPartition> partitions)
570-
// TODO: use a librdkafka queue for this.
571-
=> kafkaHandle.Position(partitions);
572-
596+
public Offset Position(TopicPartition partition)
597+
{
598+
try
599+
{
600+
return kafkaHandle.Position(new List<TopicPartition> { partition }).First().Offset;
601+
}
602+
catch (TopicPartitionOffsetException e)
603+
{
604+
throw new KafkaException(e.Results[0].Error);
605+
}
606+
}
573607

574608
/// <summary>
575609
/// Look up the offsets for the given partitions by timestamp. The returned
@@ -611,7 +645,7 @@ public List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTime
611645
/// The low offset is updated periodically (if statistics.interval.ms
612646
/// is set) while the high offset is updated on each fetched message set from
613647
/// the broker. If there is no cached offset (either low or high, or both) then
614-
/// Offset.Invalid will be returned for the respective offset.
648+
/// Offset.Unset will be returned for the respective offset.
615649
/// </remarks>
616650
/// <param name="topicPartition">
617651
/// The topic/partition of interest.
@@ -754,7 +788,8 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
754788
this.statisticsHandler = baseConfig.statisticsHandler;
755789
this.logHandler = baseConfig.logHandler;
756790
this.errorHandler = baseConfig.errorHandler;
757-
this.rebalanceHandler = baseConfig.rebalanceHandler;
791+
this.partitionsAssignedHandler = baseConfig.partitionsAssignedHandler;
792+
this.partitionsRevokedHandler = baseConfig.partitionsRevokedHandler;
758793
this.offsetsCommittedHandler = baseConfig.offsetsCommittedHandler;
759794

760795
Librdkafka.Initialize(null);

0 commit comments

Comments
 (0)