Skip to content

Commit 196e1b5

Browse files
authored
Add topic id to describe topics response (#2121)
Added topic id to describe topics response
1 parent 07de95e commit 196e1b5

File tree

11 files changed

+215
-2
lines changed

11 files changed

+215
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
77
Return authorized operations in describe responses (#2021, @jainruchir).
88
- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484): Added support for ListOffsets Admin API (#2086).
9+
- Add `Rack` to the `Node` type, so AdminAPI calls can expose racks for brokers (currently, all Describe
10+
Responses) (#2021, @jainruchir).
911
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
1012
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
1113
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).

examples/AdminClient/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, boo
911911
foreach (var topic in topicDescriptions)
912912
{
913913
Console.WriteLine($"\n Topic: {topic.Name} {topic.Error}");
914+
Console.WriteLine($" Topic Id: {topic.TopicId}");
914915
Console.WriteLine($" Partitions:");
915916
foreach (var partition in topic.Partitions)
916917
{

src/Confluent.Kafka/Admin/TopicDescription.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public class TopicDescription
3232
/// </summary>
3333
public string Name { get; set; }
3434

35+
/// <summary>
36+
/// The topic Id.
37+
/// </summary>
38+
public Uuid TopicId {get; set; }
39+
3540
/// <summary>
3641
/// Error, if any, of topic reported by the broker
3742
/// </summary>
@@ -77,6 +82,10 @@ public override string ToString()
7782
}
7883

7984
result.Append($"{{\"Name\": {Name.Quote()}");
85+
if (TopicId != null)
86+
{
87+
result.Append($", \"TopicId\": {TopicId.ToString().Quote()}");
88+
}
8089
result.Append($", \"Error\": \"{Error.Code}\", \"IsInternal\": {IsInternal.Quote()}");
8190
result.Append($", \"Partitions\": [{partitions}], \"AuthorizedOperations\": {authorizedOperations}}}");
8291
return result.ToString();

src/Confluent.Kafka/AdminClient.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ private DescribeTopicsReport extractDescribeTopicsResults(IntPtr resultPtr)
486486
{
487487

488488
var topicName = PtrToStringUTF8(Librdkafka.TopicDescription_name(topicPtr));
489+
var topicId = Librdkafka.TopicDescription_topic_id(topicPtr);
489490
var error = new Error(Librdkafka.TopicDescription_error(topicPtr), false);
490491
var isInternal = Librdkafka.TopicDescription_is_internal(topicPtr) != IntPtr.Zero;
491492
List<AclOperation> authorizedOperations = extractAuthorizedOperations(
@@ -497,6 +498,7 @@ private DescribeTopicsReport extractDescribeTopicsResults(IntPtr resultPtr)
497498
return new TopicDescription()
498499
{
499500
Name = topicName,
501+
TopicId = extractUuid(topicId),
500502
Error = error,
501503
AuthorizedOperations = authorizedOperations,
502504
IsInternal = isInternal,
@@ -509,6 +511,19 @@ private DescribeTopicsReport extractDescribeTopicsResults(IntPtr resultPtr)
509511
}).ToList();
510512
return result;
511513
}
514+
515+
private Uuid extractUuid(IntPtr uuidPtr)
516+
{
517+
if (uuidPtr == IntPtr.Zero)
518+
{
519+
return null;
520+
}
521+
522+
return new Uuid(
523+
Librdkafka.Uuid_most_significant_bits(uuidPtr),
524+
Librdkafka.Uuid_least_significant_bits(uuidPtr)
525+
);
526+
}
512527

513528
private Node extractNode(IntPtr nodePtr)
514529
{

src/Confluent.Kafka/Impl/LibRdKafka.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,11 @@ static bool SetDelegates(Type nativeMethodsClass)
236236
_new = (Func<RdKafkaType, IntPtr, StringBuilder, UIntPtr, SafeKafkaHandle>)methods.Single(m => m.Name == "rd_kafka_new").CreateDelegate(typeof(Func<RdKafkaType, IntPtr, StringBuilder, UIntPtr, SafeKafkaHandle>));
237237
_name = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_name").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
238238
_memberid = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_memberid").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
239+
_Uuid_new = (Func<long, long, IntPtr>)methods.Single(m => m.Name == "rd_kafka_Uuid_new").CreateDelegate(typeof(Func<long, long, IntPtr>));
240+
_Uuid_base64str = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_Uuid_base64str").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
241+
_Uuid_most_significant_bits = (Func<IntPtr, long>)methods.Single(m => m.Name == "rd_kafka_Uuid_most_significant_bits").CreateDelegate(typeof(Func<IntPtr, long>));
242+
_Uuid_least_significant_bits = (Func<IntPtr, long>)methods.Single(m => m.Name == "rd_kafka_Uuid_least_significant_bits").CreateDelegate(typeof(Func<IntPtr, long>));
243+
_Uuid_destroy = (Action<IntPtr>)methods.Single(m => m.Name == "rd_kafka_Uuid_destroy").CreateDelegate(typeof(Action<IntPtr>));
239244
_topic_new = (Func<IntPtr, IntPtr, IntPtr, SafeTopicHandle>)methods.Single(m => m.Name == "rd_kafka_topic_new").CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr, SafeTopicHandle>));
240245
_topic_destroy = (Action<IntPtr>)methods.Single(m => m.Name == "rd_kafka_topic_destroy").CreateDelegate(typeof(Action<IntPtr>));
241246
_topic_name = (Func<IntPtr, IntPtr>)methods.Single(m => m.Name == "rd_kafka_topic_name").CreateDelegate(typeof(Func<IntPtr, IntPtr>));
@@ -458,6 +463,7 @@ static bool SetDelegates(Type nativeMethodsClass)
458463
_TopicCollection_destroy = (_TopicCollection_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_TopicCollection_destroy").CreateDelegate(typeof (_TopicCollection_destroy_delegate));
459464
_TopicDescription_error = (_TopicDescription_error_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_error").CreateDelegate(typeof (_TopicDescription_error_delegate));
460465
_TopicDescription_name = (_TopicDescription_name_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_name").CreateDelegate(typeof (_TopicDescription_name_delegate));
466+
_TopicDescription_topic_id = (_TopicDescription_topic_id_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_topic_id").CreateDelegate(typeof (_TopicDescription_topic_id_delegate));
461467
_TopicDescription_partitions = (_TopicDescription_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_partitions").CreateDelegate(typeof (_TopicDescription_partitions_delegate));
462468
_TopicDescription_is_internal = (_TopicDescription_is_internal_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_is_internal").CreateDelegate(typeof (_TopicDescription_is_internal_delegate));
463469
_TopicDescription_authorized_operations = (_TopicDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_TopicDescription_authorized_operations").CreateDelegate(typeof (_TopicDescription_authorized_operations_delegate));
@@ -1037,6 +1043,22 @@ internal static SafeKafkaHandle kafka_new(RdKafkaType type, IntPtr conf,
10371043
private static Func<IntPtr, IntPtr> _memberid;
10381044
internal static IntPtr memberid(IntPtr rk) => _memberid(rk);
10391045

1046+
private static Func<long, long, IntPtr> _Uuid_new;
1047+
internal static IntPtr Uuid_new(long most_significant_bits, long least_significant_bits)
1048+
=> _Uuid_new(most_significant_bits, least_significant_bits);
1049+
1050+
private static Func<IntPtr, IntPtr> _Uuid_base64str;
1051+
internal static IntPtr Uuid_base64str(IntPtr uuid) => _Uuid_base64str(uuid);
1052+
1053+
private static Func<IntPtr, long> _Uuid_most_significant_bits;
1054+
internal static long Uuid_most_significant_bits(IntPtr uuid) => _Uuid_most_significant_bits(uuid);
1055+
1056+
private static Func<IntPtr, long> _Uuid_least_significant_bits;
1057+
internal static long Uuid_least_significant_bits(IntPtr uuid) => _Uuid_least_significant_bits(uuid);
1058+
1059+
private static Action<IntPtr> _Uuid_destroy;
1060+
internal static void Uuid_destroy(IntPtr uuid) => _Uuid_destroy(uuid);
1061+
10401062
private static Func<IntPtr, IntPtr, IntPtr, SafeTopicHandle> _topic_new;
10411063
internal static SafeTopicHandle topic_new(IntPtr rk, IntPtr topic, IntPtr conf)
10421064
=> _topic_new(rk, topic, conf);
@@ -2162,6 +2184,12 @@ internal static IntPtr TopicDescription_error(IntPtr topicdesc)
21622184
internal static IntPtr TopicDescription_name(IntPtr topicdesc)
21632185
=> _TopicDescription_name(topicdesc);
21642186

2187+
2188+
private delegate IntPtr _TopicDescription_topic_id_delegate(IntPtr topicdesc);
2189+
private static _TopicDescription_topic_id_delegate _TopicDescription_topic_id;
2190+
internal static IntPtr TopicDescription_topic_id(IntPtr topicdesc)
2191+
=> _TopicDescription_topic_id(topicdesc);
2192+
21652193
private delegate IntPtr _TopicDescription_partitions_delegate(IntPtr topicdesc, out UIntPtr cntp);
21662194
private static _TopicDescription_partitions_delegate _TopicDescription_partitions;
21672195
internal static IntPtr TopicDescription_partitions(IntPtr topicdesc, out UIntPtr cntp)

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,24 @@ internal static extern SafeKafkaHandle rd_kafka_new(
311311
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
312312
internal static extern /* char * */ IntPtr rd_kafka_memberid(IntPtr rk);
313313

314+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
315+
internal static extern /* rd_kafka_Uuid_t * */IntPtr rd_kafka_Uuid_new(
316+
long most_significant_bits,
317+
long least_significant_bits
318+
);
319+
320+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
321+
internal static extern /* char * */IntPtr rd_kafka_Uuid_base64str(IntPtr uuid);
322+
323+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
324+
internal static extern long rd_kafka_Uuid_most_significant_bits(IntPtr uuid);
325+
326+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
327+
internal static extern long rd_kafka_Uuid_least_significant_bits(IntPtr uuid);
328+
329+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
330+
internal static extern void rd_kafka_Uuid_destroy(IntPtr uuid);
331+
314332
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
315333
internal static extern SafeTopicHandle rd_kafka_topic_new(
316334
IntPtr rk, IntPtr topic,
@@ -1212,6 +1230,9 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs
12121230
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12131231
internal static extern IntPtr rd_kafka_TopicDescription_name(IntPtr topicdesc);
12141232

1233+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1234+
internal static extern IntPtr rd_kafka_TopicDescription_topic_id(IntPtr topicdesc);
1235+
12151236
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12161237
internal static extern IntPtr rd_kafka_TopicDescription_partitions(IntPtr topicdesc, out UIntPtr cntp);
12171238

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,24 @@ internal static extern SafeKafkaHandle rd_kafka_new(
315315
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
316316
internal static extern /* char * */ IntPtr rd_kafka_memberid(IntPtr rk);
317317

318+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
319+
internal static extern /* rd_kafka_Uuid_t * */IntPtr rd_kafka_Uuid_new(
320+
long most_significant_bits,
321+
long least_significant_bits
322+
);
323+
324+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
325+
internal static extern /* char * */IntPtr rd_kafka_Uuid_base64str(IntPtr uuid);
326+
327+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
328+
internal static extern long rd_kafka_Uuid_most_significant_bits(IntPtr uuid);
329+
330+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
331+
internal static extern long rd_kafka_Uuid_least_significant_bits(IntPtr uuid);
332+
333+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
334+
internal static extern void rd_kafka_Uuid_destroy(IntPtr uuid);
335+
318336
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
319337
internal static extern SafeTopicHandle rd_kafka_topic_new(
320338
IntPtr rk, IntPtr topic,
@@ -1216,6 +1234,9 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs
12161234
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12171235
internal static extern IntPtr rd_kafka_TopicDescription_name(IntPtr topicdesc);
12181236

1237+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1238+
internal static extern IntPtr rd_kafka_TopicDescription_topic_id(IntPtr topicdesc);
1239+
12191240
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12201241
internal static extern IntPtr rd_kafka_TopicDescription_partitions(IntPtr topicdesc, out UIntPtr cntp);
12211242

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos6.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,24 @@ internal static extern SafeKafkaHandle rd_kafka_new(
315315
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
316316
internal static extern /* char * */ IntPtr rd_kafka_memberid(IntPtr rk);
317317

318+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
319+
internal static extern /* rd_kafka_Uuid_t * */IntPtr rd_kafka_Uuid_new(
320+
long most_significant_bits,
321+
long least_significant_bits
322+
);
323+
324+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
325+
internal static extern /* char * */IntPtr rd_kafka_Uuid_base64str(IntPtr uuid);
326+
327+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
328+
internal static extern long rd_kafka_Uuid_most_significant_bits(IntPtr uuid);
329+
330+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
331+
internal static extern long rd_kafka_Uuid_least_significant_bits(IntPtr uuid);
332+
333+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
334+
internal static extern void rd_kafka_Uuid_destroy(IntPtr uuid);
335+
318336
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
319337
internal static extern SafeTopicHandle rd_kafka_topic_new(
320338
IntPtr rk, IntPtr topic,
@@ -1216,6 +1234,9 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs
12161234
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12171235
internal static extern IntPtr rd_kafka_TopicDescription_name(IntPtr topicdesc);
12181236

1237+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1238+
internal static extern IntPtr rd_kafka_TopicDescription_topic_id(IntPtr topicdesc);
1239+
12191240
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12201241
internal static extern IntPtr rd_kafka_TopicDescription_partitions(IntPtr topicdesc, out UIntPtr cntp);
12211242

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos7.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,24 @@ internal static extern SafeKafkaHandle rd_kafka_new(
315315
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
316316
internal static extern /* char * */ IntPtr rd_kafka_memberid(IntPtr rk);
317317

318+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
319+
internal static extern /* rd_kafka_Uuid_t * */IntPtr rd_kafka_Uuid_new(
320+
long most_significant_bits,
321+
long least_significant_bits
322+
);
323+
324+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
325+
internal static extern /* char * */IntPtr rd_kafka_Uuid_base64str(IntPtr uuid);
326+
327+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
328+
internal static extern long rd_kafka_Uuid_most_significant_bits(IntPtr uuid);
329+
330+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
331+
internal static extern long rd_kafka_Uuid_least_significant_bits(IntPtr uuid);
332+
333+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
334+
internal static extern void rd_kafka_Uuid_destroy(IntPtr uuid);
335+
318336
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
319337
internal static extern SafeTopicHandle rd_kafka_topic_new(
320338
IntPtr rk, IntPtr topic,
@@ -1216,6 +1234,9 @@ internal static extern IntPtr rd_kafka_TopicCollection_of_topic_names([MarshalAs
12161234
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12171235
internal static extern IntPtr rd_kafka_TopicDescription_name(IntPtr topicdesc);
12181236

1237+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1238+
internal static extern IntPtr rd_kafka_TopicDescription_topic_id(IntPtr topicdesc);
1239+
12191240
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
12201241
internal static extern IntPtr rd_kafka_TopicDescription_partitions(IntPtr topicdesc, out UIntPtr cntp);
12211242

src/Confluent.Kafka/Uuid.cs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
16+
//
17+
// Refer to LICENSE for more information.
18+
19+
using System;
20+
using Confluent.Kafka.Impl;
21+
using static Confluent.Kafka.Internal.Util.Marshal;
22+
23+
namespace Confluent.Kafka
24+
{
25+
/// <summary>
26+
/// Represents a UUID
27+
/// </summary>
28+
public class Uuid
29+
{
30+
/// <summary>
31+
/// Initializes a new instance of the Uuid class
32+
/// with the specified Most and Least significant bits.
33+
/// </summary>
34+
/// <param name="mostSignificantBits">
35+
/// Most significant 64 bits of the 128 bits UUID.
36+
/// </param>
37+
/// <param name="leastSignificantBits">
38+
/// Least significant 64 bits of the 128 bits UUID.
39+
/// </param>
40+
public Uuid(long mostSignificantBits, long leastSignificantBits)
41+
{
42+
Librdkafka.Initialize(null);
43+
MostSignificantBits = mostSignificantBits;
44+
LeastSignificantBits = leastSignificantBits;
45+
IntPtr cUuid = Librdkafka.Uuid_new(mostSignificantBits, leastSignificantBits);
46+
IntPtr cBase64str = Librdkafka.Uuid_base64str(cUuid);
47+
Base64str = PtrToStringUTF8(cBase64str);
48+
Librdkafka.Uuid_destroy(cUuid);
49+
}
50+
51+
/// <summary>
52+
/// Most significant 64 bits of the 128 bits UUID.
53+
/// </summary>
54+
public Offset MostSignificantBits { get; }
55+
56+
/// <summary>
57+
/// Most significant 64 bits of the 128 bits UUID.
58+
/// </summary>
59+
public Offset LeastSignificantBits { get; }
60+
61+
private readonly string Base64str;
62+
63+
/// <summary>
64+
/// Returns a string representation of the Uuid object.
65+
/// </summary>
66+
/// <returns>
67+
/// A string representation of the Uuid object.
68+
/// </returns>
69+
public override string ToString()
70+
=> Base64str;
71+
}
72+
}

0 commit comments

Comments
 (0)