Skip to content

Commit a71043f

Browse files
anchitjttd2089
andauthored
Fix IConsumer breaking change (#2071)
* fix: remove breaking change to IConsumer A method was added to the IConsumer interface by #2027 that resulted in a breaking change in the 2.1 release. This replaces the interface method and the implementation on Consumer with an extension method on IConsumer to ensure that existing implementations of IConsumer remain unbroken and have the same functionality as the default Consumer implementation. * Add CHANGELOG * Remove unused import --------- Co-authored-by: ttd2089 <[email protected]>
1 parent 39f4206 commit a71043f

File tree

4 files changed

+60
-27
lines changed

4 files changed

+60
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
## Fixes
1313

1414
- Fix backwards compatability of TopicPartitionOffset constructor. ([drinehimer](https://github.com/drinehimer), #2066)
15+
- Fix IConsumer breaking change. ([ttd2089](https://github.com/ttd2089), #2071)
1516

1617

1718
# 2.1.1

src/Confluent.Kafka/Consumer.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ private void RebalanceCallback(
263263
{
264264
try
265265
{
266-
assignmentWithPositions.Add(PositionTopicPartitionOffset(tp));
266+
assignmentWithPositions.Add(this.PositionTopicPartitionOffset(tp));
267267
}
268268
catch
269269
{
@@ -509,24 +509,17 @@ public List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitio
509509

510510
/// <inheritdoc/>
511511
public Offset Position(TopicPartition partition)
512-
{
513-
return PositionTopicPartitionOffset(partition).Offset;
514-
}
515-
516-
/// <inheritdoc/>
517-
public TopicPartitionOffset PositionTopicPartitionOffset(TopicPartition partition)
518512
{
519513
try
520514
{
521-
return kafkaHandle.Position(new List<TopicPartition> { partition }).First();
515+
return kafkaHandle.Position(new List<TopicPartition> { partition }).First().Offset;
522516
}
523517
catch (TopicPartitionOffsetException e)
524518
{
525519
throw new KafkaException(e.Results[0].Error);
526520
}
527521
}
528522

529-
530523
/// <inheritdoc/>
531524
public List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
532525
// TODO: use a librdkafka queue for this.

src/Confluent.Kafka/IConsumer.cs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -556,24 +556,6 @@ public interface IConsumer<TKey, TValue> : IClient
556556
/// Thrown if the request failed.
557557
/// </exception>
558558
Offset Position(TopicPartition partition);
559-
560-
561-
/// <summary>
562-
/// Gets the current position (offset) for the
563-
/// specified topic / partition.
564-
///
565-
/// The offset field of each requested partition
566-
/// will be set to the offset of the last consumed
567-
/// message + 1, or Offset.Unset in case there was
568-
/// no previous message consumed by this consumer.
569-
///
570-
/// The returned TopicPartitionOffset contains the leader epoch
571-
/// too.
572-
/// </summary>
573-
/// <exception cref="Confluent.Kafka.KafkaException">
574-
/// Thrown if the request failed.
575-
/// </exception>
576-
TopicPartitionOffset PositionTopicPartitionOffset(TopicPartition partition);
577559

578560

579561
/// <summary>
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2018 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
using System.Linq;
19+
20+
21+
namespace Confluent.Kafka
22+
{
23+
/// <summary>
24+
/// Common extension methods for <see cref="IConsumer{TKey, TValue}"/> implementations.
25+
/// </summary>
26+
public static class IConsumerExtensions
27+
{
28+
/// <summary>
29+
/// Gets the current position (offset) for the
30+
/// specified topic / partition.
31+
///
32+
/// The offset field of each requested partition
33+
/// will be set to the offset of the last consumed
34+
/// message + 1, or Offset.Unset in case there was
35+
/// no previous message consumed by this consumer.
36+
///
37+
/// The returned TopicPartitionOffset contains the leader epoch
38+
/// too.
39+
/// </summary>
40+
/// <exception cref="Confluent.Kafka.KafkaException">
41+
/// Thrown if the request failed.
42+
/// </exception>
43+
public static TopicPartitionOffset PositionTopicPartitionOffset<TKey, TValue>(
44+
this IConsumer<TKey, TValue> consumer,
45+
TopicPartition partition)
46+
{
47+
try
48+
{
49+
return consumer.Handle.LibrdkafkaHandle.Position(new List<TopicPartition> { partition }).First();
50+
}
51+
catch (TopicPartitionOffsetException e)
52+
{
53+
throw new KafkaException(e.Results[0].Error);
54+
}
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)