Skip to content

Commit 8212364

Browse files
authored
[KIP-460] Elect leader api implemented (#2320)
1 parent dc5aff2 commit 8212364

File tree

15 files changed

+680
-1
lines changed

15 files changed

+680
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
* References librdkafka.redist 2.6.0. Refer to the [librdkafka v2.6.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0) for more information.
66
* [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#2323).
7+
* [KIP-460](https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC) Admin Leader Election RPC (#2320)
78

89

910
# 2.5.3

examples/AdminClient/Program.cs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,31 @@ static bool ParseListConsumerGroupsArgs(string[] commandArgs,
393393
}
394394
}
395395

396+
static Tuple<ElectionType, List<TopicPartition>> ParseElectLeadersArgs(string[] args)
397+
{
398+
if ((args.Length -1 ) % 2 != 0)
399+
{
400+
Console.WriteLine("usage: .. <bootstrapServers> elect-leaders <electionType> <topic1> <partition1> ..");
401+
Environment.ExitCode = 1;
402+
return null;
403+
}
404+
405+
var electionType = Enum.Parse<ElectionType>(args[0]);
406+
var partitions = new List<TopicPartition>();
407+
if(args.Length == 1)
408+
{
409+
partitions = null;
410+
return Tuple.Create(electionType, partitions);
411+
}
412+
for (int i = 1; i < args.Length; i += 2)
413+
{
414+
var topic = args[i];
415+
var partition = Int32.Parse(args[i + 1]);
416+
partitions.Add(new TopicPartition(topic, partition));
417+
}
418+
return Tuple.Create(electionType, partitions);
419+
}
420+
396421
static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsResultInfos)
397422
{
398423
foreach(var listOffsetsResultInfo in ListOffsetsResultInfos)
@@ -403,6 +428,20 @@ static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsR
403428
}
404429
}
405430

431+
static void PrintElectLeaderResults(List<TopicPartitionError> topicPartitions)
432+
{
433+
Console.WriteLine($"ElectLeaders response has {topicPartitions.Count} partition(s):");
434+
foreach (var partitionResult in topicPartitions)
435+
{
436+
if (!partitionResult.Error.IsError)
437+
Console.WriteLine($"Election successful in {partitionResult.Topic} {partitionResult.Partition}");
438+
else
439+
Console.WriteLine($"Election failed in {partitionResult.Topic} {partitionResult.Partition}: " +
440+
$"Code: {partitionResult.Error.Code}" +
441+
$", Reason: {partitionResult.Error.Reason}");
442+
}
443+
}
444+
406445
static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
407446
{
408447
List<AclBinding> aclBindings;
@@ -978,6 +1017,41 @@ static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs
9781017
}
9791018
}
9801019
}
1020+
1021+
static async Task ElectLeadersAsync(string bootstrapServers, string[] commandArgs)
1022+
{
1023+
if (commandArgs.Length < 3 && (commandArgs.Length - 1) % 2 != 0)
1024+
{
1025+
Console.WriteLine("usage: .. <bootstrapServers> elect-leaders <electionType> <topic1> <partition1> ..");
1026+
Environment.ExitCode = 1;
1027+
return;
1028+
}
1029+
1030+
var req = ParseElectLeadersArgs(commandArgs);
1031+
var electionType = req.Item1;
1032+
var partitions = req.Item2;
1033+
var timeout = TimeSpan.FromSeconds(30);
1034+
ElectLeadersOptions options = new ElectLeadersOptions() { RequestTimeout = timeout };
1035+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
1036+
{
1037+
try
1038+
{
1039+
var result = await adminClient.ElectLeadersAsync(electionType, partitions, options);
1040+
PrintElectLeaderResults(result.TopicPartitions);
1041+
1042+
}
1043+
catch (ElectLeadersException e)
1044+
{
1045+
Console.WriteLine("One or more elect leaders operations failed.");
1046+
PrintElectLeaderResults(e.Results.TopicPartitions);
1047+
}
1048+
catch (KafkaException e)
1049+
{
1050+
Console.WriteLine($"An error occurred electing leaders: {e}");
1051+
Environment.ExitCode = 1;
1052+
}
1053+
}
1054+
}
9811055
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
9821056
{
9831057
foreach (var topic in topicDescriptions)
@@ -1146,7 +1220,7 @@ public static async Task Main(string[] args)
11461220
"list-consumer-group-offsets", "alter-consumer-group-offsets",
11471221
"incremental-alter-configs", "describe-user-scram-credentials",
11481222
"alter-user-scram-credentials", "describe-topics",
1149-
"describe-cluster", "list-offsets"
1223+
"describe-cluster", "list-offsets", "elect-leaders"
11501224
}) +
11511225
" ..");
11521226
Environment.ExitCode = 1;
@@ -1210,6 +1284,9 @@ public static async Task Main(string[] args)
12101284
case "list-offsets":
12111285
await ListOffsetsAsync(bootstrapServers, commandArgs);
12121286
break;
1287+
case "elect-leaders":
1288+
await ElectLeadersAsync(bootstrapServers, commandArgs);
1289+
break;
12131290
default:
12141291
Console.WriteLine($"unknown command: {command}");
12151292
break;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Linq;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Represents an error that occured during the ElectLeaders operation.
24+
/// </summary>
25+
public class ElectLeadersException : KafkaException
26+
{
27+
/// <summary>
28+
/// Initializes a new instance of ElectLeadersException.
29+
/// </summary>
30+
/// <param name="report">
31+
/// The result of the ElectLeaders operation.
32+
/// </param>
33+
public ElectLeadersException(ElectLeadersReport report)
34+
: base(new Error(ErrorCode.Local_Partial,
35+
"Some errors occurred electing leaders: [" +
36+
string.Join(", ", report.TopicPartitions.Where(tp => tp.Error.IsError)) +
37+
"]"))
38+
{
39+
this.Results = report;
40+
}
41+
42+
/// <summary>
43+
/// Gets the results of the ElectLeaders operation.
44+
/// </summary>
45+
public ElectLeadersReport Results { get; }
46+
}
47+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Options for the "AdminClient.ElectLeaders" method.
24+
/// </summary>
25+
public class ElectLeadersOptions
26+
{
27+
/// <summary>
28+
/// The overall request timeout, including broker lookup, request
29+
/// transmission, operation time on broker, and response. If set
30+
/// to null, the default request timeout for the AdminClient will
31+
/// be used.
32+
///
33+
/// Default: null
34+
/// </summary>
35+
public TimeSpan? RequestTimeout { get; set; }
36+
37+
/// <summary>
38+
/// The broker's operation timeout - the maximum time to wait for
39+
/// ElectLeaders before returning a result to the application.
40+
/// If set to null, will return immediately upon triggering election.
41+
///
42+
/// Default: null
43+
/// </summary>
44+
public TimeSpan? OperationTimeout { get; set; }
45+
}
46+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
using System.Text;
19+
using System.Linq;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Represents the result of an elect leaders request (including error status).
26+
/// </summary>
27+
public class ElectLeadersReport
28+
{
29+
/// <summary>
30+
/// First error encountered in TopicPartitions.
31+
/// </summary>
32+
internal Error Error { get; set; }
33+
34+
/// <summary>
35+
/// Individual partition results.
36+
/// At least one of these will be in error.
37+
/// </summary>
38+
public List<TopicPartitionError> TopicPartitions { get; set; }
39+
40+
/// <summary>
41+
/// A Json representation of the object.
42+
/// </summary>
43+
/// <returns>
44+
/// A Json representation of the object.
45+
/// </returns>
46+
public override string ToString()
47+
{
48+
var result = new StringBuilder();
49+
result.Append($"{{\"TopicPartitions\": [");
50+
result.Append(string.Join(",", TopicPartitions.Select(b => $" {b.ToString()}")));
51+
result.Append($"]}}");
52+
return result.ToString();
53+
}
54+
}
55+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
using System.Text;
19+
using System.Linq;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Result information for all Partitions queried
26+
/// in an ElectLeaderRequest.
27+
/// </summary>
28+
public class ElectLeadersResult
29+
{
30+
/// <summary>
31+
/// Individual partition results.
32+
/// </summary>
33+
public List<TopicPartitionError> TopicPartitions { get; set; }
34+
35+
/// <summary>
36+
/// A Json representation of the object.
37+
/// </summary>
38+
/// <returns>
39+
/// A Json representation of the object.
40+
/// </returns>
41+
public override string ToString()
42+
{
43+
var result = new StringBuilder();
44+
result.Append($"{{ \"TopicPartitions\": [");
45+
result.Append(string.Join(",", TopicPartitions.Select(b => $" {b.ToString()}")));
46+
result.Append($"]}}");
47+
return result.ToString();
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)