Proposal: Allocation-free alternative API for producer and consumer #2543

@marcin-krystianc

This proposal is an alternative approach to the one discussed in confluentinc/confluent-kafka-dotnet#2157.

As mentioned in that issue, the existing .NET API adds non-negligible overhead compared to the native library. To address this, we designed a new allocation-free API that removes this overhead and achieves performance parity with the native library.

The core design principles are:

  • Use callbacks instead of returning result objects.
  • Use ReadOnlySpan<byte> for keys and values to avoid additional allocations.
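
To make these two principles concrete, here is a minimal, self-contained sketch that does not depend on Confluent.Kafka. The names DemoReader, DemoConsumeCallback, DemoSource and Demo are hypothetical stand-ins for illustration only, and the meaning of the bool return value (true when a message was passed to the callback) is an assumption, not something the proposal specifies.

```csharp
using System;
using System.Text;

// Hypothetical stand-in for the proposed MessageReader: a ref struct that
// exposes borrowed views of the key and value instead of copying them.
public readonly ref struct DemoReader
{
    public DemoReader(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value)
    {
        KeySpan = key;
        ValueSpan = value;
    }

    public ReadOnlySpan<byte> KeySpan { get; }
    public ReadOnlySpan<byte> ValueSpan { get; }
}

// Hypothetical callback type, shaped like the proposed AllocFreeConsumeCallback.
public delegate void DemoConsumeCallback(in DemoReader reader);

// Hypothetical message source that holds exactly one message.
public sealed class DemoSource
{
    private readonly byte[] key = Encoding.UTF8.GetBytes("k1");
    private readonly byte[] value = Encoding.UTF8.GetBytes("hello");
    private bool delivered;

    // Assumption: returns true if a message was passed to the callback.
    public bool ConsumeWithCallback(DemoConsumeCallback callback)
    {
        if (delivered) return false;
        delivered = true;

        // The reader lives on the stack and only points at existing buffers.
        var reader = new DemoReader(key, value);
        callback(in reader);
        return true;
    }
}

public static class Demo
{
    public static int BytesSeen;

    // Created once and reused, so the consume loop allocates no delegates.
    public static readonly DemoConsumeCallback CountBytes =
        (in DemoReader r) => BytesSeen += r.KeySpan.Length + r.ValueSpan.Length;

    public static int Run()
    {
        BytesSeen = 0;
        var source = new DemoSource();
        while (source.ConsumeWithCallback(CountBytes)) { }
        return BytesSeen;
    }
}
```

Because the reader is a ref struct, it cannot be stored on the heap or captured by a lambda, so the spans it exposes are only usable for the duration of the callback. Any data needed afterwards has to be copied out explicitly.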

Proposed Producer API

public interface IProducer<TKey, TValue> : IClient
{
    void Produce(
        string topic,
        ReadOnlySpan<byte> val,
        ReadOnlySpan<byte> key,
        Timestamp timestamp,
        Partition partition,
        Headers headers);
}

public class ProducerBuilder<TKey, TValue>
{
    // Signature only; the method body is omitted here.
    public ProducerBuilder<TKey, TValue> SetAllocFreeDeliveryHandler(
        Experimental.AllocFreeDeliveryHandler allocFreeDeliveryHandler);
}

Proposed Consumer API

public interface IConsumer<TKey, TValue> : IClient
{
    bool ConsumeWithCallback(
        int millisecondsTimeout,
        Experimental.AllocFreeConsumeCallback callback);
}

New MessageReader Type

public abstract class Experimental
{
    public delegate void AllocFreeConsumeCallback(in MessageReader reader);
    public delegate void AllocFreeDeliveryHandler(in MessageReader reader);

    public unsafe ref struct MessageReader
    {
        private readonly rd_kafka_message* msg;
        private IntPtr hdrsPtr = IntPtr.Zero;

        internal MessageReader(rd_kafka_message* msg)
        {
            this.msg = msg;
        }

        public ReadOnlySpan<byte> KeySpan =>
            msg->key == IntPtr.Zero
                ? ReadOnlySpan<byte>.Empty
                : new ReadOnlySpan<byte>(msg->key.ToPointer(), (int)msg->key_len);

        public ReadOnlySpan<byte> ValueSpan =>
            msg->val == IntPtr.Zero
                ? ReadOnlySpan<byte>.Empty
                : new ReadOnlySpan<byte>(msg->val.ToPointer(), (int)msg->len);

        // etc...
    }
}

A full implementation is available here:
G-Research/confluent-kafka-dotnet#5


Questions / Next Steps

  • Would there be interest in integrating an allocation-free API into the main library?
  • Should we open a PR for review?
