Skip to content

Commit 4ff4e29

Browse files
PrasanthV454emasabanchitj
authored
Incremental-alter-configs-implementation(KIP-339) (#2005)
requires broker version >= 2.3.0 --------- Co-authored-by: Emanuele Sabellico <[email protected]> Co-authored-by: Anchit Jain <[email protected]>
1 parent 5a7a681 commit 4ff4e29

17 files changed

+711
-20
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# 2.2.0
22

3+
## Enhancements
4+
5+
- References librdkafka.redist 2.2.0. Refer to the [librdkafka v2.2.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0) for more information.
6+
- [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API)
7+
IncrementalAlterConfigs API (#2005).
8+
39
## Fixes
410

511
- Fix backwards compatability of TopicPartitionOffset constructor. ([drinehimer](https://github.com/drinehimer), #2066)

examples/AdminClient/Program.cs

Lines changed: 109 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ static async Task AlterConsumerGroupOffsetsAsync(string bootstrapServers, string
306306

307307
var group = commandArgs[0];
308308
var tpoes = new List<TopicPartitionOffset>();
309-
for (int i = 1; i + 2 < commandArgs.Length; i += 3) {
309+
for (int i = 1; i + 2 < commandArgs.Length; i += 3)
310+
{
310311
try
311312
{
312313
var topic = commandArgs[i];
@@ -330,7 +331,8 @@ static async Task AlterConsumerGroupOffsetsAsync(string bootstrapServers, string
330331
{
331332
var results = await adminClient.AlterConsumerGroupOffsetsAsync(input);
332333
Console.WriteLine("Successfully altered offsets:");
333-
foreach(var groupResult in results) {
334+
foreach(var groupResult in results)
335+
{
334336
Console.WriteLine(groupResult);
335337
}
336338

@@ -363,7 +365,8 @@ static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[
363365

364366
var group = commandArgs[0];
365367
var tpes = new List<TopicPartition>();
366-
for (int i = 1; i + 1 < commandArgs.Length; i += 2) {
368+
for (int i = 1; i + 1 < commandArgs.Length; i += 2)
369+
{
367370
try
368371
{
369372
var topic = commandArgs[i];
@@ -391,7 +394,8 @@ static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[
391394
{
392395
var result = await adminClient.ListConsumerGroupOffsetsAsync(input);
393396
Console.WriteLine("Successfully listed offsets:");
394-
foreach(var groupResult in result) {
397+
foreach(var groupResult in result)
398+
{
395399
Console.WriteLine(groupResult);
396400
}
397401
}
@@ -412,7 +416,8 @@ static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[
412416
}
413417
}
414418

415-
static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) {
419+
static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] commandArgs)
420+
{
416421
var timeout = TimeSpan.FromSeconds(30);
417422
var statesList = new List<ConsumerGroupState>();
418423
try
@@ -423,7 +428,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
423428
}
424429
if (commandArgs.Length > 1)
425430
{
426-
for (int i = 1; i < commandArgs.Length; i++) {
431+
for (int i = 1; i < commandArgs.Length; i++)
432+
{
427433
statesList.Add(Enum.Parse<ConsumerGroupState>(commandArgs[i]));
428434
}
429435
}
@@ -458,7 +464,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
458464
}
459465

460466

461-
static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) {
467+
static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs)
468+
{
462469
if (commandArgs.Length < 1)
463470
{
464471
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]");
@@ -501,6 +508,96 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
501508
}
502509
}
503510
}
511+
512+
static async Task IncrementalAlterConfigsAsync(string bootstrapServers, string[] commandArgs)
513+
{
514+
var timeout = TimeSpan.FromSeconds(30);
515+
var configResourceList = new Dictionary<ConfigResource, List<ConfigEntry>>();
516+
try
517+
{
518+
if (commandArgs.Length > 0)
519+
{
520+
timeout = TimeSpan.FromSeconds(Int32.Parse(commandArgs[0]));
521+
}
522+
if (((commandArgs.Length - 1) % 3) != 0)
523+
{
524+
throw new ArgumentException("invalid arguments length");
525+
}
526+
527+
for (int i = 1; i < commandArgs.Length; i+=3)
528+
{
529+
var resourceType = Enum.Parse<ResourceType>(commandArgs[i]);
530+
var resourceName = commandArgs[i + 1];
531+
var configs = commandArgs[i + 2];
532+
var configList = new List<ConfigEntry>();
533+
foreach (var config in configs.Split(";"))
534+
{
535+
var nameOpValue = config.Split("=");
536+
if (nameOpValue.Length != 2)
537+
{
538+
throw new ArgumentException($"invalid alteration name \"{config}\"");
539+
}
540+
541+
var name = nameOpValue[0];
542+
var opValue = nameOpValue[1].Split(":");
543+
if (opValue.Length != 2)
544+
{
545+
throw new ArgumentException($"invalid alteration value \"{nameOpValue[1]}\"");
546+
}
547+
548+
var op = Enum.Parse<AlterConfigOpType>(opValue[0]);
549+
var value = opValue[1];
550+
configList.Add(new ConfigEntry
551+
{
552+
Name = name,
553+
Value = value,
554+
IncrementalOperation = op
555+
});
556+
}
557+
var resource = new ConfigResource
558+
{
559+
Name = resourceName,
560+
Type = resourceType
561+
};
562+
configResourceList[resource] = configList;
563+
}
564+
}
565+
catch (Exception e) when (
566+
e is ArgumentException ||
567+
e is FormatException
568+
)
569+
{
570+
Console.WriteLine($"error: {e.Message}");
571+
Console.WriteLine("usage: .. <bootstrapServers> incremental-alter-configs [<timeout_seconds> <resource-type1> <resource-name1> <config-name1=op-type1:config-value1;config-name1=op-type1:config-value1> ...]");
572+
Environment.ExitCode = 1;
573+
return;
574+
}
575+
576+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
577+
{
578+
try
579+
{
580+
var alterResultList = await adminClient.IncrementalAlterConfigsAsync(configResourceList, new IncrementalAlterConfigsOptions() { RequestTimeout = timeout });
581+
foreach (var alterResult in alterResultList)
582+
{
583+
Console.WriteLine($"Resource {alterResult.ConfigResource} altered correctly");
584+
}
585+
}
586+
catch (IncrementalAlterConfigsException e)
587+
{
588+
foreach (var alterResult in e.Results)
589+
{
590+
Console.WriteLine($"Resource {alterResult.ConfigResource} had error: {alterResult.Error}");
591+
}
592+
Environment.ExitCode = 1;
593+
}
594+
catch (Exception e)
595+
{
596+
Console.WriteLine($"An error occurred altering configs incrementally: {e.Message}");
597+
Environment.ExitCode = 1;
598+
}
599+
}
600+
}
504601

505602
public static async Task Main(string[] args)
506603
{
@@ -511,7 +608,8 @@ public static async Task Main(string[] args)
511608
"list-groups", "metadata", "library-version", "create-topic", "create-acls",
512609
"describe-acls", "delete-acls",
513610
"list-consumer-groups", "describe-consumer-groups",
514-
"list-consumer-group-offsets", "alter-consumer-group-offsets"
611+
"list-consumer-group-offsets", "alter-consumer-group-offsets",
612+
"incremental-alter-configs"
515613
}) +
516614
" ..");
517615
Environment.ExitCode = 1;
@@ -557,6 +655,9 @@ public static async Task Main(string[] args)
557655
case "describe-consumer-groups":
558656
await DescribeConsumerGroupsAsync(bootstrapServers, commandArgs);
559657
break;
658+
case "incremental-alter-configs":
659+
await IncrementalAlterConfigsAsync(bootstrapServers, commandArgs);
660+
break;
560661
default:
561662
Console.WriteLine($"unknown command: {command}");
562663
break;
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2018 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka.Admin
19+
{
20+
/// <summary>
21+
/// Enum of allowed AlterConfigOpType.
22+
/// </summary>
23+
public enum AlterConfigOpType : int
24+
{
25+
/// <summary>
26+
/// Sets/overwrites the configuration value.
27+
/// </summary>
28+
Set = 0,
29+
30+
/// <summary>
31+
/// Sets the configuration value to default or NULL.
32+
/// </summary>
33+
Delete = 1,
34+
35+
/// <summary>
36+
/// Appends the value to existing configuration values(only for list type values).
37+
/// </summary>
38+
Append = 2,
39+
40+
/// <summary>
41+
/// Subtracts the value from existing configuration values(only for list type values).
42+
/// </summary>
43+
Subtract = 3,
44+
};
45+
}

src/Confluent.Kafka/Admin/ConfigEntry.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,10 @@ public class ConfigEntry
3131
/// The config value.
3232
/// </summary>
3333
public string Value { get; set; }
34+
35+
/// <summary>
36+
/// Incremental operation to perform.
37+
/// </summary>
38+
public AlterConfigOpType IncrementalOperation { get; set; }
3439
}
3540
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Linq;
19+
using System.Collections.Generic;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Represents an error that occured during an incremental alter configs request.
26+
/// </summary>
27+
public class IncrementalAlterConfigsException : KafkaException
28+
{
29+
/// <summary>
30+
/// Initializes a new instance of IncrementalAlterConfigsException.
31+
/// </summary>
32+
/// <param name="results">
33+
/// The result corresponding to all ConfigResources in the request
34+
/// (whether or not they were in error). At least one of these
35+
/// results will be in error.
36+
/// </param>
37+
public IncrementalAlterConfigsException(List<IncrementalAlterConfigsReport> results)
38+
: base(new Error(ErrorCode.Local_Partial,
39+
"An error occurred incremental altering the following resources: [" +
40+
String.Join(", ", results.Where(r => r.Error.IsError).Select(r => r.ConfigResource)) +
41+
"]: [" + String.Join(", ", results.Where(r => r.Error.IsError).Select(r => r.Error)) +
42+
"]."))
43+
{
44+
Results = results;
45+
}
46+
47+
/// <summary>
48+
/// The result corresponding to all ConfigResources in the request
49+
/// (whether or not they were in error). At least one of these
50+
/// results will be in error.
51+
/// </summary>
52+
public List<IncrementalAlterConfigsReport> Results { get; }
53+
}
54+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Options for the IncrementalAlterConfigs method.
24+
/// </summary>
25+
public class IncrementalAlterConfigsOptions
26+
{
27+
/// <summary>
28+
/// The overall request timeout, including broker lookup, request
29+
/// transmission, operation time on broker, and response. If set
30+
/// to null, the default request timeout for the AdminClient will
31+
/// be used.
32+
///
33+
/// Default: null
34+
/// </summary>
35+
public TimeSpan? RequestTimeout { get; set; }
36+
37+
/// <summary>
38+
/// If true, the request should be validated only without altering
39+
/// the configs.
40+
///
41+
/// Default: false
42+
/// </summary>
43+
public bool ValidateOnly { get; set; } = false;
44+
}
45+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka.Admin
19+
{
20+
/// <summary>
21+
/// The result of incremental alter config request for a specific resource,
22+
/// when an error occurred.
23+
/// </summary>
24+
public class IncrementalAlterConfigsReport
25+
{
26+
/// <summary>
27+
/// The resource the result corresponds to.
28+
/// </summary>
29+
public ConfigResource ConfigResource;
30+
31+
/// <summary>
32+
/// The error (or success) of the incremental alter config request.
33+
/// </summary>
34+
public Error Error { get; set; }
35+
}
36+
}

0 commit comments

Comments
 (0)