Skip to content

Commit 208bcaf

Browse files
committed
Merge branch '1.0.x'
2 parents ce784ef + 5aac327 commit 208bcaf

File tree

3 files changed

+155
-89
lines changed

3 files changed

+155
-89
lines changed

src/ConfigGen/Program.cs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ internal static List<PropertySpecification> RemoveLegacyOrNotRelevant(List<Prope
4040
if (p.Name == "request.required.acks") { return false; }
4141
// legacy
4242
if (p.Name == "consume.callback.max.messages") { return false; }
43+
if (p.Name == "offset.store.method") { return false; }
4344
if (p.Name == "offset.store.path") { return false; }
4445
if (p.Name == "offset.store.sync.interval.ms") { return false; }
4546
if (p.Name == "builtin.features") { return false; }
@@ -204,7 +205,7 @@ public Acks? Acks
204205
}
205206

206207

207-
class PropertySpecification
208+
class PropertySpecification : IComparable
208209
{
209210
public PropertySpecification() {}
210211

@@ -230,6 +231,9 @@ public PropertySpecification(PropertySpecification other)
230231
public string Description { get; set; }
231232
public string Type { get; set; }
232233
public string AliasFor { get; set; }
234+
235+
public int CompareTo(object obj)
236+
=> Name.CompareTo(((PropertySpecification)obj).Name);
233237
}
234238

235239
class Program
@@ -250,7 +254,7 @@ static string parseType(string type)
250254
static string createFileHeader(string branch)
251255
{
252256
return
253-
@"// *** Auto-generated from librdkafka branch " + branch + @" *** - do not modify manually.
257+
@"// *** Auto-generated from librdkafka " + branch + @" *** - do not modify manually.
254258
//
255259
// Copyright 2018 Confluent Inc.
256260
//
@@ -588,7 +592,7 @@ static List<PropertySpecification> removeDuplicateTopicLevel(List<PropertySpecif
588592
{
589593
if (global.Count(gp => gp.Name.Equals(p.Name)) > 0) { removeTopicLevel.Add(p.Name); }
590594
}
591-
props = props.Where(p => !removeTopicLevel.Contains(p.Name)).ToList();
595+
props = topicLevel.Where(p => !removeTopicLevel.Contains(p.Name)).Concat(global).ToList();
592596
return props;
593597
}
594598

@@ -621,6 +625,13 @@ static List<PropertySpecification> choosePreferredNames(List<PropertySpecificati
621625
}).ToList();
622626
}
623627

628+
static void PrintProps(IEnumerable<PropertySpecification> props)
629+
{
630+
var props_ = props.ToArray();
631+
Array.Sort(props_);
632+
Console.WriteLine(String.Join(" ", props_.Select(p => p.Name)));
633+
}
634+
624635
static async Task<int> Main(string[] args)
625636
{
626637
if (args.Length != 1)
@@ -635,12 +646,11 @@ static async Task<int> Main(string[] args)
635646
.GetAsync(url))
636647
.Content.ReadAsStringAsync();
637648

638-
var props =
639-
choosePreferredNames(
640-
linkAliased(
641-
removeDuplicateTopicLevel(
642-
MappingConfiguration.RemoveLegacyOrNotRelevant(
643-
extractAll(configDoc)))));
649+
var props = extractAll(configDoc);
650+
var props2 = MappingConfiguration.RemoveLegacyOrNotRelevant(props);
651+
var props3 = removeDuplicateTopicLevel(props2);
652+
var props4 = props = linkAliased(props3);
653+
var props5 = choosePreferredNames(props4);
644654

645655
if (props.Count() == 0)
646656
{

src/Confluent.Kafka/Config_gen.cs

Lines changed: 111 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// *** Auto-generated from librdkafka branch v1.0.0-RC7 *** - do not modify manually.
1+
// *** Auto-generated from librdkafka branch v1.0.0 *** - do not modify manually.
22
//
33
// Copyright 2018 Confluent Inc.
44
//
@@ -24,6 +24,58 @@
2424

2525
namespace Confluent.Kafka
2626
{
27+
/// <summary>
28+
/// Partitioner enum values
29+
/// </summary>
30+
public enum Partitioner
31+
{
32+
/// <summary>
33+
/// Random
34+
/// </summary>
35+
Random,
36+
37+
/// <summary>
38+
/// Consistent
39+
/// </summary>
40+
Consistent,
41+
42+
/// <summary>
43+
/// ConsistentRandom
44+
/// </summary>
45+
ConsistentRandom,
46+
47+
/// <summary>
48+
/// Murmur2
49+
/// </summary>
50+
Murmur2,
51+
52+
/// <summary>
53+
/// Murmur2Random
54+
/// </summary>
55+
Murmur2Random
56+
}
57+
58+
/// <summary>
59+
/// AutoOffsetReset enum values
60+
/// </summary>
61+
public enum AutoOffsetReset
62+
{
63+
/// <summary>
64+
/// Latest
65+
/// </summary>
66+
Latest,
67+
68+
/// <summary>
69+
/// Earliest
70+
/// </summary>
71+
Earliest,
72+
73+
/// <summary>
74+
/// Error
75+
/// </summary>
76+
Error
77+
}
78+
2779
/// <summary>
2880
/// BrokerAddressFamily enum values
2981
/// </summary>
@@ -88,55 +140,34 @@ public enum PartitionAssignmentStrategy
88140
}
89141

90142
/// <summary>
91-
/// Partitioner enum values
143+
/// CompressionType enum values
92144
/// </summary>
93-
public enum Partitioner
145+
public enum CompressionType
94146
{
95147
/// <summary>
96-
/// Random
97-
/// </summary>
98-
Random,
99-
100-
/// <summary>
101-
/// Consistent
102-
/// </summary>
103-
Consistent,
104-
105-
/// <summary>
106-
/// ConsistentRandom
107-
/// </summary>
108-
ConsistentRandom,
109-
110-
/// <summary>
111-
/// Murmur2
148+
/// None
112149
/// </summary>
113-
Murmur2,
150+
None,
114151

115152
/// <summary>
116-
/// Murmur2Random
153+
/// Gzip
117154
/// </summary>
118-
Murmur2Random
119-
}
155+
Gzip,
120156

121-
/// <summary>
122-
/// AutoOffsetReset enum values
123-
/// </summary>
124-
public enum AutoOffsetReset
125-
{
126157
/// <summary>
127-
/// Latest
158+
/// Snappy
128159
/// </summary>
129-
Latest,
160+
Snappy,
130161

131162
/// <summary>
132-
/// Earliest
163+
/// Lz4
133164
/// </summary>
134-
Earliest,
165+
Lz4,
135166

136167
/// <summary>
137-
/// Error
168+
/// Zstd
138169
/// </summary>
139-
Error
170+
Zstd
140171
}
141172

142173
/// <summary>
@@ -416,14 +447,6 @@ public Acks? Acks
416447
/// </summary>
417448
public BrokerAddressFamily? BrokerAddressFamily { get { return (BrokerAddressFamily?)GetEnum(typeof(BrokerAddressFamily), "broker.address.family"); } set { this.SetObject("broker.address.family", value); } }
418449

419-
/// <summary>
420-
/// When enabled the client will only connect to brokers it needs to communicate with. When disabled the client will maintain connections to all brokers in the cluster.
421-
///
422-
/// default: true
423-
/// importance: medium
424-
/// </summary>
425-
public bool? EnableSparseConnections { get { return GetBool("enable.sparse.connections"); } set { this.SetObject("enable.sparse.connections", value); } }
426-
427450
/// <summary>
428451
/// The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.
429452
///
@@ -745,6 +768,38 @@ public ProducerConfig() {}
745768
/// </summary>
746769
public string DeliveryReportFields { get { return Get("dotnet.producer.delivery.report.fields"); } set { this.SetObject("dotnet.producer.delivery.report.fields", value.ToString()); } }
747770

771+
/// <summary>
772+
/// The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on `request.required.acks` being != 0.
773+
///
774+
/// default: 5000
775+
/// importance: medium
776+
/// </summary>
777+
public int? RequestTimeoutMs { get { return GetInt("request.timeout.ms"); } set { this.SetObject("request.timeout.ms", value); } }
778+
779+
/// <summary>
780+
/// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded.
781+
///
782+
/// default: 300000
783+
/// importance: high
784+
/// </summary>
785+
public int? MessageTimeoutMs { get { return GetInt("message.timeout.ms"); } set { this.SetObject("message.timeout.ms", value); } }
786+
787+
/// <summary>
788+
/// Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).
789+
///
790+
/// default: consistent_random
791+
/// importance: high
792+
/// </summary>
793+
public Partitioner? Partitioner { get { return (Partitioner?)GetEnum(typeof(Partitioner), "partitioner"); } set { this.SetObject("partitioner", value); } }
794+
795+
/// <summary>
796+
/// Compression level parameter for algorithm selected by configuration property `compression.codec`. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
797+
///
798+
/// default: -1
799+
/// importance: medium
800+
/// </summary>
801+
public int? CompressionLevel { get { return GetInt("compression.level"); } set { this.SetObject("compression.level", value); } }
802+
748803
/// <summary>
749804
/// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible.
750805
///
@@ -810,44 +865,20 @@ public ProducerConfig() {}
810865
public int? QueueBufferingBackpressureThreshold { get { return GetInt("queue.buffering.backpressure.threshold"); } set { this.SetObject("queue.buffering.backpressure.threshold", value); } }
811866

812867
/// <summary>
813-
/// Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes.
814-
///
815-
/// default: 10000
816-
/// importance: medium
817-
/// </summary>
818-
public int? BatchNumMessages { get { return GetInt("batch.num.messages"); } set { this.SetObject("batch.num.messages", value); } }
819-
820-
/// <summary>
821-
/// The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on `request.required.acks` being != 0.
868+
/// compression codec to use for compressing message sets. This is the default value for all topics, may be overridden by the topic configuration property `compression.codec`.
822869
///
823-
/// default: 5000
870+
/// default: none
824871
/// importance: medium
825872
/// </summary>
826-
public int? RequestTimeoutMs { get { return GetInt("request.timeout.ms"); } set { this.SetObject("request.timeout.ms", value); } }
827-
828-
/// <summary>
829-
/// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded.
830-
///
831-
/// default: 300000
832-
/// importance: high
833-
/// </summary>
834-
public int? MessageTimeoutMs { get { return GetInt("message.timeout.ms"); } set { this.SetObject("message.timeout.ms", value); } }
873+
public CompressionType? CompressionType { get { return (CompressionType?)GetEnum(typeof(CompressionType), "compression.type"); } set { this.SetObject("compression.type", value); } }
835874

836875
/// <summary>
837-
/// Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).
838-
///
839-
/// default: consistent_random
840-
/// importance: high
841-
/// </summary>
842-
public Partitioner? Partitioner { get { return (Partitioner?)GetEnum(typeof(Partitioner), "partitioner"); } set { this.SetObject("partitioner", value); } }
843-
844-
/// <summary>
845-
/// Compression level parameter for algorithm selected by configuration property `compression.codec`. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
876+
/// Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes.
846877
///
847-
/// default: -1
878+
/// default: 10000
848879
/// importance: medium
849880
/// </summary>
850-
public int? CompressionLevel { get { return GetInt("compression.level"); } set { this.SetObject("compression.level", value); } }
881+
public int? BatchNumMessages { get { return GetInt("batch.num.messages"); } set { this.SetObject("batch.num.messages", value); } }
851882

852883
}
853884

@@ -888,6 +919,14 @@ public ConsumerConfig() {}
888919
/// </summary>
889920
public string ConsumeResultFields { set { this.SetObject("dotnet.consumer.consume.result.fields", value); } }
890921

922+
/// <summary>
923+
/// Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'.
924+
///
925+
/// default: largest
926+
/// importance: high
927+
/// </summary>
928+
public AutoOffsetReset? AutoOffsetReset { get { return (AutoOffsetReset?)GetEnum(typeof(AutoOffsetReset), "auto.offset.reset"); } set { this.SetObject("auto.offset.reset", value); } }
929+
891930
/// <summary>
892931
/// Client group id string. All clients sharing the same group.id belong to the same group.
893932
///
@@ -1040,14 +1079,6 @@ public ConsumerConfig() {}
10401079
/// </summary>
10411080
public bool? CheckCrcs { get { return GetBool("check.crcs"); } set { this.SetObject("check.crcs", value); } }
10421081

1043-
/// <summary>
1044-
/// Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'.
1045-
///
1046-
/// default: largest
1047-
/// importance: high
1048-
/// </summary>
1049-
public AutoOffsetReset? AutoOffsetReset { get { return (AutoOffsetReset?)GetEnum(typeof(AutoOffsetReset), "auto.offset.reset"); } set { this.SetObject("auto.offset.reset", value); } }
1050-
10511082
}
10521083

10531084
}

test/Confluent.Kafka.UnitTests/ConfigEnums.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,5 +79,30 @@ public void Substituted()
7979
Assert.Equal(Partitioner.Murmur2Random, config4.Partitioner);
8080
Assert.Equal(PartitionAssignmentStrategy.RoundRobin, config5.PartitionAssignmentStrategy);
8181
}
82+
83+
[Fact]
84+
public void CompileTimeCheck()
85+
{
86+
// Set a value for every enum. This tests that ConfigGen
87+
// isn't missing any (compile time check).
88+
var pConfig = new ProducerConfig
89+
{
90+
CompressionType = CompressionType.Lz4,
91+
SecurityProtocol = SecurityProtocol.SaslSsl,
92+
Partitioner = Partitioner.Murmur2,
93+
BrokerAddressFamily = BrokerAddressFamily.V4,
94+
SaslMechanism = SaslMechanism.ScramSha256,
95+
Acks = Acks.Leader,
96+
};
97+
98+
var cConfig = new ConsumerConfig
99+
{
100+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
101+
PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range,
102+
AutoOffsetReset = AutoOffsetReset.Latest,
103+
BrokerAddressFamily = BrokerAddressFamily.V6,
104+
SaslMechanism = SaslMechanism.Plain
105+
};
106+
}
82107
}
83108
}

0 commit comments

Comments
 (0)