Skip to content

Commit c0597a9

Browse files
committed
Add a produce/consume test
1 parent 79bb726 commit c0597a9

File tree

1 file changed

+39
-1
lines changed

1 file changed

+39
-1
lines changed

test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,21 +50,33 @@ public void SimpleProduceConsume(string bootstrapServers)
5050

5151
string testString1 = "hello world";
5252
string testString2 = null;
53+
string testString3 = "dlrow olleh";
54+
string testString4 = null;
5355

5456
DeliveryResult<Null, string> produceResult1;
5557
DeliveryResult<Null, string> produceResult2;
58+
DeliveryResult<Null, Memory<byte>?> produceResult3;
59+
DeliveryResult<Null, Memory<byte>?> produceResult4;
5660
using (var producer = new TestProducerBuilder<Null, string>(producerConfig).Build())
5761
{
5862
produceResult1 = ProduceMessage(singlePartitionTopic, producer, testString1);
5963
produceResult2 = ProduceMessage(singlePartitionTopic, producer, testString2);
6064
}
6165

66+
using (var producer = new TestProducerBuilder<Null, Memory<byte>?>(producerConfig).Build())
67+
{
68+
produceResult3 = ProduceMessage(singlePartitionTopic, producer, testString3);
69+
produceResult4 = ProduceMessage(singlePartitionTopic, producer, testString4);
70+
}
71+
6272
using (var consumer = new TestConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
6373
{
6474
ConsumeMessage(consumer, produceResult1, testString1);
6575
ConsumeMessage(consumer, produceResult2, testString2);
76+
ConsumeMessage(consumer, produceResult3, testString3);
77+
ConsumeMessage(consumer, produceResult4, testString4);
6678
}
67-
79+
6880
Assert.Equal(0, Library.HandleCount);
6981
LogToFile("end SimpleProduceConsume");
7082
}
@@ -80,6 +92,17 @@ private static void ConsumeMessage(IConsumer<byte[], byte[]> consumer, DeliveryR
8092
Assert.Equal(r.Message.Timestamp.UnixTimestampMs, dr.Message.Timestamp.UnixTimestampMs);
8193
}
8294

95+
private static void ConsumeMessage(IConsumer<byte[], byte[]> consumer, DeliveryResult<Null, Memory<byte>?> dr, string testString)
96+
{
97+
consumer.Assign(new List<TopicPartitionOffset> { dr.TopicPartitionOffset });
98+
var r = consumer.Consume(TimeSpan.FromSeconds(10));
99+
Assert.NotNull(r?.Message);
100+
Assert.Equal(testString, r.Message.Value == null ? null : Encoding.UTF8.GetString(r.Message.Value));
101+
Assert.Null(r.Message.Key);
102+
Assert.Equal(r.Message.Timestamp.Type, dr.Message.Timestamp.Type);
103+
Assert.Equal(r.Message.Timestamp.UnixTimestampMs, dr.Message.Timestamp.UnixTimestampMs);
104+
}
105+
83106
private static DeliveryResult<Null, string> ProduceMessage(string topic, IProducer<Null, string> producer, string testString)
84107
{
85108
var result = producer.ProduceAsync(topic, new Message<Null, string> { Value = testString }).Result;
@@ -91,5 +114,20 @@ private static DeliveryResult<Null, string> ProduceMessage(string topic, IProduc
91114
Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
92115
return result;
93116
}
117+
118+
private static DeliveryResult<Null, Memory<byte>?> ProduceMessage(string topic, IProducer<Null, Memory<byte>?> producer, string testString)
119+
{
120+
var result = producer .ProduceAsync(topic, new Message<Null, Memory<byte>?>
121+
{
122+
Value = testString == null ? null : Encoding.UTF8.GetBytes(testString),
123+
}).Result;
124+
Assert.NotNull(result?.Message);
125+
Assert.Equal(topic, result.Topic);
126+
Assert.NotEqual<long>(result.Offset, Offset.Unset);
127+
Assert.Equal(TimestampType.CreateTime, result.Message.Timestamp.Type);
128+
Assert.True(Math.Abs((DateTime.UtcNow - result.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
129+
Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
130+
return result;
131+
}
94132
}
95133
}

0 commit comments

Comments
 (0)