Skip to content

Commit ae25659

Browse files
3schwartzMatt Howlett
andauthored
Admin delete groups functionality (#1831)
* Add admin client functionality to delete consumer groups * Corrected spelling * Corrected line ending * Resolved comments * Fix file line ending * Fix line ending * Small test alignment and remove deletion of option pointer * Fix copyright header * Aligned test * Avoid destroy pointer twice * Fix comments * Remove spaces * Remove obsolete sleep * Removed unused * Change to plural * Plural fixes Co-authored-by: Matt Howlett <[email protected]>
1 parent 5203939 commit ae25659

File tree

12 files changed

+517
-12
lines changed

12 files changed

+517
-12
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Options for the DeleteGroups method.
24+
/// </summary>
25+
public class DeleteGroupsOptions
26+
{
27+
/// <summary>
28+
/// The overall request timeout, including broker lookup, request
29+
/// transmission, operation time on broker, and response. If set
30+
/// to null, the default request timeout for the AdminClient will
31+
/// be used.
32+
///
33+
/// Default: null
34+
/// </summary>
35+
public TimeSpan? RequestTimeout { get; set; }
36+
37+
/// <summary>
38+
/// The broker's operation timeout - the maximum time to wait for
39+
/// DeleteRecordsAsync before returning a result to the application.
40+
/// If set to null, will return immediately upon triggering record
41+
/// deletion.
42+
///
43+
/// Default: null
44+
/// </summary>
45+
public TimeSpan? OperationTimeout { get; set; }
46+
}
47+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
namespace Confluent.Kafka.Admin
18+
{
19+
/// <summary>
20+
/// The result of a request related to a group.
21+
/// </summary>
22+
public class DeleteGroupReport
23+
{
24+
/// <summary>
25+
/// The group.
26+
/// </summary>
27+
public string Group { get; set; }
28+
29+
/// <summary>
30+
/// The error (or success) of the group relevant for the request.
31+
/// </summary>
32+
public Error Error { get; set; }
33+
}
34+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Linq;
19+
using System.Collections.Generic;
20+
21+
22+
namespace Confluent.Kafka.Admin
23+
{
24+
/// <summary>
25+
/// Represents an error that occured during a delete groups request.
26+
/// </summary>
27+
public class DeleteGroupsException : KafkaException
28+
{
29+
/// <summary>
30+
/// Initializes a new DeleteGroupsException.
31+
/// </summary>
32+
/// <param name="results">
33+
/// The result corresponding to all groups in the request
34+
/// (whether or not they were in error). At least one of these
35+
/// results will be in error.
36+
/// </param>
37+
public DeleteGroupsException(List<DeleteGroupReport> results)
38+
: base(new Error(ErrorCode.Local_Partial,
39+
"An error occurred deleting groups: [" +
40+
String.Join(", ", results.Where(r => r.Error.IsError).Select(r => r.Group)) +
41+
"]: [" + String.Join(", ", results.Where(r => r.Error.IsError).Select(r => r.Error)) +
42+
"]."))
43+
{
44+
Results = results;
45+
}
46+
47+
/// <summary>
48+
/// The result corresponding to all groups in the request
49+
/// (whether or not they were in error). At least one of these
50+
/// results will be in error.
51+
/// </summary>
52+
public List<DeleteGroupReport> Results { get; }
53+
}
54+
}

src/Confluent.Kafka/AdminClient.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,20 @@ private List<DescribeConfigsReport> extractResultConfigs(IntPtr configResourcesP
123123
return result;
124124
}
125125

126+
private static List<DeleteGroupReport> extractDeleteGroupsReport(IntPtr eventPtr)
127+
{
128+
IntPtr groupsResultPtr = Librdkafka.DeleteGroups_result_groups(eventPtr, out UIntPtr resultCountPtr);
129+
int groupsResultCount = (int)resultCountPtr;
130+
IntPtr[] groupsResultPtrArr = new IntPtr[groupsResultCount];
131+
Marshal.Copy(groupsResultPtr, groupsResultPtrArr, 0, groupsResultCount);
132+
133+
return groupsResultPtrArr.Select(groupResultPtr => new DeleteGroupReport
134+
{
135+
Group = PtrToStringUTF8(Librdkafka.group_result_name(groupResultPtr)),
136+
Error = new Error(Librdkafka.group_result_error(groupResultPtr), false)
137+
}).ToList();
138+
}
139+
126140
private List<CreateAclReport> extractCreateAclReports(IntPtr aclResultsPtr, int aclResultsCount)
127141
{
128142
IntPtr[] aclsResultsPtrArr = new IntPtr[aclResultsCount];
@@ -289,6 +303,32 @@ private Task StartPollTask(CancellationToken ct)
289303
}
290304
break;
291305

306+
case Librdkafka.EventType.DeleteGroups_Result:
307+
{
308+
if (errorCode != ErrorCode.NoError)
309+
{
310+
Task.Run(() =>
311+
((TaskCompletionSource<List<DeleteGroupReport>>)adminClientResult).TrySetException(
312+
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr))));
313+
break;
314+
}
315+
316+
var result = extractDeleteGroupsReport(eventPtr);
317+
318+
if(result.Any(r => r.Error.IsError))
319+
{
320+
Task.Run(() =>
321+
((TaskCompletionSource<List<DeleteGroupReport>>)adminClientResult).TrySetException(
322+
new DeleteGroupsException(result)));
323+
}
324+
else
325+
{
326+
Task.Run(() =>
327+
((TaskCompletionSource<List<DeleteGroupReport>>)adminClientResult).TrySetResult(result));
328+
}
329+
}
330+
break;
331+
292332
case Librdkafka.EventType.CreatePartitions_Result:
293333
{
294334
if (errorCode != ErrorCode.NoError)
@@ -527,6 +567,7 @@ private Task StartPollTask(CancellationToken ct)
527567
{ Librdkafka.EventType.AlterConfigs_Result, typeof(TaskCompletionSource<List<AlterConfigsReport>>) },
528568
{ Librdkafka.EventType.CreatePartitions_Result, typeof(TaskCompletionSource<List<CreatePartitionsReport>>) },
529569
{ Librdkafka.EventType.DeleteRecords_Result, typeof(TaskCompletionSource<List<DeleteRecordsResult>>) },
570+
{ Librdkafka.EventType.DeleteGroups_Result, typeof(TaskCompletionSource<List<DeleteGroupReport>>) },
530571
{ Librdkafka.EventType.CreateAcls_Result, typeof(TaskCompletionSource<Null>) },
531572
{ Librdkafka.EventType.DescribeAcls_Result, typeof(TaskCompletionSource<DescribeAclsResult>) },
532573
{ Librdkafka.EventType.DeleteAcls_Result, typeof(TaskCompletionSource<List<DeleteAclsResult>>) },
@@ -603,6 +644,19 @@ public Task DeleteTopicsAsync(IEnumerable<string> topics, DeleteTopicsOptions op
603644
return completionSource.Task;
604645
}
605646

647+
/// <summary>
648+
/// Refer to <see cref="Confluent.Kafka.IAdminClient.DeleteGroupsAsync(IList{string}, DeleteGroupsOptions)" />
649+
/// </summary>
650+
public Task DeleteGroupsAsync(IList<string> groups, DeleteGroupsOptions options = null)
651+
{
652+
var completionSource = new TaskCompletionSource<List<DeleteGroupReport>>();
653+
var gch = GCHandle.Alloc(completionSource);
654+
Handle.LibrdkafkaHandle.DeleteGroups(
655+
groups, options, resultQueue,
656+
GCHandle.ToIntPtr(gch));
657+
return completionSource.Task;
658+
}
659+
606660
/// <summary>
607661
/// Refer to <see cref="Confluent.Kafka.IAdminClient.CreatePartitionsAsync(IEnumerable{PartitionsSpecification}, CreatePartitionsOptions)" />
608662
/// </summary>

src/Confluent.Kafka/IAdminClient.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,19 @@ public interface IAdminClient : IClient
102102
Task CreatePartitionsAsync(
103103
IEnumerable<PartitionsSpecification> partitionsSpecifications, CreatePartitionsOptions options = null);
104104

105+
/// <summary>
106+
/// Delete a set of groups.
107+
/// </summary>
108+
/// <param name="groups">
109+
/// The group names to delete.
110+
/// </param>
111+
/// <param name="options">
112+
/// The options to use when deleting groups.
113+
/// </param>
114+
/// <returns>
115+
/// The results of the delete group requests.
116+
/// </returns>
117+
Task DeleteGroupsAsync(IList<string> groups, DeleteGroupsOptions options = null);
105118

106119
/// <summary>
107120
/// Delete a set of topics. This operation is not

src/Confluent.Kafka/Impl/LibRdKafka.cs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,12 @@ internal enum AdminOp
5959
CreatePartitions= 3,
6060
AlterConfigs = 4,
6161
DescribeConfigs = 5,
62+
DeleteRecords = 6,
63+
DeleteGroups = 7,
64+
DeleteConsumerGroupOffsets = 8,
6265
CreateAcls = 9,
6366
DescribeAcls = 10,
64-
DeleteAcls = 11,
67+
DeleteAcls = 11
6568
}
6669

6770
public enum EventType : int
@@ -279,6 +282,12 @@ static bool SetDelegates(Type nativeMethodsClass)
279282
_DeleteTopics = (Action<IntPtr, IntPtr[], UIntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_DeleteTopics").CreateDelegate(typeof(Action<IntPtr, IntPtr[], UIntPtr, IntPtr, IntPtr>));
280283
_DeleteTopics_result_topics = (_DeleteTopics_result_topics_delegate)methods.Single(m => m.Name == "rd_kafka_DeleteTopics_result_topics").CreateDelegate(typeof(_DeleteTopics_result_topics_delegate));
281284

285+
_DeleteGroup_new = (Func<string, IntPtr>)methods.Single(m => m.Name == "rd_kafka_DeleteGroup_new").CreateDelegate(typeof(Func<string, IntPtr>));
286+
_DeleteGroup_destroy = (Action<IntPtr>)methods.Single(m => m.Name == "rd_kafka_DeleteGroup_destroy").CreateDelegate(typeof(Action<IntPtr>));
287+
288+
_DeleteGroups = (Action<IntPtr, IntPtr[], UIntPtr, IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_DeleteGroups").CreateDelegate(typeof(Action<IntPtr, IntPtr[], UIntPtr, IntPtr, IntPtr>));
289+
_DeleteGroups_result_groups = (_DeleteGroups_result_groups_delegate)methods.Single(m => m.Name == "rd_kafka_DeleteGroups_result_groups").CreateDelegate(typeof(_DeleteGroups_result_groups_delegate));
290+
282291
_DeleteRecords_new = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_DeleteRecords_new").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
283292
_DeleteRecords_destroy = (Action<IntPtr>)methods.Single(m => m.Name == "rd_kafka_DeleteRecords_destroy").CreateDelegate(typeof(Action<IntPtr>));
284293

@@ -346,6 +355,9 @@ static bool SetDelegates(Type nativeMethodsClass)
346355
_topic_result_error_string = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_topic_result_error_string").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
347356
_topic_result_name = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_topic_result_name").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
348357

358+
_group_result_name = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_group_result_name").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
359+
_group_result_error = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_group_result_error").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
360+
349361
_destroy = (Action<IntPtr>)methods.Single(m => m.Name == "rd_kafka_destroy").CreateDelegate(typeof(Action<IntPtr>));
350362
_destroy_flags = (Action<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_destroy_flags").CreateDelegate(typeof(Action<IntPtr, IntPtr>));
351363

@@ -1201,6 +1213,28 @@ out UIntPtr cntp
12011213
) => _DeleteTopics_result_topics(result, out cntp);
12021214

12031215

1216+
private static Func<string, IntPtr> _DeleteGroup_new;
1217+
internal static IntPtr DeleteGroup_new(
1218+
string group) => _DeleteGroup_new(group);
1219+
1220+
private static Action<IntPtr> _DeleteGroup_destroy;
1221+
internal static void DeleteGroup_destroy(IntPtr del_group) => _DeleteGroup_destroy(del_group);
1222+
1223+
private static Action<IntPtr, IntPtr[], UIntPtr, IntPtr, IntPtr> _DeleteGroups;
1224+
internal static void DeleteGroups(
1225+
IntPtr rk,
1226+
IntPtr[] del_groups,
1227+
UIntPtr del_groups_cnt,
1228+
IntPtr options,
1229+
IntPtr rkqu) => _DeleteGroups(rk, del_groups, del_groups_cnt, options, rkqu);
1230+
1231+
private delegate IntPtr _DeleteGroups_result_groups_delegate(IntPtr result, out UIntPtr cntp);
1232+
private static _DeleteGroups_result_groups_delegate _DeleteGroups_result_groups;
1233+
internal static IntPtr DeleteGroups_result_groups(
1234+
IntPtr result,
1235+
out UIntPtr cntp) => _DeleteGroups_result_groups(result, out cntp);
1236+
1237+
12041238
private static Func<string, UIntPtr, StringBuilder, UIntPtr, IntPtr> _NewPartitions_new;
12051239
internal static IntPtr NewPartitions_new(
12061240
string topic,
@@ -1540,6 +1574,12 @@ out UIntPtr matchingAclsCntp
15401574
private static Func<IntPtr, IntPtr> _topic_result_name;
15411575
internal static IntPtr topic_result_name(IntPtr topicres) => _topic_result_name(topicres);
15421576

1577+
private static Func<IntPtr, IntPtr> _group_result_name;
1578+
internal static IntPtr group_result_name(IntPtr groupres) => _group_result_name(groupres);
1579+
1580+
private static Func<IntPtr, IntPtr> _group_result_error;
1581+
internal static IntPtr group_result_error(IntPtr groupres) => _group_result_error(groupres);
1582+
15431583

15441584
//
15451585
// Queues

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,30 @@ internal static extern IntPtr rd_kafka_DeleteTopics_result_topics(
571571
);
572572

573573

574+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
575+
internal static extern /* rd_kafka_DeleteGroup_t * */ IntPtr rd_kafka_DeleteGroup_new(
576+
[MarshalAs(UnmanagedType.LPStr)] string group
577+
);
578+
579+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
580+
internal static extern void rd_kafka_DeleteGroup_destroy(
581+
/* rd_kafka_DeleteGroup_t * */ IntPtr del_group);
582+
583+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
584+
internal static extern void rd_kafka_DeleteGroups(
585+
/* rd_kafka_t * */ IntPtr rk,
586+
/* rd_kafka_DeleteGroup_t ** */ IntPtr[] del_groups,
587+
UIntPtr del_group_cnt,
588+
/* rd_kafka_AdminOptions_t * */ IntPtr options,
589+
/* rd_kafka_queue_t * */ IntPtr rkqu);
590+
591+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
592+
internal static extern IntPtr rd_kafka_DeleteGroups_result_groups(
593+
/* rd_kafka_DeleteGroups_result_t * */ IntPtr result,
594+
/* size_t * */ out UIntPtr cntp
595+
);
596+
597+
574598
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
575599
internal static extern /* rd_kafka_DeleteRecords_t * */ IntPtr rd_kafka_DeleteRecords_new(
576600
/* rd_kafka_topic_partition_list_t * */ IntPtr offsets
@@ -862,6 +886,13 @@ internal static extern IntPtr rd_kafka_DeleteAcls_result_response_matching_acls(
862886
internal static extern IntPtr rd_kafka_topic_result_name(IntPtr topicres);
863887

864888

889+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
890+
internal static extern IntPtr rd_kafka_group_result_name(IntPtr groupres);
891+
892+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
893+
internal static extern IntPtr rd_kafka_group_result_error(IntPtr groupres);
894+
895+
865896
//
866897
// Queues
867898
//

0 commit comments

Comments
 (0)