Skip to content

Commit 1f09085

Browse files
authored
Fix calls with empty lists (#2131)
* Empty topic collection and increased test coverage * Empty input list returns empty ResultInfos Increased test coverage
1 parent beb44ff commit 1f09085

File tree

9 files changed

+371
-19
lines changed

9 files changed

+371
-19
lines changed

examples/AdminClient/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -891,13 +891,13 @@ static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs
891891
{
892892
var listOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsets, options);
893893
Console.WriteLine("ListOffsetsResult:");
894-
PrintListOffsetsResultInfos(listOffsetsResult.ListOffsetsResultInfos);
894+
PrintListOffsetsResultInfos(listOffsetsResult.ResultInfos);
895895
}
896896
catch (ListOffsetsException e)
897897
{
898898
Console.WriteLine("ListOffsetsReport:");
899899
Console.WriteLine($" Error: {e.Error}");
900-
PrintListOffsetsResultInfos(e.Result.ListOffsetsResultInfos);
900+
PrintListOffsetsResultInfos(e.Result.ResultInfos);
901901
}
902902
catch (KafkaException e)
903903
{

src/Confluent.Kafka/Admin/ListOffsetsReport.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class ListOffsetsReport
3131
/// with ListOffsets. At least one of these
3232
/// results will be in error.
3333
/// </summary>
34-
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }
34+
public List<ListOffsetsResultInfo> ResultInfos { get; set; }
3535

3636
/// <summary>
3737
/// Operation error status.
@@ -47,8 +47,8 @@ public class ListOffsetsReport
4747
public override string ToString()
4848
{
4949
var result = new StringBuilder();
50-
result.Append($"{{\"ListOffsetsResultInfos\": [");
51-
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
50+
result.Append($"{{\"ResultInfos\": [");
51+
result.Append(string.Join(",", ResultInfos.Select(b => $" {b.ToString()}")));
5252
result.Append($"], \"Error\": \"{Error.Code}\"}}");
5353
return result.ToString();
5454
}

src/Confluent.Kafka/Admin/ListOffsetsResult.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class ListOffsetsResult
3030
/// Result information for all the partitions queried
3131
/// with ListOffsets.
3232
/// </summary>
33-
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }
33+
public List<ListOffsetsResultInfo> ResultInfos { get; set; }
3434

3535

3636
/// <summary>
@@ -42,8 +42,8 @@ public class ListOffsetsResult
4242
public override string ToString()
4343
{
4444
var result = new StringBuilder();
45-
result.Append($"{{\"ListOffsetsResultInfos\": [");
46-
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
45+
result.Append($"{{\"ResultInfos\": [");
46+
result.Append(string.Join(",", ResultInfos.Select(b => $" {b.ToString()}")));
4747
result.Append("]}");
4848
return result.ToString();
4949
}

src/Confluent.Kafka/Admin/OffsetSpec.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ public abstract class OffsetSpec
3232
/// </summary>
3333
public class EarliestSpec : OffsetSpec
3434
{
35+
internal EarliestSpec()
36+
{
37+
}
38+
3539
internal override long Value()
3640
{
3741
return -2;
@@ -43,6 +47,10 @@ internal override long Value()
4347
/// </summary>
4448
public class LatestSpec : OffsetSpec
4549
{
50+
internal LatestSpec()
51+
{
52+
}
53+
4654
internal override long Value()
4755
{
4856
return -1;
@@ -55,6 +63,10 @@ internal override long Value()
5563
/// can be specified client-side.
5664
/// </summary>
5765
public class MaxTimestampSpec : OffsetSpec {
66+
internal MaxTimestampSpec()
67+
{
68+
}
69+
5870
internal override long Value()
5971
{
6072
return -3;

src/Confluent.Kafka/AdminClient.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -597,8 +597,12 @@ private DescribeClusterResult extractDescribeClusterResult(IntPtr resultPtr)
597597
private ListOffsetsReport extractListOffsetsReport(IntPtr resultPtr)
598598
{
599599
var resultInfosPtr = Librdkafka.ListOffsets_result_infos(resultPtr, out UIntPtr resulInfosCntPtr);
600+
600601
IntPtr[] resultResponsesPtrArr = new IntPtr[(int)resulInfosCntPtr];
601-
Marshal.Copy(resultInfosPtr, resultResponsesPtrArr, 0, (int)resulInfosCntPtr);
602+
if ((int)resulInfosCntPtr > 0)
603+
{
604+
Marshal.Copy(resultInfosPtr, resultResponsesPtrArr, 0, (int)resulInfosCntPtr);
605+
}
602606

603607
ErrorCode reportErrorCode = ErrorCode.NoError;
604608
var listOffsetsResultInfos = resultResponsesPtrArr.Select(resultResponsePtr =>
@@ -625,7 +629,7 @@ private ListOffsetsReport extractListOffsetsReport(IntPtr resultPtr)
625629

626630
return new ListOffsetsReport
627631
{
628-
ListOffsetsResultInfos = listOffsetsResultInfos,
632+
ResultInfos = listOffsetsResultInfos,
629633
Error = new Error(reportErrorCode)
630634
};
631635
}
@@ -1241,7 +1245,7 @@ private Task StartPollTask(CancellationToken ct)
12411245
}
12421246
else
12431247
{
1244-
var result = new ListOffsetsResult() { ListOffsetsResultInfos = report.ListOffsetsResultInfos };
1248+
var result = new ListOffsetsResult() { ResultInfos = report.ResultInfos };
12451249
Task.Run(() =>
12461250
((TaskCompletionSource<ListOffsetsResult>)adminClientResult).TrySetResult(
12471251
result));

src/Confluent.Kafka/Impl/SafeKafkaHandle.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2512,7 +2512,7 @@ internal void ListOffsets(IEnumerable<TopicPartitionOffsetSpec> topicPartitionOf
25122512
topic_partition,
25132513
(int) Util.Marshal.OffsetOf<rd_kafka_topic_partition>("offset"),
25142514
topicPartitionOffset.OffsetSpec.Value());
2515-
}
2515+
}
25162516
Librdkafka.ListOffsets(handle, topic_partition_list, optionsPtr, resultQueuePtr);
25172517
}
25182518
finally
@@ -2532,11 +2532,6 @@ internal void DescribeTopics(TopicCollection topicCollection, DescribeTopicsOpti
25322532
{
25332533
ThrowIfHandleClosed();
25342534

2535-
if (topicCollection.Topics.Count() == 0)
2536-
{
2537-
throw new ArgumentException("at least one topic should be provided to DescribeTopics");
2538-
}
2539-
25402535
var optionsPtr = IntPtr.Zero;
25412536
var topicCollectionPtr = IntPtr.Zero;
25422537
try

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListOffsets.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ public async void AdminClient_ListOffsets(string bootstrapServers)
6565

6666
var listOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsetSpecs, options);
6767

68-
foreach (var ListOffsetsResultInfo in listOffsetsResult.ListOffsetsResultInfos)
68+
foreach (var resultInfo in listOffsetsResult.ResultInfos)
6969
{
70-
TopicPartitionOffsetError topicPartition = ListOffsetsResultInfo.TopicPartitionOffsetError;
70+
TopicPartitionOffsetError topicPartition = resultInfo.TopicPartitionOffsetError;
7171
Assert.Equal(offset, topicPartition.Offset);
7272
}
7373
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Xunit;
18+
using System;
19+
using Confluent.Kafka.Admin;
20+
using System.Collections.Generic;
21+
22+
23+
namespace Confluent.Kafka.UnitTests
24+
{
25+
public class DescribeTopicsErrorTests
26+
{
27+
private readonly List<DescribeTopicsOptions> options = new List<DescribeTopicsOptions>
28+
{
29+
new DescribeTopicsOptions {},
30+
new DescribeTopicsOptions { RequestTimeout = TimeSpan.FromMilliseconds(200) },
31+
new DescribeTopicsOptions { IncludeAuthorizedOperations = true },
32+
new DescribeTopicsOptions { RequestTimeout = TimeSpan.FromMilliseconds(200), IncludeAuthorizedOperations = false },
33+
};
34+
35+
[Fact]
36+
public async void NullTopicCollection()
37+
{
38+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:90922" }).Build())
39+
{
40+
foreach (var option in options)
41+
{
42+
await Assert.ThrowsAsync<NullReferenceException>(() =>
43+
adminClient.DescribeTopicsAsync(null, option)
44+
);
45+
}
46+
}
47+
}
48+
49+
[Fact]
50+
public async void EmptyTopicCollection()
51+
{
52+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:90922" }).Build())
53+
{
54+
foreach (var option in options)
55+
{
56+
var result = await adminClient.DescribeTopicsAsync(
57+
TopicCollection.OfTopicNames(new List<string> {}),
58+
option);
59+
Assert.Empty(result.TopicDescriptions);
60+
}
61+
}
62+
}
63+
64+
65+
[Fact]
66+
public async void WrongTopicNames()
67+
{
68+
var wrongTopicCollections = new List<TopicCollection>
69+
{
70+
TopicCollection.OfTopicNames(new List<string> {""}),
71+
TopicCollection.OfTopicNames(new List<string> {"correct", ""}),
72+
};
73+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:90922" }).Build())
74+
{
75+
foreach (var option in options)
76+
{
77+
foreach (var collection in wrongTopicCollections)
78+
{
79+
await Assert.ThrowsAsync<KafkaException>(() =>
80+
adminClient.DescribeTopicsAsync(collection, option)
81+
);
82+
}
83+
}
84+
}
85+
}
86+
87+
[Fact]
88+
public async void WrongRequestTimeoutValue()
89+
{
90+
var topicCollections = TopicCollection.OfTopicNames(new List<string> {});
91+
var wrongRequestTimeoutValue = new DescribeTopicsOptions
92+
{
93+
RequestTimeout = TimeSpan.FromSeconds(-1)
94+
};
95+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:90922" }).Build())
96+
{
97+
await Assert.ThrowsAsync<KafkaException>(() =>
98+
adminClient.DescribeTopicsAsync(topicCollections, wrongRequestTimeoutValue)
99+
);
100+
}
101+
}
102+
103+
[Fact]
104+
public async void LocalTimeout()
105+
{
106+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig
107+
{
108+
BootstrapServers = "localhost:90922",
109+
SocketTimeoutMs = 10
110+
}).Build())
111+
{
112+
foreach (var option in options)
113+
{
114+
var ex = await Assert.ThrowsAsync<KafkaException>(() =>
115+
adminClient.DescribeTopicsAsync(
116+
TopicCollection.OfTopicNames(new List<string> {"test"}),
117+
option)
118+
);
119+
Assert.Equal("Failed while waiting for controller: Local: Timed out", ex.Message);
120+
}
121+
}
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)