Skip to content

Commit c214b42

Browse files
authored
remove ./bootstrap-topics.sh, use AdminClient to create test topics (#801)
* remove ./bootstrap-topics.sh, use AdminClient to create test topics * windows fix. fixed AddBrokers test. remove unnecessary includes * added tests for Assign overloads * misc additions * review changes
1 parent 3968ce2 commit c214b42

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+244
-296
lines changed

test/Confluent.Kafka.IntegrationTests/TemporaryTopic.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,24 @@ namespace Confluent.Kafka
2323
{
2424
public class TemporaryTopic : IDisposable
2525
{
26-
IAdminClient adminClient;
26+
private string bootstrapServers;
27+
28+
public string Name { get; set; }
2729

2830
public TemporaryTopic(string bootstrapServers, int numPartitions)
2931
{
30-
Name = Guid.NewGuid().ToString();
31-
adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
32-
adminClient.CreateTopicsAsync(
33-
new List<TopicSpecification> { new TopicSpecification { Name = Name, NumPartitions = numPartitions, ReplicationFactor = 1 } }).Wait();
32+
this.bootstrapServers = bootstrapServers;
33+
this.Name = "dotnet_test_" + Guid.NewGuid().ToString();
34+
35+
var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
36+
adminClient.CreateTopicsAsync(new List<TopicSpecification> {
37+
new TopicSpecification { Name = Name, NumPartitions = numPartitions, ReplicationFactor = 1 } }).Wait();
38+
adminClient.Dispose();
3439
}
3540

36-
public string Name { get; set; }
3741
public void Dispose()
3842
{
43+
var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = this.bootstrapServers }).Build();
3944
adminClient.DeleteTopicsAsync(new List<string> { Name }).Wait();
4045
adminClient.Dispose();
4146
}

test/Confluent.Kafka.IntegrationTests/Tests/AddBroker.cs

Lines changed: 21 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,96 +17,55 @@
1717
#pragma warning disable xUnit1026
1818

1919
using System;
20-
using System.Text;
21-
using System.Collections.Generic;
22-
using System.Threading.Tasks;
2320
using Xunit;
2421

2522

26-
/*
2723
namespace Confluent.Kafka.IntegrationTests
2824
{
29-
public static partial class Tests
25+
public partial class Tests
3026
{
3127
/// <summary>
32-
/// Test that produces a message then consumes it.
28+
/// Test the AddBroker method works.
3329
/// </summary>
30+
/// <remarks>
31+
/// Assumes broker v0.10.0 or higher:
32+
/// https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
33+
/// A metadata request is used to check if there is a broker connection.
34+
/// </remarks>
3435
[Theory, MemberData(nameof(KafkaParameters))]
35-
public static void AddBrokers(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
36+
public void AddBrokers(string bootstrapServers)
3637
{
37-
// This test assumes broker v0.10.0 or higher:
38-
// https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
38+
var producerConfig = new ProducerConfig { BootstrapServers = "localhost:65533" };
3939

40-
// This test does a broker metadata request, as it's an easy way to see
41-
// if we are connected to broker. It's not really the best way to test this
42-
// (ideally, we would get from a working list of brokers to all brokers
43-
// that have changed IP) but this will be good enough.
44-
45-
var producerConfig = new Dictionary<string, object>
46-
{
47-
{ "bootstrap.servers", "unknown" }
48-
};
49-
50-
var consumerConfig = new Dictionary<string, object>
51-
{
52-
{ "group.id", Guid.NewGuid().ToString() },
53-
{ "bootstrap.servers", "unknown" },
54-
{ "session.timeout.ms", 6000 }
55-
};
56-
57-
using (var typedProducer = new Producer<Null, string>(producerConfig))
58-
{
59-
TestMetadata(
60-
() => typedProducer.GetMetadata(false, null, TimeSpan.FromSeconds(3)),
61-
typedProducer.AddBrokers);
62-
}
63-
64-
using (var producer = new Producer<byte[], byte[]>(producerConfig))
65-
{
66-
TestMetadata(
67-
() => producer.GetMetadata(false, null, TimeSpan.FromSeconds(3)),
68-
producer.AddBrokers);
69-
}
70-
71-
using (var consumer = new Consumer<byte[], byte[]>(consumerConfig))
72-
{
73-
TestMetadata(
74-
() => consumer.GetMetadata(false, TimeSpan.FromSeconds(3)),
75-
consumer.AddBrokers);
76-
}
77-
78-
using (var typedConsumer = new Consumer<Null, Null>(consumerConfig))
79-
{
80-
TestMetadata(
81-
() => typedConsumer.GetMetadata(false, TimeSpan.FromSeconds(3)),
82-
typedConsumer.AddBrokers);
83-
}
84-
85-
void TestMetadata(Func<Metadata> getMetadata, Func<string, int> addBrokers)
40+
using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
41+
using (var adminClient = new AdminClient(producer.Handle))
8642
{
8743
try
8844
{
89-
var metadata = getMetadata();
45+
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(3));
9046
Assert.True(false, "Broker should not be reached here");
9147
}
9248
catch (KafkaException e)
9349
{
9450
Assert.Equal(ErrorCode.Local_Transport, e.Error.Code);
9551
}
9652

97-
int brokersAdded = addBrokers(bootstrapServers);
53+
// test is > 0 note == 1 since bootstrapServers could include more than one broker.
54+
int brokersAdded = adminClient.AddBrokers(bootstrapServers);
9855
Assert.True(brokersAdded > 0, "Should have added one broker or more");
9956

100-
brokersAdded = addBrokers(bootstrapServers);
57+
brokersAdded = adminClient.AddBrokers(bootstrapServers);
10158
Assert.True(brokersAdded > 0, "Should have added one broker or more (duplicates considered added)");
10259

103-
var newMetadata = getMetadata();
104-
Assert.True(newMetadata.Brokers.Count > 0);
60+
var newMetadata = adminClient.GetMetadata(TimeSpan.FromSeconds(3));
61+
Assert.True(newMetadata.Brokers.Count == 1);
10562

106-
brokersAdded = addBrokers("");
63+
brokersAdded = adminClient.AddBrokers("");
10764
Assert.True(brokersAdded == 0, "Should not have added brokers");
65+
66+
newMetadata = adminClient.GetMetadata(TimeSpan.FromSeconds(3));
67+
Assert.True(newMetadata.Brokers.Count > 0);
10868
}
10969
}
11070
}
11171
}
112-
*/

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_AlterConfigs.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
#pragma warning disable xUnit1026
1818

1919
using System;
20-
using System.Linq;
21-
using System.Text;
22-
using System.Threading.Tasks;
2320
using System.Threading;
2421
using System.Collections.Generic;
2522
using Confluent.Kafka.Admin;
@@ -28,13 +25,13 @@
2825

2926
namespace Confluent.Kafka.IntegrationTests
3027
{
31-
public static partial class Tests
28+
public partial class Tests
3229
{
3330
/// <summary>
3431
/// Test functionality of AdminClient.AlterConfigs.
3532
/// </summary>
3633
[Theory, MemberData(nameof(KafkaParameters))]
37-
public static void AdminClient_AlterConfigs(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
34+
public void AdminClient_AlterConfigs(string bootstrapServers)
3835
{
3936
LogToFile("start AdminClient_AlterConfigs");
4037

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_CreatePartitions.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,20 @@
1919
using System;
2020
using System.Collections.Generic;
2121
using System.Linq;
22-
using System.Text;
2322
using System.Threading;
24-
using System.Threading.Tasks;
2523
using Confluent.Kafka.Admin;
2624
using Xunit;
2725

2826

2927
namespace Confluent.Kafka.IntegrationTests
3028
{
31-
public static partial class Tests
29+
public partial class Tests
3230
{
3331
/// <summary>
3432
/// Test functionality of AdminClient.CreatePartitions
3533
/// </summary>
3634
[Theory, MemberData(nameof(KafkaParameters))]
37-
public static void AdminClient_CreatePartitions(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
35+
public void AdminClient_CreatePartitions(string bootstrapServers)
3836
{
3937
LogToFile("start AdminClient_CreatePartitions");
4038

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_CreateTopics.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,20 @@
1818

1919
using System;
2020
using System.Linq;
21-
using System.Text;
22-
using System.Threading.Tasks;
2321
using System.Collections.Generic;
2422
using Confluent.Kafka.Admin;
2523
using Xunit;
2624

2725

2826
namespace Confluent.Kafka.IntegrationTests
2927
{
30-
public static partial class Tests
28+
public partial class Tests
3129
{
3230
/// <summary>
3331
/// Test functionality of AdminClient.CreateTopics.
3432
/// </summary>
3533
[Theory, MemberData(nameof(KafkaParameters))]
36-
public static void AdminClient_CreateTopics(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
34+
public void AdminClient_CreateTopics(string bootstrapServers)
3735
{
3836
LogToFile("start AdminClient_CreateTopics");
3937

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_DeleteTopics.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,21 @@
1818

1919
using System;
2020
using System.Linq;
21-
using System.Text;
2221
using System.Threading;
23-
using System.Threading.Tasks;
2422
using System.Collections.Generic;
2523
using Confluent.Kafka.Admin;
2624
using Xunit;
2725

2826

2927
namespace Confluent.Kafka.IntegrationTests
3028
{
31-
public static partial class Tests
29+
public partial class Tests
3230
{
3331
/// <summary>
3432
/// Test functionality of AdminClient.CreateTopics.
3533
/// </summary>
3634
[Theory, MemberData(nameof(KafkaParameters))]
37-
public static void AdminClient_DeleteTopics(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
35+
public void AdminClient_DeleteTopics(string bootstrapServers)
3836
{
3937
LogToFile("start AdminClient_DeleteTopics");
4038

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_DescribeConfigs.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,20 @@
1818

1919
using System;
2020
using System.Linq;
21-
using System.Text;
22-
using System.Threading.Tasks;
2321
using System.Collections.Generic;
2422
using Confluent.Kafka.Admin;
2523
using Xunit;
2624

2725

2826
namespace Confluent.Kafka.IntegrationTests
2927
{
30-
public static partial class Tests
28+
public partial class Tests
3129
{
3230
/// <summary>
3331
/// Test functionality of AdminClient.DescribeConfigs.
3432
/// </summary>
3533
[Theory, MemberData(nameof(KafkaParameters))]
36-
public static void AdminClient_DescribeConfigs(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
34+
public void AdminClient_DescribeConfigs(string bootstrapServers)
3735
{
3836
LogToFile("start AdminClient_DescribeConfigs");
3937

test/Confluent.Kafka.IntegrationTests/Tests/AssignOverloads.cs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,39 +17,43 @@
1717
#pragma warning disable xUnit1026
1818

1919
using System;
20-
using System.Text;
2120
using System.Collections.Generic;
2221
using Xunit;
2322

2423

2524
namespace Confluent.Kafka.IntegrationTests
2625
{
27-
public static partial class Tests
26+
public partial class Tests
2827
{
2928
/// <summary>
30-
/// Simple test of both Consumer.Assign overloads.
29+
/// Simple test of all Consumer.Assign overloads.
3130
/// </summary>
3231
[Theory, MemberData(nameof(KafkaParameters))]
33-
public static void AssignOverloads(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
32+
public void AssignOverloads(string bootstrapServers)
3433
{
3534
LogToFile("start AssignOverloads");
3635

3736
var consumerConfig = new ConsumerConfig
3837
{
3938
GroupId = Guid.NewGuid().ToString(),
4039
BootstrapServers = bootstrapServers,
41-
SessionTimeoutMs = 6000
40+
SessionTimeoutMs = 6000,
41+
EnableAutoCommit = false
4242
};
4343
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
4444

4545
var testString = "hello world";
4646
var testString2 = "hello world 2";
47+
var testString3 = "hello world 3";
48+
var testString4 = "hello world 4";
4749

48-
DeliveryResult<Null, string> dr;
50+
DeliveryResult<Null, string> dr, dr3;
4951
using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
5052
{
5153
dr = producer.ProduceAsync(singlePartitionTopic, new Message<Null, string> { Value = testString }).Result;
52-
var dr2 = producer.ProduceAsync(singlePartitionTopic, new Message<Null, string> { Value = testString2 }).Result;
54+
producer.ProduceAsync(singlePartitionTopic, new Message<Null, string> { Value = testString2 }).Wait();
55+
dr3 = producer.ProduceAsync(singlePartitionTopic, new Message<Null, string> { Value = testString3 }).Result;
56+
producer.ProduceAsync(singlePartitionTopic, new Message<Null, string> { Value = testString4 }).Wait();
5357
producer.Flush(TimeSpan.FromSeconds(10));
5458
}
5559

@@ -58,13 +62,28 @@ public static void AssignOverloads(string bootstrapServers, string singlePartiti
5862
// Explicitly specify partition offset.
5963
consumer.Assign(new List<TopicPartitionOffset>() { new TopicPartitionOffset(dr.TopicPartition, dr.Offset) });
6064
var cr = consumer.Consume(TimeSpan.FromSeconds(10));
65+
consumer.Commit();
6166
Assert.Equal(cr.Value, testString);
62-
67+
6368
// Determine offset to consume from automatically.
6469
consumer.Assign(new List<TopicPartition>() { dr.TopicPartition });
6570
cr = consumer.Consume(TimeSpan.FromSeconds(10));
71+
consumer.Commit();
6672
Assert.NotNull(cr.Message);
6773
Assert.Equal(cr.Message.Value, testString2);
74+
75+
// Explicitly specify partition offset.
76+
consumer.Assign(new TopicPartitionOffset(dr.TopicPartition, dr3.Offset));
77+
cr = consumer.Consume(TimeSpan.FromSeconds(10));
78+
consumer.Commit();
79+
Assert.Equal(cr.Value, testString3);
80+
81+
// Determine offset to consume from automatically.
82+
consumer.Assign(dr.TopicPartition);
83+
cr = consumer.Consume(TimeSpan.FromSeconds(10));
84+
consumer.Commit();
85+
Assert.NotNull(cr.Message);
86+
Assert.Equal(cr.Message.Value, testString4);
6887
}
6988

7089
Assert.Equal(0, Library.HandleCount);

test/Confluent.Kafka.IntegrationTests/Tests/AssignPastEnd.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,21 @@
1717
#pragma warning disable xUnit1026
1818

1919
using System;
20-
using System.Text;
2120
using System.Collections.Generic;
2221
using Xunit;
2322
using Confluent.Kafka.Serdes;
2423

2524

2625
namespace Confluent.Kafka.IntegrationTests
2726
{
28-
public static partial class Tests
27+
public partial class Tests
2928
{
3029
/// <summary>
3130
/// Test functionality of Consumer.Consume when assigned to offest
3231
/// higher than the offset of the last message on a partition.
3332
/// </summary>
3433
[Theory, MemberData(nameof(KafkaParameters))]
35-
public static void AssignPastEnd(string bootstrapServers, string singlePartitionTopic, string partitionedTopic)
34+
public void AssignPastEnd(string bootstrapServers)
3635
{
3736
LogToFile("start AssignPastEnd");
3837

0 commit comments

Comments
 (0)