Skip to content

Commit 07de95e

Browse files
mahajanadhityaemasabanchitj
authored
[KIP-396] ListOffsets (#2086)
--------- Co-authored-by: Emanuele Sabellico <[email protected]> Co-authored-by: Anchit Jain <[email protected]>
1 parent 58b5291 commit 07de95e

18 files changed

+861
-4
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
## Enhancements
44

55
- References librdkafka.redist 2.3.0. Refer to the [librdkafka v2.3.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) for more information.
6+
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
7+
Return authorized operations in describe responses (#2021, @jainruchir).
8+
- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484): Added support for ListOffsets Admin API (#2086).
69
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
710
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
811
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
9-
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
10-
Return authorized operations in describe responses (#2021, @jainruchir).
1112

1213

1314
# 2.2.0

examples/AdminClient/Program.cs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,87 @@ static List<UserScramCredentialAlteration> ParseUserScramCredentialAlterations(
242242
return alterations;
243243
}
244244

245+
static Tuple<IsolationLevel, List<TopicPartitionOffsetSpec>> ParseListOffsetsArgs(string[] args)
246+
{
247+
if (args.Length == 0)
248+
{
249+
Console.WriteLine("usage: .. <bootstrapServers> list-offsets <isolation_level> " +
250+
"<topic1> <partition1> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..");
251+
Environment.ExitCode = 1;
252+
return null;
253+
}
254+
255+
var isolationLevel = Enum.Parse<IsolationLevel>(args[0]);
256+
var topicPartitionOffsetSpecs = new List<TopicPartitionOffsetSpec>();
257+
for (int i = 1; i < args.Length;)
258+
{
259+
if (args.Length < i+3)
260+
{
261+
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
262+
}
263+
264+
string topic = args[i];
265+
var partition = Int32.Parse(args[i + 1]);
266+
var offsetSpec = args[i + 2];
267+
if (offsetSpec == "TIMESTAMP")
268+
{
269+
if (args.Length < i+4)
270+
{
271+
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
272+
}
273+
274+
var timestamp = Int64.Parse(args[i + 3]);
275+
i = i + 1;
276+
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
277+
{
278+
TopicPartition = new TopicPartition(topic, new Partition(partition)),
279+
OffsetSpec = OffsetSpec.ForTimestamp(timestamp)
280+
});
281+
}
282+
else if (offsetSpec == "MAX_TIMESTAMP")
283+
{
284+
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
285+
{
286+
TopicPartition = new TopicPartition(topic, new Partition(partition)),
287+
OffsetSpec = OffsetSpec.MaxTimestamp()
288+
});
289+
}
290+
else if (offsetSpec == "EARLIEST")
291+
{
292+
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
293+
{
294+
TopicPartition = new TopicPartition(topic, new Partition(partition)),
295+
OffsetSpec = OffsetSpec.Earliest()
296+
});
297+
}
298+
else if (offsetSpec == "LATEST")
299+
{
300+
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
301+
{
302+
TopicPartition = new TopicPartition(topic, new Partition(partition)),
303+
OffsetSpec = OffsetSpec.Latest()
304+
});
305+
}
306+
else
307+
{
308+
throw new ArgumentException(
309+
"offsetSpec can be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP T1.");
310+
}
311+
i = i + 3;
312+
}
313+
return Tuple.Create(isolationLevel, topicPartitionOffsetSpecs);
314+
}
315+
316+
static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsResultInfos)
317+
{
318+
foreach(var listOffsetsResultInfo in ListOffsetsResultInfos)
319+
{
320+
Console.WriteLine(" ListOffsetsResultInfo:");
321+
Console.WriteLine($" TopicPartitionOffsetError: {listOffsetsResultInfo.TopicPartitionOffsetError}");
322+
Console.WriteLine($" Timestamp: {listOffsetsResultInfo.Timestamp}");
323+
}
324+
}
325+
245326
static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
246327
{
247328
List<AclBinding> aclBindings;
@@ -793,6 +874,38 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,
793874
}
794875
}
795876

877+
static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs) {
878+
879+
var listOffsetsArgs = ParseListOffsetsArgs(commandArgs);
880+
if (listOffsetsArgs == null) { return; }
881+
882+
var isolationLevel = listOffsetsArgs.Item1;
883+
var topicPartitionOffsets = listOffsetsArgs.Item2;
884+
885+
var timeout = TimeSpan.FromSeconds(30);
886+
ListOffsetsOptions options = new ListOffsetsOptions(){ RequestTimeout = timeout, IsolationLevel = isolationLevel };
887+
888+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
889+
{
890+
try
891+
{
892+
var listOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsets, options);
893+
Console.WriteLine("ListOffsetsResult:");
894+
PrintListOffsetsResultInfos(listOffsetsResult.ListOffsetsResultInfos);
895+
}
896+
catch (ListOffsetsException e)
897+
{
898+
Console.WriteLine("ListOffsetsReport:");
899+
Console.WriteLine($" Error: {e.Error}");
900+
PrintListOffsetsResultInfos(e.Result.ListOffsetsResultInfos);
901+
}
902+
catch (KafkaException e)
903+
{
904+
Console.WriteLine($"An error occurred listing offsets: {e}");
905+
Environment.ExitCode = 1;
906+
}
907+
}
908+
}
796909
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
797910
{
798911
foreach (var topic in topicDescriptions)
@@ -956,12 +1069,11 @@ public static async Task Main(string[] args)
9561069
Console.WriteLine(
9571070
"usage: .. <bootstrapServers> " + String.Join("|", new string[] {
9581071
"list-groups", "metadata", "library-version", "create-topic", "create-acls",
959-
"describe-acls", "delete-acls",
9601072
"list-consumer-groups", "describe-consumer-groups",
9611073
"list-consumer-group-offsets", "alter-consumer-group-offsets",
9621074
"incremental-alter-configs", "describe-user-scram-credentials",
9631075
"alter-user-scram-credentials", "describe-topics",
964-
"describe-cluster"
1076+
"describe-cluster", "list-offsets"
9651077
}) +
9661078
" ..");
9671079
Environment.ExitCode = 1;
@@ -1022,6 +1134,9 @@ public static async Task Main(string[] args)
10221134
case "describe-cluster":
10231135
await DescribeClusterAsync(bootstrapServers, commandArgs);
10241136
break;
1137+
case "list-offsets":
1138+
await ListOffsetsAsync(bootstrapServers, commandArgs);
1139+
break;
10251140
default:
10261141
Console.WriteLine($"unknown command: {command}");
10271142
break;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka.Admin
19+
{
20+
/// <summary>
21+
/// Represents an error occurred while listing offsets.
22+
/// </summary>
23+
public class ListOffsetsException : KafkaException
24+
{
25+
/// <summary>
26+
/// Initializes a new instance of ListOffsetsException.
27+
/// </summary>
28+
/// <param name="result">
29+
/// The result corresponding to all partitions in the request
30+
/// (whether or not they were in error). At least one of these
31+
/// topic partiton in result will be in error.
32+
/// </param>
33+
public ListOffsetsException(ListOffsetsReport result)
34+
: base(new Error(ErrorCode.Local_Partial,
35+
"An error occurred in list offsets, check individual topic partiton in result."))
36+
{
37+
Result = result;
38+
}
39+
40+
/// <summary>
41+
/// The result corresponding to all partitions in the request
42+
/// (whether or not they were in error). At least one of these
43+
/// results will be in error.
44+
/// </summary>
45+
public ListOffsetsReport Result { get; }
46+
}
47+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Options for the "AdminClient.ListOffsetsAsync" method.
24+
/// </summary>
25+
public class ListOffsetsOptions
26+
{
27+
/// <summary>
28+
/// The overall request timeout, including broker lookup, request
29+
/// transmission, operation time on broker, and response. If set
30+
/// to null, the default request timeout for the AdminClient will
31+
/// be used.
32+
///
33+
/// Default: null
34+
/// </summary>
35+
public TimeSpan? RequestTimeout { get; set; }
36+
37+
/// <summary>
38+
/// Isolation level to fetch the offset with.
39+
/// Applies to the whole request.
40+
///
41+
/// Default: ReadUncommitted
42+
/// </summary>
43+
public IsolationLevel IsolationLevel { get; set; } = IsolationLevel.ReadUncommitted;
44+
}
45+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
using System.Text;
19+
using System.Linq;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Represents the result of a ListOffsets request (including error status).
26+
/// </summary>
27+
public class ListOffsetsReport
28+
{
29+
/// <summary>
30+
/// Result information for all the partitions queried
31+
/// with ListOffsets. At least one of these
32+
/// results will be in error.
33+
/// </summary>
34+
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }
35+
36+
/// <summary>
37+
/// Operation error status.
38+
/// </summary>
39+
public Error Error { get; set; }
40+
41+
/// <summary>
42+
/// Returns a JSON representation of the object.
43+
/// </summary>
44+
/// <returns>
45+
/// A JSON representation of the object.
46+
/// </returns>
47+
public override string ToString()
48+
{
49+
var result = new StringBuilder();
50+
result.Append($"{{\"ListOffsetsResultInfos\": [");
51+
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
52+
result.Append($"], \"Error\": \"{Error.Code}\"}}");
53+
return result.ToString();
54+
}
55+
}
56+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
using System.Text;
19+
using System.Linq;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Represents the result of a ListOffsets request.
26+
/// </summary>
27+
public class ListOffsetsResult
28+
{
29+
/// <summary>
30+
/// Result information for all the partitions queried
31+
/// with ListOffsets.
32+
/// </summary>
33+
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }
34+
35+
36+
/// <summary>
37+
/// Returns a JSON representation of the object.
38+
/// </summary>
39+
/// <returns>
40+
/// A JSON representation of the object.
41+
/// </returns>
42+
public override string ToString()
43+
{
44+
var result = new StringBuilder();
45+
result.Append($"{{\"ListOffsetsResultInfos\": [");
46+
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
47+
result.Append("]}");
48+
return result.ToString();
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)