Skip to content
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ import (
func main() {

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
kafka.BootstrapServers: "localhost",
kafka.GroupId: "myGroup",
kafka.AutoOffSetReset: "earliest",
})

if err != nil {
Expand Down Expand Up @@ -94,7 +94,7 @@ import (

func main() {

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
p, err := kafka.NewProducer(&kafka.ConfigMap{kafka.BootstrapServers: "localhost"})
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions examples/docker_aws_lambda_example/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 h1:NbOku86JJlsRJPJKE0snNsz6
github.com/confluentinc/confluent-kafka-go/v2 v2.4.0/go.mod h1:E1dEQy50ZLfqs7T9luxz0rLxaeFZJZE92XvApJOr/Rk=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y=
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0=
github.com/confluentinc/confluent-kafka-go/v2 v2.10.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves=
github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw=
github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U=
github.com/containerd/containerd v1.7.12 h1:+KQsnv4VnzyxWcfO9mlxxELaoztsDEjOuCMPAuPqgU0=
Expand Down
15 changes: 9 additions & 6 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import "C"
// bool, int, string, any type with the standard String() interface
type ConfigValue interface{}

// ConfigKey is type content all the possible of configuration keys
type ConfigKey string

// ConfigMap is a map containing standard librdkafka configuration properties as documented in:
// https://github.com/confluentinc/librdkafka/tree/master/CONFIGURATION.md
//
Expand All @@ -45,7 +48,7 @@ type ConfigValue interface{}
// topic configuration properties shall be specified in the standard ConfigMap.
// For backwards compatibility, "default.topic.config" (if supplied)
// takes precedence.
type ConfigMap map[string]ConfigValue
type ConfigMap map[ConfigKey]ConfigValue

// SetKey sets configuration property key to value.
//
Expand All @@ -57,9 +60,9 @@ func (m ConfigMap) SetKey(key string, value ConfigValue) error {
if !found {
m["default.topic.config"] = ConfigMap{}
}
m["default.topic.config"].(ConfigMap)[strings.TrimPrefix(key, "{topic}.")] = value
m["default.topic.config"].(ConfigMap)[ConfigKey(strings.TrimPrefix(key, "{topic}."))] = value
} else {
m[key] = value
m[ConfigKey(key)] = value
}

return nil
Expand Down Expand Up @@ -192,7 +195,7 @@ func configConvertAnyconf(m ConfigMap, anyconf rdkAnyconf) (err error) {
(*C.rd_kafka_topic_conf_t)((*rdkTopicConf)(cTopicConf)))

default:
err = anyconfSet(anyconf, k, v)
err = anyconfSet(anyconf, string(k), v)
if err != nil {
return err
}
Expand Down Expand Up @@ -231,7 +234,7 @@ func (m ConfigMap) get(key string, defval ConfigValue) (ConfigValue, error) {
return defconfCv.(ConfigMap).get(strings.TrimPrefix(key, "{topic}."), defval)
}

v, ok := m[key]
v, ok := m[ConfigKey(key)]
if !ok {
return defval, nil
}
Expand All @@ -251,7 +254,7 @@ func (m ConfigMap) extract(key string, defval ConfigValue) (ConfigValue, error)
return nil, err
}

delete(m, key)
delete(m, ConfigKey(key))

return v, nil
}
Expand Down
108 changes: 108 additions & 0 deletions kafka/config_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package kafka

const (
AutoOffsetReset ConfigKey = "auto.offset.reset"
Acks ConfigKey = "acks"
AutoIncludeJmxReporter ConfigKey = "auto.include.jmx.reporter"
BatchSize ConfigKey = "batch.size"
BootstrapServers ConfigKey = "bootstrap.servers"
BufferMemory ConfigKey = "buffer.memory"
ClientDnsLookup ConfigKey = "client.dns.lookup"
ClientId ConfigKey = "client.id"
CompressionType ConfigKey = "compression.type"
CompressionGzipLevel ConfigKey = "compression.gzip.level"
CompressionLz4Level ConfigKey = "compression.lz4.level"
CompressionZstdLevel ConfigKey = "compression.zstd.level"
ConnectionsMaxIdleMs ConfigKey = "connections.max.idle.ms"
DeliveryTimeoutMs ConfigKey = "delivery.timeout.ms"
EnableAutoCommit ConfigKey = "enable.auto.commit"
EnableAutoOffset ConfigKey = "enable.auto.offset.reset"
EnableAutoOffsetStore ConfigKey = "enable.auto.offset.store"
EnableIdempotence ConfigKey = "enable.idempotence"
EnableMetricsPush ConfigKey = "enable.metrics.push"
GroupId ConfigKey = "group.id"
InterceptorClasses ConfigKey = "interceptor.classes"
KeySerializer ConfigKey = "key.serializer"
LingerMs ConfigKey = "linger.ms"
MaxBlockMs ConfigKey = "max.block.ms"
MaxInFlightRequestsPerConnection ConfigKey = "max.in.flight.requests.per.connection"
MaxRequestSize ConfigKey = "max.request.size"
MetadataMaxAgeMs ConfigKey = "metadata.max.age.ms"
MetadataMaxIdleMs ConfigKey = "metadata.max.idle.ms"
MetadataRecoveryStrategy ConfigKey = "metadata.recovery.strategy"
MetricReporters ConfigKey = "metric.reporters"
MetricsNumSamples ConfigKey = "metrics.num.samples"
MetricsRecordingLevel ConfigKey = "metrics.recording.level"
MetricsSampleWindowMs ConfigKey = "metrics.sample.window.ms"
PartitionerClass ConfigKey = "partitioner.class"
PartitionerIgnoreKeys ConfigKey = "partitioner.ignore.keys"
PartitionerAdaptivePartitioningEnable ConfigKey = "partitioner.adaptive.partitioning.enable"
PartitionerAvailabilityTimeoutMs ConfigKey = "partitioner.availability.timeout.ms"
ReceiveBufferBytes ConfigKey = "receive.buffer.bytes"
ReconnectBackoffMaxMs ConfigKey = "reconnect.backoff.max.ms"
ReconnectBackoffMs ConfigKey = "reconnect.backoff.ms"
RequestTimeoutMs ConfigKey = "request.timeout.ms"
Retries ConfigKey = "retries"
RetryBackoffMaxMs ConfigKey = "retry.backoff.max.ms"
RetryBackoffMs ConfigKey = "retry.backoff.ms"
SaslClientCallbackHandlerClass ConfigKey = "sasl.client.callback.handler.class"
SaslJaasConfig ConfigKey = "sasl.jaas.config"
SaslKerberosServiceName ConfigKey = "sasl.kerberos.service.name"
SaslKerberosKinitCmd ConfigKey = "sasl.kerberos.kinit.cmd"
SaslKerberosMinTimeBeforeRelogin ConfigKey = "sasl.kerberos.min.time.before.relogin"
SaslKerberosTicketRenewJitter ConfigKey = "sasl.kerberos.ticket.renew.jitter"
SaslKerberosTicketRenewWindowFactor ConfigKey = "sasl.kerberos.ticket.renew.window.factor"
SaslLoginCallbackHandlerClass ConfigKey = "sasl.login.callback.handler.class"
SaslLoginClass ConfigKey = "sasl.login.class"
SaslLoginConnectTimeoutMs ConfigKey = "sasl.login.connect.timeout.ms"
SaslLoginReadTimeoutMs ConfigKey = "sasl.login.read.timeout.ms"
SaslLoginRefreshBufferSeconds ConfigKey = "sasl.login.refresh.buffer.seconds"
SaslLoginRefreshMinPeriodSeconds ConfigKey = "sasl.login.refresh.min.period.seconds"
SaslLoginRefreshWindowFactor ConfigKey = "sasl.login.refresh.window.factor"
SaslLoginRefreshWindowJitter ConfigKey = "sasl.login.refresh.window.jitter"
SaslLoginRetryBackoffMaxMs ConfigKey = "sasl.login.retry.backoff.max.ms"
SaslLoginRetryBackoffMs ConfigKey = "sasl.login.retry.backoff.ms"
SaslMechanism ConfigKey = "sasl.mechanism"
SaslOauthbearerClockSkewSeconds ConfigKey = "sasl.oauthbearer.clock.skew.seconds"
SaslOauthbearerExpectedAudience ConfigKey = "sasl.oauthbearer.expected.audience"
SaslOauthbearerExpectedIssuer ConfigKey = "sasl.oauthbearer.expected.issuer"
SaslOauthbearerHeaderUrlencode ConfigKey = "sasl.oauthbearer.header.urlencode"
SaslOauthbearerIatValidationEnabled ConfigKey = "sasl.oauthbearer.iat.validation.enabled"
SaslOauthbearerJtiValidationEnabled ConfigKey = "sasl.oauthbearer.jti.validation.enabled"
SaslOauthbearerJwksEndpointUrl ConfigKey = "sasl.oauthbearer.jwks.endpoint.url"
SaslOauthbearerJwksEndpointRefreshMs ConfigKey = "sasl.oauthbearer.jwks.endpoint.refresh.ms"
SaslOauthbearerJwksEndpointRetryBackoffMaxMs ConfigKey = "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms"
SaslOauthbearerJwksEndpointRetryBackoffMs ConfigKey = "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms"
SaslOauthbearerScopeClaimName ConfigKey = "sasl.oauthbearer.scope.claim.name"
SaslOauthbearerSubClaimName ConfigKey = "sasl.oauthbearer.sub.claim.name"
SaslOauthbearerTokenEndpointUrl ConfigKey = "sasl.oauthbearer.token.endpoint.url"
SecurityProtocol ConfigKey = "security.protocol"
SecurityProviders ConfigKey = "security.providers"
SendBufferBytes ConfigKey = "send.buffer.bytes"
SessionTimeoutMs ConfigKey = "session.timeout.ms"
SocketConnectionSetupTimeoutMaxMs ConfigKey = "socket.connection.setup.timeout.max.ms"
SocketConnectionSetupTimeoutMs ConfigKey = "socket.connection.setup.timeout.ms"
SocketTimeoutMs ConfigKey = "socket.timeout.ms"
SslCipherSuites ConfigKey = "ssl.cipher.suites"
SslEnabledProtocols ConfigKey = "ssl.enabled.protocols"
SslEndpointIdentificationAlgorithm ConfigKey = "ssl.endpoint.identification.algorithm"
SslEngineFactoryClass ConfigKey = "ssl.engine.factory.class"
SslKeyPassword ConfigKey = "ssl.key.password"
SslKeymanagerAlgorithm ConfigKey = "ssl.keymanager.algorithm"
SslKeystoreCertificateChain ConfigKey = "ssl.keystore.certificate.chain"
SslKeystoreKey ConfigKey = "ssl.keystore.key"
SslKeystoreLocation ConfigKey = "ssl.keystore.location"
SslKeystorePassword ConfigKey = "ssl.keystore.password"
SslKeystoreType ConfigKey = "ssl.keystore.type"
SslProtocol ConfigKey = "ssl.protocol"
SslProvider ConfigKey = "ssl.provider"
SslSecureRandomImplementation ConfigKey = "ssl.secure.random.implementation"
SslTrustmanagerAlgorithm ConfigKey = "ssl.trustmanager.algorithm"
SslTruststoreCertificates ConfigKey = "ssl.truststore.certificates"
SslTruststoreLocation ConfigKey = "ssl.truststore.location"
SslTruststorePassword ConfigKey = "ssl.truststore.password"
SslTruststoreType ConfigKey = "ssl.truststore.type"
TransactionTimeoutMs ConfigKey = "transaction.timeout.ms"
TransactionalId ConfigKey = "transactional.id"
ValueSerializer ConfigKey = "value.serializer"
)
14 changes: 7 additions & 7 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func TestConsumerAPIs(t *testing.T) {
}

c, err = NewConsumer(&ConfigMap{
"group.id": "gotest",
"socket.timeout.ms": 10,
"session.timeout.ms": 10,
"enable.auto.offset.store": false, // permit StoreOffsets()
GroupId: "gotest",
SocketTimeoutMs: 10,
SessionTimeoutMs: 10,
EnableAutoOffsetStore: false, // permit StoreOffsets()
})
if err != nil {
t.Fatalf("%s", err)
Expand Down Expand Up @@ -621,9 +621,9 @@ func TestConsumerCloseForStaticMember(t *testing.T) {
var revokedEvents4 int32

conf1 := ConfigMap{
"bootstrap.servers": broker,
"group.id": "rebalance",
"session.timeout.ms": "6000",
BootstrapServers: broker,
GroupId: "rebalance",
SessionTimeoutMs: "6000",
"max.poll.interval.ms": "10000",
"group.instance.id": "staticmember1",
}
Expand Down
4 changes: 2 additions & 2 deletions kafka/testhelpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func testNewConsumer(t *testing.T, conf *ConfigMap) (*Consumer, error) {
"heartbeat.interval.ms",
"group.protocol.type"}
for _, prop := range forbiddenProperties {
if _, ok := (*conf)[prop]; !ok {
if _, ok := (*conf)[ConfigKey(prop)]; !ok {
continue
}
t.Logf(
"Skipping setting forbidden configuration property \"%s\" for CONSUMER protocol",
prop)
delete(*conf, prop)
delete(*conf, ConfigKey(prop))
}
}
return NewConsumer(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func main() {
if len(x) != 2 {
panic("-X expects a ,-separated list of confprop=val pairs")
}
conf[x[0]] = x[1]
conf[kafka.ConfigKey(x[0])] = x[1]
}
}
fmt.Println("SerializerConfig: ", conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func main() {
if len(x) != 2 {
panic("-X expects a ,-separated list of confprop=val pairs")
}
conf[x[0]] = x[1]
conf[kafka.ConfigKey(x[0])] = x[1]
}
}
fmt.Println("SerializerConfig: ", conf)
Expand Down