diff --git a/README.md b/README.md
index fdc6ed95..1da1dc3f 100644
--- a/README.md
+++ b/README.md
@@ -42,6 +42,8 @@ Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends th
 - `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. Could use go template, labels are passed (as a map) to the template: e.g: `metrics.{{ index . "__name__" }}` to use per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`)
 - `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`.
 - `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`.
+- `KAFKA_BATCH_SIZE`: defines the maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead, defaults to `1000000`.
+- `KAFKA_LINGER_MS`: defines the delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches, defaults to `5`.
 - `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.
 - `PORT`: defines http port to listen, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).
 - `BASIC_AUTH_USERNAME`: basic auth username to be used for receive endpoint, defaults is no basic auth.
diff --git a/config.go b/config.go
index 1b4da23c..f13f97ad 100644
--- a/config.go
+++ b/config.go
@@ -37,6 +37,8 @@ var (
 	basicauthPassword      = ""
 	kafkaCompression       = "none"
 	kafkaBatchNumMessages  = "10000"
+	kafkaBatchSize         = "1000000"
+	kafkaLingerMs          = "5"
 	kafkaSslClientCertFile = ""
 	kafkaSslClientKeyFile  = ""
 	kafkaSslClientKeyPass  = ""
@@ -82,6 +84,14 @@ func init() {
 		kafkaBatchNumMessages = value
 	}
 
+	if value := os.Getenv("KAFKA_BATCH_SIZE"); value != "" {
+		kafkaBatchSize = value
+	}
+
+	if value := os.Getenv("KAFKA_LINGER_MS"); value != "" {
+		kafkaLingerMs = value
+	}
+
 	if value := os.Getenv("KAFKA_SSL_CLIENT_CERT_FILE"); value != "" {
 		kafkaSslClientCertFile = value
 	}
@@ -99,7 +109,7 @@ func init() {
 	}
 
 	if value := os.Getenv("KAFKA_SECURITY_PROTOCOL"); value != "" {
-		kafkaSecurityProtocol = strings.ToLower(value)
+		kafkaSecurityProtocol = value
 	}
 
 	if value := os.Getenv("KAFKA_SASL_MECHANISM"); value != "" {
diff --git a/handlers.go b/handlers.go
index 28d22225..08f0d0b2 100644
--- a/handlers.go
+++ b/handlers.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"time"
 
 	"github.com/gin-gonic/gin"
 	"github.com/sirupsen/logrus"
@@ -75,7 +76,36 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
 				Value: metric,
 			}, nil)
 
+			go func() {
+				for event := range producer.Events() {
+					switch ev := event.(type) {
+					case *kafka.Message:
+						message := ev
+						if message.TopicPartition.Error != nil {
+							logrus.WithError(message.TopicPartition.Error).Errorf("failed to deliver message: %v",
+								message.TopicPartition)
+						} else {
+							logrus.Debugf("delivered to topic %s [%d] at offset %v",
+								*message.TopicPartition.Topic,
+								message.TopicPartition.Partition,
+								message.TopicPartition.Offset)
+						}
+					case kafka.Error:
+						logrus.WithError(ev).Error("kafka producer error")
+					default:
+						logrus.Infof("Ignored event: %s", ev)
+					}
+				}
+			}()
+
 			if err != nil {
+				if err.(kafka.Error).Code() == kafka.ErrQueueFull {
+					// Producer queue is full; wait 1s for messages to be delivered.
+					// Maybe we should fail fast instead, since we are losing data...
+					logrus.Warning("producer queue is full, waiting 1s")
+					time.Sleep(time.Second)
+				}
+
 				objectsFailed.Add(float64(1))
 				c.AbortWithStatus(http.StatusInternalServerError)
 				logrus.WithError(err).Debug(fmt.Sprintf("Failing metric %v", metric))
diff --git a/main.go b/main.go
index 3553be6d..4e7fb324 100644
--- a/main.go
+++ b/main.go
@@ -28,12 +28,15 @@ func main() {
 	logrus.Info("creating kafka producer")
 
 	kafkaConfig := kafka.ConfigMap{
-		"bootstrap.servers":   kafkaBrokerList,
-		"compression.codec":   kafkaCompression,
-		"batch.num.messages":  kafkaBatchNumMessages,
-		"go.batch.producer":   true,  // Enable batch producer (for increased performance).
-		"go.delivery.reports": false, // per-message delivery reports to the Events() channel
-		"acks":                kafkaAcks,
+		"bootstrap.servers":      kafkaBrokerList,
+		"compression.codec":      kafkaCompression,
+		"batch.num.messages":     kafkaBatchNumMessages,
+		"batch.size":             kafkaBatchSize,
+		"linger.ms":              kafkaLingerMs,
+		"go.batch.producer":      true, // Enable batch producer (for increased performance).
+		"go.delivery.reports":    true, // per-message delivery reports to the Events() channel
+		"go.logs.channel.enable": true,
+		"acks":                   kafkaAcks,
 	}
 
 	if kafkaSslClientCertFile != "" && kafkaSslClientKeyFile != "" && kafkaSslCACertFile != "" {
@@ -41,7 +44,7 @@ func main() {
 			kafkaSecurityProtocol = "ssl"
 		}
 
-		if kafkaSecurityProtocol != "ssl" && kafkaSecurityProtocol != "sasl_ssl" {
+		if kafkaSecurityProtocol != "ssl" && kafkaSecurityProtocol != "SASL_SSL" {
 			logrus.Fatal("invalid config: kafka security protocol is not ssl based but ssl config is provided")
 		}
 
@@ -53,7 +56,7 @@ func main() {
 	}
 
 	if kafkaSaslMechanism != "" && kafkaSaslUsername != "" && kafkaSaslPassword != "" {
-		if kafkaSecurityProtocol != "sasl_ssl" && kafkaSecurityProtocol != "sasl_plaintext" {
+		if kafkaSecurityProtocol != "SASL_SSL" && kafkaSecurityProtocol != "SASL_PLAINTEXT" {
			logrus.Fatal("invalid config: kafka security protocol is not sasl based but sasl config is provided")
		}
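
Review note on the delivery-report loop in handlers.go: as written it is started inside `receiveHandler`, so every request spawns another goroutine competing for the shared `producer.Events()` channel, and between requests nothing drains the channel at all even though `go.delivery.reports` is now `true`. The pattern documented for confluent-kafka-go is a single consumer of that channel, started once right after the producer is created. A minimal sketch of that alternative, assuming the project's existing `kafka` and `logrus` imports (`handleDeliveryReports` is a hypothetical name, not part of this patch; adjust the import path to whatever the project already uses):

```go
package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/sirupsen/logrus"
)

// handleDeliveryReports drains the producer's Events() channel for the
// lifetime of the producer. It is meant to be started exactly once,
// e.g. `go handleDeliveryReports(producer)` in main.go, rather than
// once per HTTP request inside receiveHandler.
func handleDeliveryReports(producer *kafka.Producer) {
	for event := range producer.Events() {
		switch ev := event.(type) {
		case *kafka.Message:
			// Per-message delivery report: log failures, trace successes.
			if ev.TopicPartition.Error != nil {
				logrus.WithError(ev.TopicPartition.Error).Errorf("failed to deliver message: %v", ev.TopicPartition)
			} else {
				logrus.Debugf("delivered to topic %s [%d] at offset %v",
					*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
			}
		case kafka.Error:
			// Client-level errors, e.g. broker connection failures.
			logrus.WithError(ev).Error("kafka producer error")
		default:
			logrus.Infof("ignored event: %s", ev)
		}
	}
}
```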
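On the `ErrQueueFull` branch: it sleeps for one second and then still falls through to the 500 response, so the metric is lost either way, as the inline comment already suspects. If delaying the remote-write request slightly is acceptable, a bounded retry gives the local queue a chance to drain instead. A sketch under that assumption (`produceWithRetry` and its retry/backoff parameters are illustrative, not part of this patch):

```go
package main

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/sirupsen/logrus"
)

// produceWithRetry retries Produce while the local producer queue is
// full, failing immediately on any other error.
func produceWithRetry(producer *kafka.Producer, msg *kafka.Message, retries int, backoff time.Duration) error {
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		err = producer.Produce(msg, nil)
		if err == nil {
			return nil
		}
		kafkaErr, ok := err.(kafka.Error)
		if !ok || kafkaErr.Code() != kafka.ErrQueueFull {
			return err // not a queue-full condition; do not retry
		}
		logrus.Warnf("producer queue is full, retrying in %s (attempt %d/%d)", backoff, attempt+1, retries)
		time.Sleep(backoff)
	}
	return err
}
```

Failing fast is also defensible: Prometheus retries remote-write batches that receive a 5xx response, so returning the 500 immediately lets the server apply its own backoff rather than stalling a request goroutine here.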