diff --git a/src/Confluent.Kafka/IProducer.cs b/src/Confluent.Kafka/IProducer.cs index cb3501cdc..9aa9f426b 100644 --- a/src/Confluent.Kafka/IProducer.cs +++ b/src/Confluent.Kafka/IProducer.cs @@ -21,12 +21,33 @@ namespace Confluent.Kafka -{ +{ + /// + /// Defines a high-level Apache Kafka producer client without serialization capable of producing pre-serialized messages. + /// + public interface IProducer : IClient + { + /// + /// Asynchronously send a single preserialized message to a Kafka topic. + /// + /// + /// Use this method to produce with minimal allocations. + /// + /// The topic to produce message to. + /// The partition to produce to or Partition.Any to use configured partitioner. + /// Serialized message key or null. + /// Serialized message value or null. + /// Message headers or null to produce message without headers. + /// + /// Result of produce. + Task ProduceAsync(string topic, Partition partition, ArraySegment? key, ArraySegment? value, IReadOnlyList headers, Timestamp timestamp); + } + /// /// Defines a high-level Apache Kafka producer client /// that provides key and value serialization. /// - public interface IProducer : IClient + public interface IProducer : IProducer { /// /// Asynchronously send a single message to a @@ -99,9 +120,8 @@ Task> ProduceAsync( Task> ProduceAsync( TopicPartition topicPartition, Message message, - CancellationToken cancellationToken = default(CancellationToken)); - - + CancellationToken cancellationToken = default(CancellationToken)); + /// /// Asynchronously send a single message to a /// Kafka topic. The partition the message is sent @@ -540,6 +560,6 @@ void Produce( /// /// Thrown on all other errors. /// - void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout); + void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout); } } diff --git a/src/Confluent.Kafka/ProduceResult.cs b/src/Confluent.Kafka/ProduceResult.cs new file mode 100644 index 000000000..af5e13fed --- /dev/null +++ b/src/Confluent.Kafka/ProduceResult.cs @@ -0,0 +1,52 @@ +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.Kafka +{ + /// + /// Models the result of a call to ProduceAsync. + /// + public readonly record struct ProduceResult + { + /// + /// The partition message was produced to. + /// + public Partition Partition { get; } + + /// + /// The offset message was persisted at. + /// + public Offset Offset { get; } + + /// + /// The persistence status of the message. + /// + public PersistenceStatus PersistenceStatus { get; } + + /// + /// Creates new result. + /// + /// + /// + /// + public ProduceResult(Partition partition, Offset offset, PersistenceStatus persistenceStatus) + { + Partition = partition; + Offset = offset; + PersistenceStatus = persistenceStatus; + } + } +} diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 79867514b..8ec0ac837 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -1,19 +1,19 @@ -// Copyright 2016-2018 Confluent Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Refer to LICENSE for more information. - +// Copyright 2016-2018 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + using System; using System.Collections.Generic; using System.Linq; @@ -25,11 +25,11 @@ namespace Confluent.Kafka -{ +{ /// /// A high level producer with serialization capability. /// - internal class Producer : IProducer, IClient + internal class Producer : IProducer { internal class Config { @@ -56,8 +56,8 @@ internal class Config { typeof(float), Serializers.Single }, { typeof(double), Serializers.Double }, { typeof(byte[]), Serializers.ByteArray } - }; - + }; + private int cancellationDelayMaxMs; private bool disposeHasBeenCalled = false; private object disposeHasBeenCalledLockObj = new object(); @@ -324,7 +324,7 @@ private void ProduceImpl( else { err = KafkaHandle.Produce( - topic, + topic, val, valOffset, valLength, key, keyOffset, keyLength, partition.Value, @@ -741,7 +741,41 @@ internal Producer(ProducerBuilder builder) builder.AsyncKeySerializer, builder.AsyncValueSerializer); } - + /// + public async Task ProduceAsync( + string topic, + Partition partition, + ArraySegment? key, + ArraySegment? value, + IReadOnlyList headers, + Timestamp timestamp) + { + // Start produce request + DeliveryHandlerSlim deliveryHandler = new(); + ProduceImpl(topic, + value?.Array, value?.Offset ?? 0, value?.Count ?? 0, + key?.Array, key?.Offset ?? 0, key?.Count ?? 0, + timestamp, + partition, + headers ?? Array.Empty(), + deliveryHandler); + + // Wait for response + DeliveryReport deliveryReport = await deliveryHandler.Task.ConfigureAwait(false); + + // Return response + return new(deliveryReport.Partition, deliveryReport.Offset, deliveryReport.Status); + } + + /// + /// Implements light-weight delivery handler for produce requests. + /// + class DeliveryHandlerSlim : TaskCompletionSource>, IDeliveryHandler + { + public DeliveryHandlerSlim() : base(TaskCreationOptions.RunContinuationsAsynchronously) {} + public void HandleDeliveryReport(DeliveryReport deliveryReport) => TrySetResult(deliveryReport); + } + /// public async Task> ProduceAsync( TopicPartition topicPartition, @@ -815,7 +849,7 @@ public async Task> ProduceAsync( else { ProduceImpl( - topicPartition.Topic, + topicPartition.Topic, valBytes, 0, valBytes == null ? 0 : valBytes.Length, keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, message.Timestamp, topicPartition.Partition, headers.BackingList, @@ -912,7 +946,7 @@ public void Produce( } try - { + { ProduceImpl( topicPartition.Topic, valBytes, 0, valBytes == null ? 0 : valBytes.Length,