Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
name: 'confluent-kafka-dotnet build pipeline'

env:
DOTNET_CLI_TELEMETRY_OPTOUT: 'true'

on:
push:
pull_request:

jobs:

build-test:
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-13] # macos-13 for x86_x64 arch
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: |
6.0.x
8.0.x
- name: Build and test
run: |
dotnet nuget add source --username user --password ${{ github.token }} --store-password-in-clear-text --name github https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json
dotnet restore
dotnet test -c Release test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj

package:
needs: [build-test]
runs-on: windows-latest
steps:

- name: Show default environment variables
run: |
echo "The job_id is: $GITHUB_JOB" # reference the default environment variables
echo "The id of this action is: $GITHUB_ACTION" # reference the default environment variables
echo "The run id is: $GITHUB_RUN_ID"
echo "The GitHub Actor's username is: $GITHUB_ACTOR"
echo "GitHub SHA: $GITHUB_SHA"
- uses: actions/checkout@v4
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: |
6.0.x
8.0.x
- name: Build and create packages
run: |
dotnet restore
dotnet build Confluent.Kafka.sln -c Release

# Different packaging for tagged vs untagged builds
if ($env:GITHUB_REF -match '^refs/tags/') {
$suffix = "gr-experimental"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gr-experimental suffix

} else {
$suffix = "ci-$env:GITHUB_RUN_ID"
}

dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj --output dist -c Release --version-suffix $suffix

- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: build-artifacts
path: dist/

# Publish NuGet packages when a tag is pushed.
# Tests need to succeed for all components and on all platforms first,
# including having a tag name that matches the version number.
publish-release:
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
needs: package
runs-on: ubuntu-latest
steps:
- name: Download NuGet package artifacts
uses: actions/download-artifact@v4
with:
name: build-artifacts
path: dist
- name: Publish to NuGet
run: |
dotnet nuget push "dist/Confluent.Kafka*.nupkg" --source https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json --api-key ${{ github.token }}
2 changes: 1 addition & 1 deletion src/Confluent.Kafka/Confluent.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="librdkafka.redist" Version="2.10.0">
<PackageReference Include="librdkafka.redist" Version="2.10.0-gr">
<PrivateAssets Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">None</PrivateAssets>
</PackageReference>
</ItemGroup>
Expand Down
52 changes: 52 additions & 0 deletions src/Confluent.Kafka/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,58 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
public ConsumeResult<TKey, TValue> Consume(TimeSpan timeout)
=> Consume(timeout.TotalMillisecondsAsInt());

/// <summary>
/// Consumes a message and invokes the callback with an allocation-free reader.
/// Returns true if a message was consumed, false if timeout occurred.
/// </summary>
/// <param name="millisecondsTimeout">Timeout in milliseconds</param>
/// <param name="callback">Callback to invoke with the message reader</param>
/// <returns>True if a message was consumed, false if timeout occurred</returns>
/// <exception cref="ConsumeException">Thrown when consumption fails</exception>
public unsafe bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback)
{
if (callback == null)
throw new ArgumentNullException(nameof(callback));

var msgPtr = kafkaHandle.ConsumerPoll((IntPtr)millisecondsTimeout);

if (this.handlerException != null)
{
var ex = this.handlerException;
this.handlerException = null;
if (msgPtr != IntPtr.Zero)
{
Librdkafka.message_destroy(msgPtr);
}

throw ex;
}

if (msgPtr == IntPtr.Zero)
{
return false; // Timeout occurred
}

try
{
var msg = (rd_kafka_message*)msgPtr;

// Handle message errors before invoking callback
if (msg->err != ErrorCode.NoError && msg->err != ErrorCode.Local_PartitionEOF)
{
throw new KafkaException(msg->err);
}

// Create the reader and invoke the callback
var reader = new Experimental.MessageReader(msg);
callback(in reader);
return true;
}
finally
{
Librdkafka.message_destroy(msgPtr);
}
}

/// <inheritdoc/>
public IConsumerGroupMetadata ConsumerGroupMetadata
Expand Down
135 changes: 135 additions & 0 deletions src/Confluent.Kafka/Experimental.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using System;
using Confluent.Kafka.Impl;
using Confluent.Kafka.Internal;

namespace Confluent.Kafka;

/// <summary>
/// Experimental alloc-free APIs.
/// Note that these APIs can be changed substantially or removed entirely in the future versions.
/// </summary>
public abstract class Experimental
{
/// <summary>
/// Callback delegate for allocation-free message consumption.
/// </summary>
/// <typeparam name="TKey">The key type</typeparam>

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / package

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name

Check warning on line 16 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / package

XML comment has a typeparam tag for 'TKey', but there is no type parameter by that name
/// <typeparam name="TValue">The value type</typeparam>

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / package

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name

Check warning on line 17 in src/Confluent.Kafka/Experimental.cs

View workflow job for this annotation

GitHub Actions / package

XML comment has a typeparam tag for 'TValue', but there is no type parameter by that name
/// <param name="reader">Message reader providing allocation-free access to message data</param>
public delegate void AllocFreeConsumeCallback(in MessageReader reader);

/// <summary>
/// Alloc-free delivery handler
/// </summary>
public delegate void AllocFreeDeliveryHandler(in MessageReader arg);

/// <summary>
/// Allocation-free message reader that provides access to consumed message data.
/// This is a ref struct to ensure stack allocation and prevent escaping the callback scope.
/// </summary>
public readonly unsafe ref struct MessageReader
{
private readonly rd_kafka_message* msg;
private readonly Lazy<IntPtr> _hdrsPtr;

internal MessageReader(
rd_kafka_message* msg)
{
this.msg = msg;
_hdrsPtr = new Lazy<IntPtr>(() =>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realised that this is not alloc-free. Will update it.

{
var ptr = (IntPtr)msg;
IntPtr hdrsPtr;
Librdkafka.message_headers(ptr, out hdrsPtr);
return hdrsPtr;
});
}

/// <summary>Gets the topic name</summary>
public string Topic =>
msg->rkt != IntPtr.Zero
? Util.Marshal.PtrToStringUTF8(Librdkafka.topic_name(msg->rkt))
: null;

/// <summary>Gets the partition</summary>
public Partition Partition => msg->partition;

/// <summary>Gets the error code</summary>
public ErrorCode ErrorCode => msg->err;

/// <summary>Gets the offset</summary>
public Offset Offset => msg->offset;

/// <summary>Gets the leader epoch</summary>
public int? LeaderEpoch =>
msg->rkt != IntPtr.Zero && msg->offset != Offset.Unset
? Librdkafka.message_leader_epoch((IntPtr)msg)
: null;

/// <summary>Gets the timestamp</summary>
public Timestamp Timestamp
{
get
{
var timestampUnix = Librdkafka.message_timestamp((IntPtr)msg, out var timestampType);
return new Timestamp(timestampUnix, (TimestampType)timestampType);
}
}

/// <summary>Gets whether this indicates partition EOF</summary>
public bool IsPartitionEOF => msg->err == ErrorCode.Local_PartitionEOF;

/// <summary>
/// Gets the raw key data as a ReadOnlySpan without allocation
/// </summary>
public ReadOnlySpan<byte> KeySpan =>
msg->key == IntPtr.Zero
? ReadOnlySpan<byte>.Empty
: new ReadOnlySpan<byte>(msg->key.ToPointer(), (int)msg->key_len);

/// <summary>
/// Gets the raw value data as a ReadOnlySpan without allocation
/// </summary>
public ReadOnlySpan<byte> ValueSpan =>
msg->val == IntPtr.Zero
? ReadOnlySpan<byte>.Empty
: new ReadOnlySpan<byte>(msg->val.ToPointer(), (int)msg->len);

/// <summary>
/// Gets a header by index without allocation
/// </summary>
/// <param name="index">Zero-based header index</param>
/// <param name="name">Header name (allocated string)</param>
/// <param name="value">Header value as ReadOnlySpan</param>
/// <returns>True if header exists at the given index</returns>
public bool TryGetHeader(int index, out string name, out ReadOnlySpan<byte> value)
{
name = null;
value = ReadOnlySpan<byte>.Empty;

var hdrsPtr = _hdrsPtr.Value;
if (hdrsPtr == IntPtr.Zero)
return false;

var err = Librdkafka.header_get_all(
hdrsPtr,
(IntPtr)index,
out IntPtr namep,
out IntPtr valuep,
out IntPtr sizep
);

if (err != ErrorCode.NoError)
return false;

name = Util.Marshal.PtrToStringUTF8(namep);

if (valuep != IntPtr.Zero)
{
value = new ReadOnlySpan<byte>(valuep.ToPointer(), (int)sizep);
}

return true;
}
}
}
1 change: 1 addition & 0 deletions src/Confluent.Kafka/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
/// </exception>
ConsumeResult<TKey, TValue> Consume(TimeSpan timeout);

bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback);

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / package

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

Check warning on line 113 in src/Confluent.Kafka/IConsumer.cs

View workflow job for this annotation

GitHub Actions / package

Missing XML comment for publicly visible type or member 'IConsumer<TKey, TValue>.ConsumeWithCallback(int, Experimental.AllocFreeConsumeCallback)'

/// <summary>
/// Gets the (dynamic) group member id of
Expand Down
49 changes: 49 additions & 0 deletions src/Confluent.Kafka/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,55 @@
Message<TKey, TValue> message,
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);

/// <summary>
/// Asynchronously send a single message to a
/// Kafka topic partition.
/// </summary>
/// <param name="topicPartition">
/// The topic partition to produce
/// the message to.
/// </param>
/// <param name="deliveryHandler">

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / package

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 193 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / package

XML comment has a param tag for 'deliveryHandler', but there is no parameter by that name
/// A delegate that will be called
/// with a delivery report corresponding to the
/// produce request (if enabled).
/// </param>
/// <param name="val">
/// Value bytes to produce
/// </param>
/// <param name="key">
/// Key bytes to produce
/// </param>
/// <param name="timestamp">
/// Timestamp of the message to be produced
/// </param>
/// <param name="headers">
/// Optional headers of the message to be produced
/// </param>
/// <exception cref="ProduceException{TKey,TValue}">
/// Thrown in response to any error that is known
/// immediately (excluding user application logic errors),
/// for example ErrorCode.Local_QueueFull. Asynchronous
/// notification of unsuccessful produce requests is made
/// available via the <paramref name="deliveryHandler" />

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (windows-latest)

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (ubuntu-latest)

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / build-test (macos-13)

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / package

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name

Check warning on line 215 in src/Confluent.Kafka/IProducer.cs

View workflow job for this annotation

GitHub Actions / package

XML comment on 'IProducer<TKey, TValue>.Produce(TopicPartition, ReadOnlySpan<byte>, ReadOnlySpan<byte>, Timestamp, Headers)' has a paramref tag for 'deliveryHandler', but there is no parameter by that name
/// parameter (if specified). The Error property of the
/// exception / delivery report provides more detailed
/// information.
/// </exception>
/// <exception cref="System.ArgumentException">
/// Thrown in response to invalid argument values.
/// </exception>
/// <exception cref="System.InvalidOperationException">
/// Thrown in response to error conditions that reflect
/// an error in the application logic of the calling
/// application.
/// </exception>
void Produce(
TopicPartition topicPartition,
ReadOnlySpan<byte> val,
ReadOnlySpan<byte> key,
Timestamp timestamp,
Headers headers);

/// <summary>
/// Poll for callback events.
Expand Down
Loading