Skip to content

Commit 0747e6b

Browse files
authored
move get/query watermark offset methods to consumer (#791)
* move get/query watermark offset methods to consumer * removed check for handle type * review changes
1 parent ca82c65 commit 0747e6b

File tree

6 files changed

+58
-79
lines changed

6 files changed

+58
-79
lines changed

src/Confluent.Kafka/AdminClient.cs

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -562,52 +562,6 @@ public GroupInfo ListGroup(string group, TimeSpan timeout)
562562
=> kafkaHandle.ListGroup(group, timeout.TotalMillisecondsAsInt());
563563

564564

565-
/// <summary>
566-
/// Get the last cached low (oldest available/beginning) and high (newest/end)
567-
/// offsets for a topic/partition.
568-
/// </summary>
569-
/// <remarks>
570-
/// This method is only available on instances constructed from a Consumer
571-
/// handle. The low offset is updated periodically (if statistics.interval.ms
572-
/// is set) while the high offset is updated on each fetched message set from
573-
/// the broker. If there is no cached offset (either low or high, or both) then
574-
/// Offset.Invalid will be returned for the respective offset.
575-
/// </remarks>
576-
/// <param name="topicPartition">
577-
/// The topic/partition of interest.
578-
/// </param>
579-
/// <returns>
580-
/// The requested WatermarkOffsets (see that class for additional documentation).
581-
/// </returns>
582-
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
583-
{
584-
if (!Handle.Owner.GetType().Name.Contains("Consumer"))
585-
{
586-
throw new InvalidOperationException(
587-
"GetWatermarkOffsets is only available on AdminClient instances constructed from a Consumer handle.");
588-
}
589-
return kafkaHandle.GetWatermarkOffsets(topicPartition.Topic, topicPartition.Partition);
590-
}
591-
592-
593-
/// <summary>
594-
/// Query the Kafka cluster for low (oldest available/beginning) and high (newest/end)
595-
/// offsets for the specified topic/partition (blocking).
596-
/// </summary>
597-
/// <param name="topicPartition">
598-
/// The topic/partition of interest.
599-
/// </param>
600-
/// <param name="timeout">
601-
/// The maximum period of time the call may block.
602-
/// </param>
603-
/// <returns>
604-
/// The requested WatermarkOffsets (see that class for additional documentation).
605-
/// </returns>
606-
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
607-
=> kafkaHandle.QueryWatermarkOffsets(topicPartition.Topic, topicPartition.Partition, timeout.TotalMillisecondsAsInt());
608-
609-
610-
611565
/// <summary>
612566
/// Query the cluster for metadata.
613567
///

src/Confluent.Kafka/Consumer.cs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,45 @@ public List<TopicPartitionOffset> Position(IEnumerable<TopicPartition> partition
600600
/// </exception>
601601
public List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
602602
// TODO: use a librdkafka queue for this.
603-
=> kafkaHandle.OffsetsForTimes(timestampsToSearch, timeout.TotalMillisecondsAsInt());
603+
=> kafkaHandle.OffsetsForTimes(timestampsToSearch, timeout.TotalMillisecondsAsInt());
604+
605+
606+
/// <summary>
607+
/// Get the last cached low (oldest available/beginning) and high (newest/end)
608+
/// offsets for a topic/partition. Does not block.
609+
/// </summary>
610+
/// <remarks>
611+
/// The low offset is updated periodically (if statistics.interval.ms
612+
/// is set) while the high offset is updated on each fetched message set from
613+
/// the broker. If there is no cached offset (either low or high, or both) then
614+
/// Offset.Invalid will be returned for the respective offset.
615+
/// </remarks>
616+
/// <param name="topicPartition">
617+
/// The topic/partition of interest.
618+
/// </param>
619+
/// <returns>
620+
/// The requested WatermarkOffsets (see that class for additional documentation).
621+
/// </returns>
622+
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
623+
=> kafkaHandle.GetWatermarkOffsets(topicPartition.Topic, topicPartition.Partition);
624+
625+
626+
/// <summary>
627+
/// Query the Kafka cluster for low (oldest available/beginning) and high (newest/end)
628+
/// offsets for the specified topic/partition. This is a blocking call - always contacts
629+
/// the cluster for the required information.
630+
/// </summary>
631+
/// <param name="topicPartition">
632+
/// The topic/partition of interest.
633+
/// </param>
634+
/// <param name="timeout">
635+
/// The maximum period of time the call may block.
636+
/// </param>
637+
/// <returns>
638+
/// The requested WatermarkOffsets (see that class for additional documentation).
639+
/// </returns>
640+
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
641+
=> kafkaHandle.QueryWatermarkOffsets(topicPartition.Topic, topicPartition.Partition, timeout.TotalMillisecondsAsInt());
604642

605643

606644
/// <summary>

src/Confluent.Kafka/IAdminClient.cs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,6 @@ public interface IAdminClient : IClient
4040
GroupInfo ListGroup(string group, TimeSpan timeout);
4141

4242

43-
/// <summary>
44-
/// Refer to <see cref="Confluent.Kafka.AdminClient.GetWatermarkOffsets(TopicPartition)" />
45-
/// </summary>
46-
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition);
47-
48-
49-
/// <summary>
50-
/// Refer to <see cref="Confluent.Kafka.AdminClient.QueryWatermarkOffsets(TopicPartition, TimeSpan)" />
51-
/// </summary>
52-
WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout);
53-
54-
5543
/// <summary>
5644
/// Refer to <see cref="Confluent.Kafka.AdminClient.GetMetadata(string, TimeSpan)" />
5745
/// </summary>

src/Confluent.Kafka/IConsumer.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,19 @@ public interface IConsumer<TKey, TValue> : IClient
169169
/// <summary>
170170
/// Refer to <see cref="Confluent.Kafka.Consumer{TKey,TValue}.OffsetsForTimes(IEnumerable{TopicPartitionTimestamp}, TimeSpan)" />
171171
/// </summary>
172-
List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout);
172+
List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout);
173+
174+
175+
/// <summary>
176+
/// Refer to <see cref="Confluent.Kafka.Consumer{TKey,TValue}.GetWatermarkOffsets(TopicPartition)" />
177+
/// </summary>
178+
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition);
179+
180+
181+
/// <summary>
182+
/// Refer to <see cref="Confluent.Kafka.Consumer{TKey,TValue}.QueryWatermarkOffsets(TopicPartition, TimeSpan)" />
183+
/// </summary>
184+
WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout);
173185

174186

175187
/// <summary>

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Handles.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,9 @@ public static void Producer_Handles(string bootstrapServers, string singlePartit
7878
Assert.Equal(44.0, r7.Key);
7979
Assert.Equal(234.4, r7.Value);
8080

81-
var offsets = adminClient.QueryWatermarkOffsets(new TopicPartition(topic.Name, 0), TimeSpan.FromSeconds(10));
82-
Assert.Equal(0, offsets.Low);
83-
Assert.Equal(7, offsets.High);
84-
81+
var topicMetadata = adminClient.GetMetadata(singlePartitionTopic, TimeSpan.FromSeconds(10));
82+
Assert.Single(topicMetadata.Topics);
83+
8584
// implicitly check this does not throw.
8685
}
8786

test/Confluent.Kafka.IntegrationTests/Tests/WatermarkOffsets.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public static partial class Tests
2828
{
2929

3030
/// <summary>
31-
/// Tests for GetWatermarkOffsets and QueryWatermarkOffsets on producer and consumer.
31+
/// Tests for GetWatermarkOffsets and QueryWatermarkOffsets.
3232
/// </summary>
3333
[Theory, MemberData(nameof(KafkaParameters))]
3434
public static void WatermarkOffsets(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
@@ -45,17 +45,6 @@ public static void WatermarkOffsets(string bootstrapServers, string singlePartit
4545
{
4646
dr = producer.ProduceAsync(singlePartitionTopic, new Message<Null, string> { Value = testString }).Result;
4747
Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10))); // this isn't necessary.
48-
49-
var queryOffsets = adminClient.QueryWatermarkOffsets(new TopicPartition(singlePartitionTopic, 0), TimeSpan.FromSeconds(20));
50-
Assert.NotEqual(queryOffsets.Low, Offset.Invalid);
51-
Assert.NotEqual(queryOffsets.High, Offset.Invalid);
52-
53-
// TODO: can anything be said about the high watermark offset c.f. dr.Offset?
54-
// I have seen queryOffsets.High < dr.Offset and also queryOffsets.High = dr.Offset + 1.
55-
// The former only once (or was I in error?). request.required.acks has a default value
56-
// of 1, so with only one broker, I assume the former should never happen.
57-
// Console.WriteLine($"Query Offsets: [{queryOffsets.Low} {queryOffsets.High}]. DR Offset: {dr.Offset}");
58-
Assert.True(queryOffsets.Low < queryOffsets.High);
5948
}
6049

6150
var consumerConfig = new ConsumerConfig
@@ -66,18 +55,17 @@ public static void WatermarkOffsets(string bootstrapServers, string singlePartit
6655
};
6756

6857
using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
69-
using (var adminClient = new AdminClient(consumer.Handle))
7058
{
7159
consumer.Assign(new List<TopicPartitionOffset>() { dr.TopicPartitionOffset });
7260
var record = consumer.Consume(TimeSpan.FromSeconds(10));
7361
Assert.NotNull(record.Message);
7462

75-
var getOffsets = adminClient.GetWatermarkOffsets(dr.TopicPartition);
63+
var getOffsets = consumer.GetWatermarkOffsets(dr.TopicPartition);
7664
Assert.Equal(getOffsets.Low, Offset.Invalid);
7765
// the offset of the next message to be read.
7866
Assert.Equal(getOffsets.High, dr.Offset + 1);
7967

80-
var queryOffsets = adminClient.QueryWatermarkOffsets(dr.TopicPartition, TimeSpan.FromSeconds(20));
68+
var queryOffsets = consumer.QueryWatermarkOffsets(dr.TopicPartition, TimeSpan.FromSeconds(20));
8169
Assert.NotEqual(queryOffsets.Low, Offset.Invalid);
8270
Assert.Equal(getOffsets.High, queryOffsets.High);
8371
}

0 commit comments

Comments
 (0)