Skip to content

Commit eac9181

Browse files
Experimental alloc-free Consumer & Producer API (#5)
1 parent e31a0aa commit eac9181

File tree

12 files changed

+584
-87
lines changed

12 files changed

+584
-87
lines changed

.github/workflows/build.yml

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
name: 'confluent-kafka-dotnet build pipeline'

env:
  DOTNET_CLI_TELEMETRY_OPTOUT: 'true'

on:
  push:
  pull_request:

jobs:

  build-test:
    strategy:
      matrix:
        os: [ubuntu-latest, windows-latest, macos-13] # macos-13 for x86_64 arch
    runs-on: ${{ matrix.os }}
    steps:
      - uses: actions/checkout@v4
      - name: Setup .NET
        uses: actions/setup-dotnet@v4
        with:
          dotnet-version: |
            6.0.x
            8.0.x
      - name: Build and test
        run: |
          dotnet nuget add source --username user --password ${{ github.token }} --store-password-in-clear-text --name github https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json
          dotnet restore
          dotnet test -c Release test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj

  package:
    needs: [build-test]
    runs-on: windows-latest
    steps:

      # windows-latest runs PowerShell by default, so environment variables must
      # be referenced with the $env: prefix — plain $GITHUB_JOB expands to an
      # empty string in PowerShell (the packaging step below already uses $env:).
      - name: Show default environment variables
        run: |
          echo "The job_id is: $env:GITHUB_JOB" # reference the default environment variables
          echo "The id of this action is: $env:GITHUB_ACTION" # reference the default environment variables
          echo "The run id is: $env:GITHUB_RUN_ID"
          echo "The GitHub Actor's username is: $env:GITHUB_ACTOR"
          echo "GitHub SHA: $env:GITHUB_SHA"
      - uses: actions/checkout@v4
      - name: Setup .NET
        uses: actions/setup-dotnet@v4
        with:
          dotnet-version: |
            6.0.x
            8.0.x
      - name: Build and create packages
        run: |
          dotnet restore
          dotnet build Confluent.Kafka.sln -c Release

          # Different packaging for tagged vs untagged builds
          if ($env:GITHUB_REF -match '^refs/tags/') {
            $suffix = "gr-experimental"
          } else {
            $suffix = "ci-$env:GITHUB_RUN_ID"
          }

          dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj --output dist -c Release --version-suffix $suffix

      - name: Upload artifacts
        uses: actions/upload-artifact@v4
        with:
          name: build-artifacts
          path: dist/

  # Publish NuGet packages when a tag is pushed.
  # Tests need to succeed for all components and on all platforms first,
  # including having a tag name that matches the version number.
  publish-release:
    if: ${{ startsWith(github.ref, 'refs/tags/v') }}
    needs: package
    runs-on: ubuntu-latest
    steps:
      - name: Download NuGet package artifacts
        uses: actions/download-artifact@v4
        with:
          name: build-artifacts
          path: dist
      - name: Publish to NuGet
        run: |
          dotnet nuget push "dist/Confluent.Kafka*.nupkg" --source https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json --api-key ${{ github.token }}

src/Confluent.Kafka/Confluent.Kafka.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
</PropertyGroup>
2626

2727
<ItemGroup>
28-
<PackageReference Include="librdkafka.redist" Version="2.10.0">
28+
<PackageReference Include="librdkafka.redist" Version="2.10.0-gr">
2929
<PrivateAssets Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">None</PrivateAssets>
3030
</PackageReference>
3131
</ItemGroup>

src/Confluent.Kafka/Consumer.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,52 @@ public ConsumeResult<TKey, TValue> Consume(TimeSpan timeout)
10001000
=> Consume(timeout.TotalMillisecondsAsInt());
10011001

10021002

1003+
        /// <inheritdoc/>
        public unsafe bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback)
        {
            if (callback == null)
                throw new ArgumentNullException(nameof(callback));

            // Poll librdkafka for one message; a zero pointer means nothing
            // arrived within the timeout.
            var msgPtr = kafkaHandle.ConsumerPoll((IntPtr)millisecondsTimeout);

            // An exception captured by a background handler takes priority over
            // the polled message: release the native message (if any) before
            // rethrowing so it is not leaked.
            if (this.handlerException != null)
            {
                var ex = this.handlerException;
                this.handlerException = null;
                if (msgPtr != IntPtr.Zero)
                {
                    Librdkafka.message_destroy(msgPtr);
                }

                throw ex;
            }

            if (msgPtr == IntPtr.Zero)
            {
                return false; // Timeout occurred
            }

            try
            {
                var msg = (rd_kafka_message*)msgPtr;

                // Handle message errors before invoking callback.
                // Partition EOF is not treated as an error here; the callback can
                // observe it via the reader's IsPartitionEOF property.
                if (msg->err != ErrorCode.NoError && msg->err != ErrorCode.Local_PartitionEOF)
                {
                    throw new KafkaException(msg->err);
                }

                // Create the reader and invoke the callback. MessageReader is a
                // ref struct, so the native message cannot escape the callback
                // scope; it is destroyed in the finally block below regardless of
                // whether the callback throws.
                var reader = new Experimental.MessageReader(msg);
                callback(in reader);
                return true;
            }
            finally
            {
                Librdkafka.message_destroy(msgPtr);
            }
        }
1048+
10031049
/// <inheritdoc/>
10041050
public IConsumerGroupMetadata ConsumerGroupMetadata
10051051
{
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
using System;
using Confluent.Kafka.Impl;
using Confluent.Kafka.Internal;

namespace Confluent.Kafka;

/// <summary>
/// Experimental alloc-free APIs.
/// Note that these APIs can be changed substantially or removed entirely in future versions.
/// </summary>
public abstract class Experimental
{
    /// <summary>
    /// Callback delegate for allocation-free message consumption.
    /// </summary>
    /// <param name="reader">Message reader providing allocation-free access to message data</param>
    public delegate void AllocFreeConsumeCallback(in MessageReader reader);

    /// <summary>
    /// Alloc-free delivery handler.
    /// </summary>
    /// <param name="arg">Message reader providing allocation-free access to the delivered message data</param>
    public delegate void AllocFreeDeliveryHandler(in MessageReader arg);

    /// <summary>
    /// Allocation-free message reader that provides access to consumed message data.
    /// This is a ref struct to ensure stack allocation and prevent escaping the callback scope.
    /// </summary>
    /// <remarks>
    /// The reader wraps a native librdkafka message owned by the caller; it is only
    /// valid for the duration of the callback it is passed to.
    /// </remarks>
    public unsafe ref struct MessageReader
    {
        // Native rd_kafka_message this reader wraps (owned and destroyed by the caller).
        private readonly rd_kafka_message* msg;

        // Lazily-resolved native headers handle, cached across TryGetHeader calls.
        // Owned by the message itself, so it is never destroyed here.
        private IntPtr hdrsPtr = IntPtr.Zero;

        internal MessageReader(
            rd_kafka_message* msg)
        {
            this.msg = msg;
        }

        /// <summary>Gets the topic name, or null when no topic handle is attached</summary>
        public string Topic =>
            msg->rkt != IntPtr.Zero
                ? Util.Marshal.PtrToStringUTF8(Librdkafka.topic_name(msg->rkt))
                : null;

        /// <summary>Gets the partition</summary>
        public Partition Partition => msg->partition;

        /// <summary>Gets the error code</summary>
        public ErrorCode ErrorCode => msg->err;

        /// <summary>Gets the offset</summary>
        public Offset Offset => msg->offset;

        /// <summary>Gets the leader epoch, or null when it cannot be determined</summary>
        public int? LeaderEpoch =>
            msg->rkt != IntPtr.Zero && msg->offset != Offset.Unset
                ? Librdkafka.message_leader_epoch((IntPtr)msg)
                : null;

        /// <summary>Gets the timestamp</summary>
        public Timestamp Timestamp
        {
            get
            {
                var timestampUnix = Librdkafka.message_timestamp((IntPtr)msg, out var timestampType);
                return new Timestamp(timestampUnix, (TimestampType)timestampType);
            }
        }

        /// <summary>Gets whether this indicates partition EOF</summary>
        public bool IsPartitionEOF => msg->err == ErrorCode.Local_PartitionEOF;

        /// <summary>
        /// Gets the raw key data as a ReadOnlySpan without allocation
        /// </summary>
        public ReadOnlySpan<byte> KeySpan =>
            msg->key == IntPtr.Zero
                ? ReadOnlySpan<byte>.Empty
                : new ReadOnlySpan<byte>(msg->key.ToPointer(), (int)msg->key_len);

        /// <summary>
        /// Gets the raw value data as a ReadOnlySpan without allocation
        /// </summary>
        public ReadOnlySpan<byte> ValueSpan =>
            msg->val == IntPtr.Zero
                ? ReadOnlySpan<byte>.Empty
                : new ReadOnlySpan<byte>(msg->val.ToPointer(), (int)msg->len);

        /// <summary>
        /// Gets a header by index without allocation
        /// </summary>
        /// <param name="index">Zero-based header index</param>
        /// <param name="name">Header name (allocated string)</param>
        /// <param name="value">Header value as ReadOnlySpan</param>
        /// <returns>True if header exists at the given index</returns>
        public bool TryGetHeader(int index, out string name, out ReadOnlySpan<byte> value)
        {
            name = null;
            value = ReadOnlySpan<byte>.Empty;

            // Resolve the native headers handle on first use; a zero handle means
            // the message carries no headers at all.
            if (hdrsPtr == IntPtr.Zero)
            {
                Librdkafka.message_headers((IntPtr)msg, out hdrsPtr);
                if (hdrsPtr == IntPtr.Zero)
                    return false;
            }

            var err = Librdkafka.header_get_all(
                hdrsPtr,
                (IntPtr)index,
                out IntPtr namep,
                out IntPtr valuep,
                out IntPtr sizep
            );

            // Any error (e.g. index out of range) is reported as "no header here".
            if (err != ErrorCode.NoError)
                return false;

            // NOTE: marshalling the name allocates a managed string, so this call is
            // not fully allocation-free; only the value is exposed without copying.
            name = Util.Marshal.PtrToStringUTF8(namep);

            if (valuep != IntPtr.Zero)
            {
                value = new ReadOnlySpan<byte>(valuep.ToPointer(), (int)sizep);
            }

            return true;
        }
    }
}

src/Confluent.Kafka/IConsumer.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ public interface IConsumer<TKey, TValue> : IClient
110110
/// </exception>
111111
ConsumeResult<TKey, TValue> Consume(TimeSpan timeout);
112112

113+
/// <summary>
114+
/// Consumes a message and invokes the callback with an allocation-free reader.
115+
/// Returns true if a message was consumed, false if timeout occurred.
116+
/// </summary>
117+
/// <param name="millisecondsTimeout">Timeout in milliseconds</param>
118+
/// <param name="callback">Callback to invoke with the message reader</param>
119+
/// <returns>True if a message was consumed, false if timeout occurred</returns>
120+
/// <exception cref="ConsumeException">Thrown when consumption fails</exception>
121+
bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback);
113122

114123
/// <summary>
115124
/// Gets the (dynamic) group member id of

src/Confluent.Kafka/IProducer.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,48 @@ void Produce(
182182
Message<TKey, TValue> message,
183183
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);
184184

185+
/// <summary>
186+
/// Asynchronously send a single message to a
187+
/// Kafka topic partition.
188+
/// </summary>
189+
/// <param name="topic">
190+
/// Topic of the message to produce to
191+
/// </param>
192+
/// <param name="val">
193+
/// Value bytes to produce
194+
/// </param>
195+
/// <param name="key">
196+
/// Key bytes to produce
197+
/// </param>
198+
/// <param name="timestamp">
199+
/// Timestamp of the message to be produced
200+
/// </param>
201+
/// <param name="partition">
202+
/// Partition of the message to produce to
203+
/// </param>
204+
/// <param name="headers">
205+
/// Optional headers of the message to be produced
206+
/// </param>
207+
/// <exception cref="ProduceException{TKey,TValue}">
208+
/// Thrown in response to any error that is known
209+
/// immediately (excluding user application logic errors),
210+
/// for example ErrorCode.Local_QueueFull.
211+
/// </exception>
212+
/// <exception cref="System.ArgumentException">
213+
/// Thrown in response to invalid argument values.
214+
/// </exception>
215+
/// <exception cref="System.InvalidOperationException">
216+
/// Thrown in response to error conditions that reflect
217+
/// an error in the application logic of the calling
218+
/// application.
219+
/// </exception>
220+
void Produce(
221+
string topic,
222+
ReadOnlySpan<byte> val,
223+
ReadOnlySpan<byte> key,
224+
Timestamp timestamp,
225+
Partition partition,
226+
Headers headers);
185227

186228
/// <summary>
187229
/// Poll for callback events.

0 commit comments

Comments
 (0)