Skip to content

Commit 9826d79

Browse files
authored
KIP-320: Allow fetchers to detect (#2027)
and handle log truncation. .NET changes
1 parent 5acbd3d commit 9826d79

File tree

13 files changed

+260
-40
lines changed

13 files changed

+260
-40
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
- Added SetSaslCredentials. This new method (on the Producer, Consumer, and AdminClient) allows modifying the stored
77
SASL PLAIN/SCRAM credentials that will be used for subsequent (new) connections to a broker (#1980).
88
- Changed the way the `_SCHEMA` filed is accessed internally from reflecting the static field to accessing it from the instance ([AlexeyRaga](https://github.com/AlexeyRaga)).
9+
- [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation): add offset leader epoch fields to the TopicPartitionOffset,
10+
TopicPartitionOffsetError and ConsumeResult classes (#2027).
911

1012

1113
## Fixes

src/Confluent.Kafka/ConsumeResult.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public class ConsumeResult<TKey, TValue>
4141
/// </summary>
4242
public Offset Offset { get; set; }
4343

44+
/// <summary>
45+
/// The offset leader epoch (optional).
46+
/// </summary>
47+
public int? LeaderEpoch { get; set; }
48+
4449
/// <summary>
4550
/// The TopicPartition associated with the message.
4651
/// </summary>
@@ -54,13 +59,15 @@ public TopicPartitionOffset TopicPartitionOffset
5459
{
5560
get
5661
{
57-
return new TopicPartitionOffset(Topic, Partition, Offset);
62+
return new TopicPartitionOffset(Topic, Partition, Offset,
63+
LeaderEpoch);
5864
}
5965
set
6066
{
6167
Topic = value.Topic;
6268
Partition = value.Partition;
6369
Offset = value.Offset;
70+
LeaderEpoch = value.LeaderEpoch;
6471
}
6572
}
6673

src/Confluent.Kafka/Consumer.cs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ private void RebalanceCallback(
263263
{
264264
try
265265
{
266-
assignmentWithPositions.Add(new TopicPartitionOffset(tp, Position(tp)));
266+
assignmentWithPositions.Add(PositionTopicPartitionOffset(tp));
267267
}
268268
catch
269269
{
@@ -437,7 +437,8 @@ public void Unassign()
437437

438438
/// <inheritdoc/>
439439
public void StoreOffset(ConsumeResult<TKey, TValue> result)
440-
=> StoreOffset(new TopicPartitionOffset(result.TopicPartition, result.Offset + 1));
440+
=> StoreOffset(new TopicPartitionOffset(result.TopicPartition,
441+
result.Offset + 1, result.LeaderEpoch));
441442

442443

443444
/// <inheritdoc/>
@@ -474,7 +475,8 @@ public void Commit(ConsumeResult<TKey, TValue> result)
474475
throw new InvalidOperationException("Attempt was made to commit offset corresponding to an empty consume result");
475476
}
476477

477-
Commit(new [] { new TopicPartitionOffset(result.TopicPartition, result.Offset + 1) });
478+
Commit(new [] { new TopicPartitionOffset(result.TopicPartition, result.Offset + 1,
479+
result.LeaderEpoch) });
478480
}
479481

480482

@@ -507,10 +509,16 @@ public List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitio
507509

508510
/// <inheritdoc/>
509511
public Offset Position(TopicPartition partition)
512+
{
513+
return PositionTopicPartitionOffset(partition).Offset;
514+
}
515+
516+
/// <inheritdoc/>
517+
public TopicPartitionOffset PositionTopicPartitionOffset(TopicPartition partition)
510518
{
511519
try
512520
{
513-
return kafkaHandle.Position(new List<TopicPartition> { partition }).First().Offset;
521+
return kafkaHandle.Position(new List<TopicPartition> { partition }).First();
514522
}
515523
catch (TopicPartitionOffsetException e)
516524
{
@@ -800,7 +808,9 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
800808
{
801809
return new ConsumeResult<TKey, TValue>
802810
{
803-
TopicPartitionOffset = new TopicPartitionOffset(topic, msg.partition, msg.offset),
811+
TopicPartitionOffset = new TopicPartitionOffset(topic,
812+
msg.partition, msg.offset,
813+
Librdkafka.message_leader_epoch(msgPtr)),
804814
Message = null,
805815
IsPartitionEOF = true
806816
};
@@ -845,7 +855,9 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
845855
throw new ConsumeException(
846856
new ConsumeResult<byte[], byte[]>
847857
{
848-
TopicPartitionOffset = new TopicPartitionOffset(topic, msg.partition, msg.offset),
858+
TopicPartitionOffset = new TopicPartitionOffset(topic,
859+
msg.partition, msg.offset,
860+
Librdkafka.message_leader_epoch(msgPtr)),
849861
Message = new Message<byte[], byte[]>
850862
{
851863
Timestamp = timestamp,
@@ -876,7 +888,9 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
876888
throw new ConsumeException(
877889
new ConsumeResult<byte[], byte[]>
878890
{
879-
TopicPartitionOffset = new TopicPartitionOffset(topic, msg.partition, msg.offset),
891+
TopicPartitionOffset = new TopicPartitionOffset(topic,
892+
msg.partition, msg.offset,
893+
Librdkafka.message_leader_epoch(msgPtr)),
880894
Message = new Message<byte[], byte[]>
881895
{
882896
Timestamp = timestamp,
@@ -908,7 +922,9 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
908922
throw new ConsumeException(
909923
new ConsumeResult<byte[], byte[]>
910924
{
911-
TopicPartitionOffset = new TopicPartitionOffset(topic, msg.partition, msg.offset),
925+
TopicPartitionOffset = new TopicPartitionOffset(topic,
926+
msg.partition, msg.offset,
927+
Librdkafka.message_leader_epoch(msgPtr)),
912928
Message = new Message<byte[], byte[]>
913929
{
914930
Timestamp = timestamp,
@@ -924,7 +940,9 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
924940

925941
return new ConsumeResult<TKey, TValue>
926942
{
927-
TopicPartitionOffset = new TopicPartitionOffset(topic, msg.partition, msg.offset),
943+
TopicPartitionOffset = new TopicPartitionOffset(topic,
944+
msg.partition, msg.offset,
945+
Librdkafka.message_leader_epoch(msgPtr)),
928946
Message = new Message<TKey, TValue>
929947
{
930948
Timestamp = timestamp,

src/Confluent.Kafka/IConsumer.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,24 @@ public interface IConsumer<TKey, TValue> : IClient
556556
/// Thrown if the request failed.
557557
/// </exception>
558558
Offset Position(TopicPartition partition);
559+
560+
561+
/// <summary>
562+
/// Gets the current position (offset) for the
563+
/// specified topic / partition.
564+
///
565+
/// The offset field of each requested partition
566+
/// will be set to the offset of the last consumed
567+
/// message + 1, or Offset.Unset in case there was
568+
/// no previous message consumed by this consumer.
569+
///
570+
/// The returned TopicPartitionOffset contains the leader epoch
571+
/// too.
572+
/// </summary>
573+
/// <exception cref="Confluent.Kafka.KafkaException">
574+
/// Thrown if the request failed.
575+
/// </exception>
576+
TopicPartitionOffset PositionTopicPartitionOffset(TopicPartition partition);
559577

560578

561579
/// <summary>

src/Confluent.Kafka/Impl/LibRdKafka.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ static bool SetDelegates(Type nativeMethodsClass)
179179
_message_timestamp = (messageTimestampDelegate)methods.Single(m => m.Name == "rd_kafka_message_timestamp").CreateDelegate(typeof(messageTimestampDelegate));
180180
_message_headers = (messageHeadersDelegate)methods.Single(m => m.Name == "rd_kafka_message_headers").CreateDelegate(typeof(messageHeadersDelegate));
181181
_message_status = (Func<IntPtr, PersistenceStatus>)methods.Single(m => m.Name == "rd_kafka_message_status").CreateDelegate(typeof(Func<IntPtr, PersistenceStatus>));
182+
_message_leader_epoch = (messageLeaderEpoch)methods.Single(m => m.Name == "rd_kafka_message_leader_epoch").CreateDelegate(typeof(messageLeaderEpoch));
182183
_message_destroy = (Action<IntPtr>)methods.Single(m => m.Name == "rd_kafka_message_destroy").CreateDelegate(typeof(Action<IntPtr>));
183184
_conf_new = (Func<SafeConfigHandle>)methods.Single(m => m.Name == "rd_kafka_conf_new").CreateDelegate(typeof(Func<SafeConfigHandle>));
184185
_conf_destroy = (Action<IntPtr>)methods.Single(m => m.Name == "rd_kafka_conf_destroy").CreateDelegate(typeof(Action<IntPtr>));
@@ -208,6 +209,8 @@ static bool SetDelegates(Type nativeMethodsClass)
208209
_topic_conf_set_partitioner_cb = (Action<IntPtr, PartitionerDelegate>)methods.Single(m => m.Name == "rd_kafka_topic_conf_set_partitioner_cb").CreateDelegate(typeof(Action<IntPtr, PartitionerDelegate>));
209210
_topic_conf_set_opaque = (Action<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_topic_conf_set_opaque").CreateDelegate(typeof(Action<IntPtr, IntPtr>));
210211
_topic_partition_available = (Func<IntPtr, int, bool>)methods.Single(m => m.Name == "rd_kafka_topic_partition_available").CreateDelegate(typeof(Func<IntPtr, int, bool>));
212+
_topic_partition_get_leader_epoch = (Func<IntPtr, int>)methods.Single(m => m.Name == "rd_kafka_topic_partition_get_leader_epoch").CreateDelegate(typeof(Func<IntPtr, int>));
213+
_topic_partition_set_leader_epoch = (Action<IntPtr, int>)methods.Single(m => m.Name == "rd_kafka_topic_partition_set_leader_epoch").CreateDelegate(typeof(Action<IntPtr, int>));
211214
_init_transactions = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_init_transactions").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
212215
_begin_transaction = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_begin_transaction").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
213216
_commit_transaction = (Func<IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_commit_transaction").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
@@ -247,6 +250,7 @@ static bool SetDelegates(Type nativeMethodsClass)
247250
_pause_partitions = (Func<IntPtr, IntPtr, ErrorCode>)methods.Single(m => m.Name == "rd_kafka_pause_partitions").CreateDelegate(typeof(Func<IntPtr, IntPtr, ErrorCode>));
248251
_resume_partitions = (Func<IntPtr, IntPtr, ErrorCode>)methods.Single(m => m.Name == "rd_kafka_resume_partitions").CreateDelegate(typeof(Func<IntPtr, IntPtr, ErrorCode>));
249252
_seek = (Func<IntPtr, int, long, IntPtr, ErrorCode>)methods.Single(m => m.Name == "rd_kafka_seek").CreateDelegate(typeof(Func<IntPtr, int, long, IntPtr, ErrorCode>));
253+
_seek_partitions = (Func<IntPtr, IntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_seek_partitions").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr, IntPtr>));
250254
_position = (Func<IntPtr, IntPtr, ErrorCode>)methods.Single(m => m.Name == "rd_kafka_position").CreateDelegate(typeof(Func<IntPtr, IntPtr, ErrorCode>));
251255
_produceva = (Produceva)methods.Single(m => m.Name == "rd_kafka_produceva").CreateDelegate(typeof(Produceva));
252256
_flush = (Flush)methods.Single(m => m.Name == "rd_kafka_flush").CreateDelegate(typeof(Flush));
@@ -782,6 +786,10 @@ internal static ErrorCode header_get_all(
782786
private static messageHeadersDelegate _message_headers;
783787
internal static ErrorCode message_headers(IntPtr rkmessage, out IntPtr hdrs) => _message_headers(rkmessage, out hdrs);
784788

789+
internal delegate int messageLeaderEpoch(IntPtr rkmessage);
790+
private static messageLeaderEpoch _message_leader_epoch;
791+
internal static int message_leader_epoch(IntPtr rkmessage) => _message_leader_epoch(rkmessage);
792+
785793
private static Action<IntPtr> _message_destroy;
786794
internal static void message_destroy(IntPtr rkmessage) => _message_destroy(rkmessage);
787795

@@ -901,6 +909,15 @@ internal static void topic_conf_set_partitioner_cb(
901909
internal static bool topic_partition_available(IntPtr rkt, int partition)
902910
=> _topic_partition_available(rkt, partition);
903911

912+
913+
private static Func<IntPtr, int> _topic_partition_get_leader_epoch;
914+
internal static int topic_partition_get_leader_epoch(IntPtr rkt)
915+
=> _topic_partition_get_leader_epoch(rkt);
916+
917+
private static Action<IntPtr, int> _topic_partition_set_leader_epoch;
918+
internal static void topic_partition_set_leader_epoch(IntPtr rkt, int leader_epoch)
919+
=> _topic_partition_set_leader_epoch(rkt, leader_epoch);
920+
904921
private static Func<IntPtr, IntPtr, IntPtr> _init_transactions;
905922
internal static IntPtr init_transactions(IntPtr rk, IntPtr timeout)
906923
=> _init_transactions(rk, timeout);
@@ -1064,6 +1081,10 @@ internal static ErrorCode resume_partitions(IntPtr rk, IntPtr partitions)
10641081
private static Func<IntPtr, int, long, IntPtr, ErrorCode> _seek;
10651082
internal static ErrorCode seek(IntPtr rkt, int partition, long offset, IntPtr timeout_ms)
10661083
=> _seek(rkt, partition, offset, timeout_ms);
1084+
1085+
private static Func<IntPtr, IntPtr, IntPtr, IntPtr> _seek_partitions;
1086+
internal static IntPtr seek_partitions(IntPtr rkt, IntPtr partitions, IntPtr timeout_ms)
1087+
=> _seek_partitions(rkt, partitions, timeout_ms);
10671088

10681089
private static Func<IntPtr, IntPtr, IntPtr, ErrorCode> _committed;
10691090
internal static ErrorCode committed(IntPtr rk, IntPtr partitions, IntPtr timeout_ms)

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ internal static extern ErrorCode rd_kafka_message_headers(
119119
internal static extern PersistenceStatus rd_kafka_message_status(
120120
/* rd_kafka_message_t * */ IntPtr rkmessage);
121121

122+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
123+
internal static extern int rd_kafka_message_leader_epoch(
124+
/* rd_kafka_message_t * */ IntPtr rkmessage);
125+
122126
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
123127
internal static extern void rd_kafka_message_destroy(
124128
/* rd_kafka_message_t * */ IntPtr rkmessage);
@@ -244,6 +248,14 @@ internal static extern void rd_kafka_topic_conf_set_partitioner_cb(
244248
internal static extern bool rd_kafka_topic_partition_available(
245249
IntPtr rkt, int partition);
246250

251+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
252+
internal static extern int rd_kafka_topic_partition_get_leader_epoch(
253+
IntPtr rkt);
254+
255+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
256+
internal static extern void rd_kafka_topic_partition_set_leader_epoch(
257+
IntPtr rkt, int leader_epoch);
258+
247259
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
248260
internal static extern IntPtr rd_kafka_init_transactions(
249261
IntPtr rk, IntPtr timeout_ms);
@@ -405,6 +417,10 @@ internal static extern ErrorCode rd_kafka_resume_partitions(
405417
internal static extern ErrorCode rd_kafka_seek(
406418
IntPtr rkt, int partition, long offset, IntPtr timeout_ms);
407419

420+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
421+
internal static extern IntPtr rd_kafka_seek_partitions(
422+
IntPtr rkt, IntPtr partitions, IntPtr timeout_ms);
423+
408424
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
409425
internal static extern ErrorCode rd_kafka_committed(
410426
IntPtr rk, IntPtr partitions, IntPtr timeout_ms);

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ internal static extern ErrorCode rd_kafka_message_headers(
123123
internal static extern PersistenceStatus rd_kafka_message_status(
124124
/* rd_kafka_message_t * */ IntPtr rkmessage);
125125

126+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
127+
internal static extern int rd_kafka_message_leader_epoch(
128+
/* rd_kafka_message_t * */ IntPtr rkmessage);
129+
126130
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
127131
internal static extern void rd_kafka_message_destroy(
128132
/* rd_kafka_message_t * */ IntPtr rkmessage);
@@ -248,6 +252,14 @@ internal static extern void rd_kafka_topic_conf_set_partitioner_cb(
248252
internal static extern bool rd_kafka_topic_partition_available(
249253
IntPtr rkt, int partition);
250254

255+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
256+
internal static extern int rd_kafka_topic_partition_get_leader_epoch(
257+
IntPtr rkt);
258+
259+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
260+
internal static extern void rd_kafka_topic_partition_set_leader_epoch(
261+
IntPtr rkt, int leader_epoch);
262+
251263
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
252264
internal static extern IntPtr rd_kafka_init_transactions(
253265
IntPtr rk, IntPtr timeout_ms);
@@ -409,6 +421,10 @@ internal static extern ErrorCode rd_kafka_resume_partitions(
409421
internal static extern ErrorCode rd_kafka_seek(
410422
IntPtr rkt, int partition, long offset, IntPtr timeout_ms);
411423

424+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
425+
internal static extern IntPtr rd_kafka_seek_partitions(
426+
IntPtr rkt, IntPtr partitions, IntPtr timeout_ms);
427+
412428
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
413429
internal static extern ErrorCode rd_kafka_committed(
414430
IntPtr rk, IntPtr partitions, IntPtr timeout_ms);

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos6.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ internal static extern ErrorCode rd_kafka_message_headers(
123123
internal static extern PersistenceStatus rd_kafka_message_status(
124124
/* rd_kafka_message_t * */ IntPtr rkmessage);
125125

126+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
127+
internal static extern int rd_kafka_message_leader_epoch(
128+
/* rd_kafka_message_t * */ IntPtr rkmessage);
129+
126130
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
127131
internal static extern void rd_kafka_message_destroy(
128132
/* rd_kafka_message_t * */ IntPtr rkmessage);
@@ -248,6 +252,14 @@ internal static extern void rd_kafka_topic_conf_set_partitioner_cb(
248252
internal static extern bool rd_kafka_topic_partition_available(
249253
IntPtr rkt, int partition);
250254

255+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
256+
internal static extern int rd_kafka_topic_partition_get_leader_epoch(
257+
IntPtr rkt);
258+
259+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
260+
internal static extern void rd_kafka_topic_partition_set_leader_epoch(
261+
IntPtr rkt, int leader_epoch);
262+
251263
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
252264
internal static extern IntPtr rd_kafka_init_transactions(
253265
IntPtr rk, IntPtr timeout_ms);
@@ -409,6 +421,10 @@ internal static extern ErrorCode rd_kafka_resume_partitions(
409421
internal static extern ErrorCode rd_kafka_seek(
410422
IntPtr rkt, int partition, long offset, IntPtr timeout_ms);
411423

424+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
425+
internal static extern IntPtr rd_kafka_seek_partitions(
426+
IntPtr rkt, IntPtr partitions, IntPtr timeout_ms);
427+
412428
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
413429
internal static extern ErrorCode rd_kafka_committed(
414430
IntPtr rk, IntPtr partitions, IntPtr timeout_ms);

0 commit comments

Comments
 (0)