Skip to content

Commit 3e55abe

Browse files
author
Matt Howlett
committed
Merge branch '1.6.x'
2 parents 2b54dd2 + 1b41d3e commit 3e55abe

File tree

85 files changed

+3946
-728
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+3946
-728
lines changed

CHANGELOG.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,24 @@
1+
2+
# 1.6.1
3+
4+
## Enhancements
5+
6+
- References librdkafka.redist 1.6.1. Refer to the [1.6.0](https://github.com/edenhill/librdkafka/releases/tag/v1.6.0) and [1.6.1](https://github.com/edenhill/librdkafka/releases/tag/v1.6.1) release notes for more information. Headline features:
7+
- KIP-429: Incremental rebalancing.
8+
- KIP-447: Producer scalability for exactly once semantics.
9+
- KIP-480: Sticky partitioner.
10+
- KIP-22: Support for custom partitioners.
11+
- Confluent.Kafka can now be used with Mono on Linux and macOS. **Note**: Mono is not a supported runtime.
12+
- The debian9-librdkafka.so build of librdkafka has been replaced with a more portable one: centos6-librdkafka.so (note: Debian 9 is still supported).
13+
- Exceptions thrown by `Producer.Produce` now include an inner exception with additional context on the error ([joostas](https://github.com/joostas)).
14+
- Added `ConfigureAwait(false)` to async methods in the Avro Serdes.
15+
- Added `IsInvalid` property to `Handle` class ([volgunin](https://github.com/volgunin)).
16+
17+
## Fixes
18+
19+
- Fixed race condition in `ProtobufSerializer` ([yurii-hunter](https://github.com/yurii-hunter)).
20+
21+
122
# 1.5.3
223

324
## Enhancements

Confluent.Kafka.sln

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Protobuf", "examples\Protob
4949
EndProject
5050
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.Kafka.SyncOverAsync", "test\Confluent.Kafka.SyncOverAsync\Confluent.Kafka.SyncOverAsync.csproj", "{07E3455B-9CDF-4124-854E-8AC9B8950B1A}"
5151
EndProject
52-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Transactions", "examples\Transactions\Transactions.csproj", "{C44F96F6-4711-4DB9-A00E-9FA6456513B9}"
52+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExactlyOnce", "examples\ExactlyOnce\ExactlyOnce.csproj", "{C44F96F6-4711-4DB9-A00E-9FA6456513B9}"
5353
EndProject
5454
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.Kafka.Transactions", "test\Confluent.Kafka.Transactions\Confluent.Kafka.Transactions.csproj", "{2A6D1D58-4F02-480E-8A9C-F2A0D0BC911C}"
5555
EndProject
@@ -61,6 +61,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.SchemaRegistry.Se
6161
EndProject
6262
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.SchemaRegistry.Serdes.Json", "src\Confluent.SchemaRegistry.Serdes.Json\Confluent.SchemaRegistry.Serdes.Json.csproj", "{98D7F3E1-80EE-437C-8915-528BFD80E9B2}"
6363
EndProject
64+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExactlyOnceOldBroker", "examples\ExactlyOnceOldBroker\ExactlyOnceOldBroker.csproj", "{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}"
65+
EndProject
6466
Global
6567
GlobalSection(SolutionConfigurationPlatforms) = preSolution
6668
Debug|Any CPU = Debug|Any CPU
@@ -386,6 +388,18 @@ Global
386388
{98D7F3E1-80EE-437C-8915-528BFD80E9B2}.Release|x64.Build.0 = Release|Any CPU
387389
{98D7F3E1-80EE-437C-8915-528BFD80E9B2}.Release|x86.ActiveCfg = Release|Any CPU
388390
{98D7F3E1-80EE-437C-8915-528BFD80E9B2}.Release|x86.Build.0 = Release|Any CPU
391+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
392+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Debug|Any CPU.Build.0 = Debug|Any CPU
393+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Debug|x64.ActiveCfg = Debug|Any CPU
394+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Debug|x64.Build.0 = Debug|Any CPU
395+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Debug|x86.ActiveCfg = Debug|Any CPU
396+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Debug|x86.Build.0 = Debug|Any CPU
397+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|Any CPU.ActiveCfg = Release|Any CPU
398+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|Any CPU.Build.0 = Release|Any CPU
399+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|x64.ActiveCfg = Release|Any CPU
400+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|x64.Build.0 = Release|Any CPU
401+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|x86.ActiveCfg = Release|Any CPU
402+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|x86.Build.0 = Release|Any CPU
389403
EndGlobalSection
390404
GlobalSection(NestedProjects) = preSolution
391405
{09C3255B-1972-4EB8-91D0-FB9F5CD82BCB} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
@@ -414,5 +428,6 @@ Global
414428
{1B2C48B7-FD1D-4457-9E5B-80EBB7A28104} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
415429
{3BE5B540-43FC-4945-ACE5-88BB6B0D846E} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
416430
{98D7F3E1-80EE-437C-8915-528BFD80E9B2} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
431+
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
417432
EndGlobalSection
418433
EndGlobal

README.md

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,21 @@ confluent-kafka-dotnet is distributed via NuGet. We provide five packages:
4444
To install Confluent.Kafka from within Visual Studio, search for Confluent.Kafka in the NuGet Package Manager UI, or run the following command in the Package Manager Console:
4545

4646
```
47-
Install-Package Confluent.Kafka -Version 1.5.3
47+
Install-Package Confluent.Kafka -Version 1.6.1
4848
```
4949

5050
To add a reference to a dotnet core project, execute the following at the command line:
5151

5252
```
53-
dotnet add package -v 1.5.3 Confluent.Kafka
53+
dotnet add package -v 1.6.1 Confluent.Kafka
5454
```
5555

5656
Note: `Confluent.Kafka` depends on the `librdkafka.redist` package which provides a number of different builds of `librdkafka` that are compatible with [common platforms](https://github.com/edenhill/librdkafka/wiki/librdkafka.redist-NuGet-package-runtime-libraries). If you are on one of these platforms this will all work seamlessly (and you don't need to explicitly reference `librdkafka.redist`). If you are on a different platform, you may need to [build librdkafka](https://github.com/edenhill/librdkafka#building) manually (or acquire it via other means) and load it using the [Library.Load](https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Library.html#Confluent_Kafka_Library_Load_System_String_) method.
5757

5858
### Branch builds
5959

6060
Nuget packages corresponding to all commits to release branches are available from the following nuget package source (Note: this is not a web URL - you
61-
should specify it in the nuget package manger):
61+
should specify it in the nuget package manager):
6262
[https://ci.appveyor.com/nuget/confluent-kafka-dotnet](https://ci.appveyor.com/nuget/confluent-kafka-dotnet). The version suffix of these nuget packages
6363
matches the appveyor build number. You can see which commit a particular build number corresponds to by looking at the
6464
[AppVeyor build history](https://ci.appveyor.com/project/ConfluentClientEngineering/confluent-kafka-dotnet/history)
@@ -206,6 +206,13 @@ The [Web](https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/exa
206206
Apache Kafka with a web application, including how to implement `IHostedService` to realize a long running consumer poll loop, how to
207207
register a producer as a singleton service, and how to bind configuration from an injected `IConfiguration` instance.
208208

209+
### Exactly Once Processing
210+
211+
The .NET Client has full support for transactions and idempotent message production, allowing you to write horizontally scalable stream
212+
processing applications with exactly once semantics. The [ExactlyOnce](examples/ExactlyOnce) example demonstrates this capability by way
213+
of an implementation of the classic "word count" problem, also demonstrating how to use the [FASTER](https://github.com/microsoft/FASTER)
214+
Key/Value store (similar to RocksDb) to materialize working state that may be larger than available memory, and incremental rebalancing
215+
to avoid stop-the-world rebalancing operations and unnecessary reloading of state when you add or remove processing nodes.
209216

210217
### Schema Registry Integration
211218

@@ -271,3 +278,7 @@ Instructions on building and testing confluent-kafka-dotnet can be found [here](
271278
Copyright (c)
272279
2016-2019 [Confluent Inc.](https://www.confluent.io)
273280
2015-2016 [Andreas Heider](mailto:[email protected])
281+
282+
KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use
283+
by confluent-kafka-dotnet. confluent-kafka-dotnet has no affiliation with and is not endorsed by
284+
The Apache Software Foundation.

examples/AdminClient/AdminClient.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.5.3" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.6.1" /> -->
1313
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1414
</ItemGroup>
1515

examples/AvroBlogExamples/AvroBlogExamples.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.5.3" /> -->
11+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.6.1" /> -->
1212
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
1313
</ItemGroup>
1414

examples/AvroGeneric/AvroGeneric.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.5.3" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.6.1" /> -->
1313
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
1414
</ItemGroup>
1515

examples/AvroSpecific/AvroSpecific.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.5.3" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.6.1" /> -->
1313
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
1414
</ItemGroup>
1515

examples/ConfluentCloud/ConfluentCloud.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.5.3" /> -->
10+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.6.1" /> -->
1111
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1212
</ItemGroup>
1313

examples/Consumer/Consumer.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.5.3" /> -->
11+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.6.1" /> -->
1212
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1313
</ItemGroup>
1414

examples/Consumer/Program.cs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
using System;
2020
using System.Collections.Generic;
2121
using System.Linq;
22-
using System.Text;
2322
using System.Threading;
24-
using System.Threading.Tasks;
2523

2624

2725
/// <summary>
@@ -46,7 +44,10 @@ public static void Run_Consume(string brokerList, List<string> topics, Cancellat
4644
StatisticsIntervalMs = 5000,
4745
SessionTimeoutMs = 6000,
4846
AutoOffsetReset = AutoOffsetReset.Earliest,
49-
EnablePartitionEof = true
47+
EnablePartitionEof = true,
48+
// A good introduction to the CooperativeSticky assignor and incremental rebalancing:
49+
// https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/
50+
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
5051
};
5152

5253
const int commitPeriod = 5;
@@ -62,15 +63,25 @@ public static void Run_Consume(string brokerList, List<string> topics, Cancellat
6263
.SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
6364
.SetPartitionsAssignedHandler((c, partitions) =>
6465
{
65-
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
66-
// possibly manually specify start offsets or override the partition assignment provided by
67-
// the consumer group by returning a list of topic/partition/offsets to assign to, e.g.:
68-
//
66+
// Since a cooperative assignor (CooperativeSticky) has been configured, the
67+
// partition assignment is incremental (adds partitions to any existing assignment).
68+
Console.WriteLine($"Incremental partition assignment: [{string.Join(", ", partitions)}]");
69+
70+
// Possibly manually specify start offsets by returning a list of topic/partition/offsets
71+
// to assign to, e.g.:
6972
// return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
7073
})
7174
.SetPartitionsRevokedHandler((c, partitions) =>
7275
{
73-
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
76+
// Since a cooperative assignor (CooperativeSticky) has been configured, the revoked
77+
// assignment is incremental (may remove only some partitions of the current assignment).
78+
Console.WriteLine($"Incremental partition revocation: [{string.Join(", ", partitions)}]");
79+
})
80+
.SetPartitionsLostHandler((c, partitions) =>
81+
{
82+
// The lost partitions handler is called when the consumer detects that it has lost ownership
83+
// of its assignment (fallen out of the group).
84+
Console.WriteLine($"Partitions were lost: [{string.Join(", ", partitions)}]");
7485
})
7586
.Build())
7687
{

0 commit comments

Comments
 (0)