Skip to content

Commit d745216

Browse files
committed
Merge branch '1.0.x'
2 parents 26e32f4 + d1557b2 commit d745216

File tree

5 files changed

+17
-20
lines changed

5 files changed

+17
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
- Builder classes now return interfaces, not concrete classes.
2727
- Removed the dependency on `CompilerServices.Unsafe` which was causing `ProduceAsync` to hang in some scenarios.
2828
- Fixed a deadlock-on-dispose issue in `AdminClient`.
29+
- Made `Producer.ProduceAsync` async.
2930

3031

3132
# 1.0.0-beta3

src/Confluent.Kafka/Producer.cs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
658658
/// A Task which will complete with a delivery report corresponding to
659659
/// the produce request, or an exception if an error occured.
660660
/// </returns>
661-
public Task<DeliveryResult<TKey, TValue>> ProduceAsync(
661+
public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
662662
TopicPartition topicPartition,
663663
Message<TKey, TValue> message)
664664
{
@@ -667,10 +667,7 @@ public Task<DeliveryResult<TKey, TValue>> ProduceAsync(
667667
{
668668
keyBytes = (keySerializer != null)
669669
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic))
670-
: asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic))
671-
.ConfigureAwait(continueOnCapturedContext: false)
672-
.GetAwaiter()
673-
.GetResult();
670+
: await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic));
674671
}
675672
catch (Exception exception)
676673
{
@@ -689,10 +686,7 @@ public Task<DeliveryResult<TKey, TValue>> ProduceAsync(
689686
{
690687
valBytes = (valueSerializer != null)
691688
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic))
692-
: asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic))
693-
.ConfigureAwait(continueOnCapturedContext: false)
694-
.GetAwaiter()
695-
.GetResult();
689+
: await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic));
696690
}
697691
catch (Exception exception)
698692
{
@@ -722,7 +716,7 @@ public Task<DeliveryResult<TKey, TValue>> ProduceAsync(
722716
message.Timestamp, topicPartition.Partition, message.Headers,
723717
handler);
724718

725-
return handler.Task;
719+
return await handler.Task;
726720
}
727721
else
728722
{
@@ -739,7 +733,7 @@ public Task<DeliveryResult<TKey, TValue>> ProduceAsync(
739733
Message = message
740734
};
741735

742-
return Task.FromResult(result);
736+
return result;
743737
}
744738
}
745739
catch (KafkaException ex)

test/Confluent.Kafka.IntegrationTests/Tests/AddBroker.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void AddBrokers(string bootstrapServers)
5858
Assert.True(brokersAdded > 0, "Should have added one broker or more (duplicates considered added)");
5959

6060
var newMetadata = adminClient.GetMetadata(TimeSpan.FromSeconds(3));
61-
Assert.True(newMetadata.Brokers.Count == 1);
61+
Assert.True(newMetadata.Brokers.Count >= 1);
6262

6363
brokersAdded = adminClient.AddBrokers("");
6464
Assert.True(brokersAdded == 0, "Should not have added brokers");

test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_CreatePartitions.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ public void AdminClient_CreatePartitions(string bootstrapServers)
5858
producer.ProduceAsync(new TopicPartition(topicName1, 2), new Message<Null, Null>()).Wait();
5959
Assert.True(false, "expecting exception");
6060
}
61-
catch (KafkaException ex)
61+
catch (AggregateException ex)
6262
{
63-
Assert.True(ex.Error.IsError);
63+
Assert.IsType<ProduceException<Null,Null>>(ex.InnerException);
64+
Assert.True(((ProduceException<Null,Null>)ex.InnerException).Error.IsError);
6465
}
6566
}
6667

@@ -79,9 +80,10 @@ public void AdminClient_CreatePartitions(string bootstrapServers)
7980
var dr2 = producer.ProduceAsync(new TopicPartition(topicName2, 1), new Message<Null, Null>()).Result;
8081
Assert.True(false, "expecting exception");
8182
}
82-
catch (KafkaException ex)
83+
catch (AggregateException ex)
8384
{
84-
Assert.True(ex.Error.IsError);
85+
Assert.IsType<ProduceException<Null,Null>>(ex.InnerException);
86+
Assert.True(((ProduceException<Null,Null>)ex.InnerException).Error.IsError);
8587
}
8688
}
8789

test/Confluent.Kafka.IntegrationTests/Tests/Timestamps.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void Timestamps(string bootstrapServers)
7070
new Message<Null, string> { Value = "test-value" }).Result);
7171

7272
// TimestampType: LogAppendTime
73-
Assert.Throws<ArgumentException>(() =>
73+
Assert.Throws<AggregateException>(() =>
7474
producer.ProduceAsync(
7575
new TopicPartition(singlePartitionTopic, 0),
7676
new Message<Null, string>
@@ -80,7 +80,7 @@ public void Timestamps(string bootstrapServers)
8080
}).Result);
8181

8282
// TimestampType: NotAvailable
83-
Assert.Throws<ArgumentException>(() =>
83+
Assert.Throws<AggregateException>(() =>
8484
producer.ProduceAsync(
8585
new TopicPartition(singlePartitionTopic, 0),
8686
new Message<Null, string>
@@ -159,13 +159,13 @@ Action<DeliveryReport<Null, string>> dh
159159
new Message<byte[], byte[]> { Timestamp = Timestamp.Default }).Result);
160160

161161
// TimestampType: LogAppendTime
162-
Assert.Throws<ArgumentException>(() =>
162+
Assert.Throws<AggregateException>(() =>
163163
producer.ProduceAsync(
164164
singlePartitionTopic,
165165
new Message<byte[], byte[]> { Timestamp = new Timestamp(DateTime.Now, TimestampType.LogAppendTime) }).Result);
166166

167167
// TimestampType: NotAvailable
168-
Assert.Throws<ArgumentException>(() =>
168+
Assert.Throws<AggregateException>(() =>
169169
producer.ProduceAsync(
170170
singlePartitionTopic,
171171
new Message<byte[], byte[]> { Timestamp = new Timestamp(10, TimestampType.NotAvailable) }).Result);

0 commit comments

Comments
 (0)