diff --git a/test/v2.13/messaging/kafka.json b/test/v2.13/messaging/kafka.json new file mode 100644 index 0000000..af8b060 --- /dev/null +++ b/test/v2.13/messaging/kafka.json @@ -0,0 +1,238 @@ +{ + "description": "Full JSON Schema test", + "target": "https://www.krakend.io/schema/v2.13/krakend.json", + "tests": [ + { + "description": "Workflow test case", + "data": { + "version": 3, + "debug_endpoint": true, + "host": [ + "http://localhost:8080" + ], + "async_agent": [ + { + "name": "stocksconsumer", + "extra_config": { + "async/kafka": { + "group_id": "my_group_id", + "topics": [ + "stockprice" + ], + "connection": { + "brokers": [ + "localhost:9092" + ], + "client_id": "cid_stocksconsumer" + }, + "consumer": {} + } + }, + "connection": { + "max_retries": 2, + "backoff_strategy": "linear", + "health_interval": "30s" + }, + "consumer": { + "topic": "*", + "workers": 1, + "timeout": "1s" + }, + "backend": [ + { + "url_pattern": "/__debug/stocksconsumer/received" + } + ] + }, + { + "name": "portfolioupdatesconsumer", + "extra_config": { + "async/kafka": { + "group_id": "k_async_updates", + "topics": [ + "portfolioupdates" + ], + "connection": { + "brokers": [ + "localhost:49092" + ], + "client_id": "cid_portfolioupdateconsumer", + "client_tls": { + "allow_insecure_connections": false, + "ca_certs": [ + "./config/ca/cacert.pem" + ], + "client_certs": [ + { + "certificate": "./config/certs/client/client.signed.pem", + "private_key": "./config/certs/client/client.key" + } + ] + } + }, + "consumer": {} + } + }, + "connection": { + "max_retries": 2, + "backoff_strategy": "linear", + "health_interval": "30s" + }, + "consumer": { + "topic": "*", + "workers": 1, + "timeout": "1s" + }, + "backend": [ + { + "url_pattern": "/__debug/portfolioupdates/received" + } + ] + } + ], + "endpoints": [ + { + "endpoint": "/portfolio/order", + "method": "POST", + "input_headers": [ + "X-Idempotency-Key", + "Some-Meta" + ], + "backend": [ + { + "url_pattern": "/_publisher/", + 
"extra_config": { + "backend/pubsub/publisher/kafka": { + "success_status_code": 201, + "writer": { + "topic": "orderplacement", + "key_meta": "X-Idempotency-Key", + "connection": { + "brokers": [ + "localhost:49092" + ], + "client_id": "krakend_async_agent", + "client_tls": { + "allow_insecure_connections": false, + "ca_certs": [ + "./config/ca/cacert.pem" + ], + "client_certs": [ + { + "certificate": "./config/certs/client/client.signed.pem", + "private_key": "./config/certs/client/client.key" + } + ] + } + }, + "producer": { + "idempotent": true + } + } + } + } + } + ] + }, + { + "endpoint": "/portfolio/order", + "method": "GET", + "backend": [ + { + "url_pattern": "/_consumer/", + "extra_config": { + "backend/pubsub/subscriber/kafka": { + "reader": { + "topics": [ + "orderplacement" + ], + "key_meta": "X-Idempotency-Key", + "group_id": "k_endpoint_read", + "connection": { + "brokers": [ + "localhost:49092" + ], + "client_id": "krakend_async_agent", + "client_tls": { + "allow_insecure_connections": false, + "ca_certs": [ + "./config/ca/cacert.pem" + ], + "client_certs": [ + { + "certificate": "./config/certs/client/client.signed.pem", + "private_key": "./config/certs/client/client.key" + } + ] + } + }, + "consumer": { + "isolation_level": "read_commited" + } + } + } + } + } + ] + } + ], + "extra_config": { + "telemetry/opentelemetry": { + "service_name": "krakend_prometheus_service", + "metric_reporting_period": 1, + "trace_sample_rate": 1, + "exporters": { + "otlp": [ + { + "disable_metrics": true, + "disable_traces": false, + "host": "localhost", + "name": "tempo", + "port": 54317, + "use_http": false + } + ], + "prometheus": [ + { + "name": "local_prometheus", + "port": 9099, + "process_metrics": true, + "go_metrics": true + } + ] + }, + "layers": { + "global": { + "disable_metrics": false, + "disable_propagation": false, + "disable_traces": false, + "report_headers": true + }, + "proxy": { + "disable_metrics": false, + "disable_traces": true, + 
"report_headers": true + }, + "backend": { + "metrics": { + "detailed_connection": true, + "disable_stage": false, + "read_payload": false, + "round_trip": false + }, + "traces": { + "detailed_connection": false, + "disable_stage": false, + "read_payload": false, + "report_headers": false, + "round_trip": false + } + } + } + } + } + }, + "valid": true + } + ] +} diff --git a/v2.13/async_agent.json b/v2.13/async_agent.json index 33009f2..fdc1c2f 100644 --- a/v2.13/async_agent.json +++ b/v2.13/async_agent.json @@ -84,12 +84,24 @@ }, "extra_config": { "description": "Defines the driver that connects to your queue or PubSub system. In addition, you can place other middlewares to modify the request (message) or the response, apply logic or any other endpoint middleware, but adding the driver is mandatory.\n\nSee: https://www.krakend.io/docs/async/", - "required": [ "async/amqp" ], + "oneOf": [ + { + "required": [ "async/amqp" ] + }, + { + "required": [ "async/kafka" ] + } + ], "properties": { "async/amqp": { "title": "Async Agent extra configuration", "description": "[See the configuration for async/amqp](https://www.krakend.io/docs/async/amqp/)", "$ref": "async/amqp.json" + }, + "async/kafka": { + "title": "Kafka driver", + "description": "config details to consume from kafka topics", + "$ref": "messaging/kafka/reader.json" } } }, diff --git a/v2.13/backend/pubsub/kafka/publisher.json b/v2.13/backend/pubsub/kafka/publisher.json new file mode 100644 index 0000000..65ad035 --- /dev/null +++ b/v2.13/backend/pubsub/kafka/publisher.json @@ -0,0 +1,28 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://www.krakend.io/schema/v2.13/backend/pubsub/kafka/publisher.json", + "title": "PubSub Kafka Publisher EE", + "description": "Enterprise only. 
Allows for fine-grained control over a Kafka publishing connection
Allows for fine-grained control over a Kafka subscription connection
SASL-based authentication with the broker
Defines how to connect to a Kafka cluster", + "type": "object", + "required": [ "brokers" ], + "properties": { + "brokers": { + "title": "Brokers available to connect to", + "description": "", + "type": "array", + "items": { + "type": "string" + } + }, + "client_tls": { + "title": "TLS client options", + "description": "Enables specific TLS connection options when connecting to the Kafke brokers. Supports all options under [TLS client settings](https://www.krakend.io/docs/service-settings/tls/#client-tls-settings).\n\nSee: https://www.krakend.io/docs/enterprise/backends/grpc/", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1client_tls.json", + "type": "object" + }, + "sasl": { + "title": "SASL Connection options", + "description": "Enables to authenticate to the Kafka service using user and password", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1messaging~1kafka~1sasl.json", + "type": "object" + }, + "dial_timeout": { + "title": "Dial Timeout", + "description": "Dial timeout for establishing new connections", + "default": "30s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "read_timeout": { + "title": "Read Timeout", + "description": "Maximum time allowed to read from broker", + "default": "30s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "write_timeout": { + "title": "Write Timeout", + "description": "Maximum time allowed to read from broker", + "default": "30s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "keep_alive": { + "title": "Keep Alive", + "description": "Time to maintain an idle connection open", + "default": "0s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "client_id": { + "title": "Client ID", + "description": "A name to give to the 
client stablishing the connection", + "default": "KrakenD v[X].[Y].[Z]", + "type": "string" + }, + "rack_id": { + "title": "Rack ID", + "description": "A name to identify the rack we are connecting from", + "default": "", + "type": "string" + }, + "channel_buffer_size": { + "title": "Channel Buffer Size", + "description": "The number of events to buffer in internal and external channels. This permits the producer and consumer to continue processing some messages in the background while user code is working, greatly improving throughput", + "default": 256, + "type": "number", + "minimum": 0 + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false + }, + "https://www.krakend.io/schema/v2.13/messaging/kafka/producer.json": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "Kafka Producer Details", + "description": "Enterprise only. Defines details about how to write messages to a Kafka cluster", + "type": "object", + "properties": { + "max_message_bytes": { + "title": "Max Message Bytes", + "description": "Maximum permitted size of a message. Should be set equal to or smaller than the broker's `message.max.bytes`.", + "type": "number", + "minimum": 0 + }, + "required_acks": { + "title": "Required Acks", + "description": "Level of acknowledgement reliability needed from the broker. Equivalent to the `request.required.acks` setting of the JVM producer. Can be a positibe number (as a string), or one of hte following values: `no_response` (no required acks), `wait_for_local` (waits for only the local commit to succeed before responding), `wait_for_all` (waits for all in-sync replicas to commit before responding).", + "default": "wait_for_local", + "type": "string" + }, + "required_acks_timeout": { + "title": "Required Acks Timeout", + "description": "Maximum duration the broker will wait the receipt of the number of `required_acks`. This is only relevant when `required_acks` is set to WaitForAll or a number > 1. 
Only supports millisecond resolution, nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.", + "default": "10s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "compression_codec": { + "title": "Compression Codec", + "description": "Type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.", + "default": "none", + "enum": [ "none", "gzip", "snappy", "lz4", "zstd" ] + }, + "compression_level": { + "title": "Compression Level", + "description": "Level of compression to use on messages. The meaning depends on the actual compression type used and defaults to default compression level for the codec.", + "type": "string" + }, + "partitioner": { + "title": "Partitioner", + "description": "Select behaviour for choosing the partition to send messages (similar to the `partitioner.class` setting for the JVM producer). The options are:\n- `sarama`: **DEPRECATED** uses a Partitioner which behaves as follows: If the message's key is nil then a random partition is chosen. Otherwise the `FNV-1a` hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.\n- `standard` is like `sarama` except that it handles absolute values in the same way as the reference Java implementation. `sarama` was supposed to do that but it had a mistake and now there are people depending on both behaviours. 
This will all go away on the next major version bump.\n- `random` uses a Partitioner which chooses a random partition each time.\n- `roundrobin` uses a Partitioner which walks through the available partitions one at a time.", + "default": "standard", + "enum": [ "sarama", "standard", "random", "roundrobin" ] + }, + "idempotent": { + "title": "Idempotent", + "description": "If enabled, the producer will ensure that exactly one copy of each message is written", + "default": false, + "type": "boolean" + }, + "retry_max": { + "title": "Retry Max", + "description": "The total number of times to retry sending a message. Similar to the `message.send.max.retries` setting of the JVM producer.", + "default": 3, + "type": "number", + "minimum": 0 + }, + "retry_backoff": { + "title": "Retry Backoff", + "description": "How long to wait for the cluster to settle between retries (similar to the `retry.backoff.ms` setting of the JVM producer.", + "default": "100ms", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false + }, + "https://www.krakend.io/schema/v2.13/messaging/kafka/writer.json": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "Kafka Writer", + "description": "Enterprise only. 
Defines how to write messages to a Kafka cluster", + "type": "object", + "required": [ "connection", "topic" ], + "properties": { + "connection": { + "title": "Connection details", + "description": "Settings to establish the connection to the kafka service", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1messaging~1kafka~1connection.json", + "type": "object" + }, + "producer": { + "title": "Producer options", + "description": "Details about how to write to a topic", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1messaging~1kafka~1producer.json", + "type": "object" + }, + "topic": { + "title": "Topic", + "description": "Topic to write to", + "type": "string" + }, + "key_meta": { + "title": "Key Meta Name", + "description": "Name of the header where the kafka message key value is written", + "type": "string" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false + }, + "https://www.krakend.io/schema/v2.13/backend/pubsub/kafka/publisher.json": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "PubSub Kafka Publisher EE", + "description": "Enterprise only. 
Allows for fine grained control over a kafka publishing connection", + "type": "object", + "required": [ "writer" ], + "properties": { + "success_status_code": { + "title": "Success Status Code", + "description": "HTTP status code to return for a successful write in the queue", + "default": 200, + "type": "number", + "maximum": 201, + "minimum": 200 + }, + "writer": { + "title": "Writer", + "description": "Define how to write messages to a kafka topic", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1messaging~1kafka~1writer.json", + "type": "object" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false + }, "https://www.krakend.io/schema/v2.13/backend/pubsub/subscriber.json": { "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "Pubsub subscriber", @@ -1595,6 +1855,125 @@ }, "additionalProperties": false }, + "https://www.krakend.io/schema/v2.13/messaging/kafka/consumer.json": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "Kafka Consumer Details", + "description": "Enterprise only. Defines details for reading messages from a Kafka cluster", + "type": "object", + "properties": { + "group_session_timeout": { + "description": "Timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. 
Note that the value must be in the allowable range as configured in the broker configuration by `group.min.session.timeout.ms` and `group.max.session.timeout.ms`", + "default": "10s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "group_heartbeat_interval": { + "title": "Grup Heartbeat Interval", + "description": "Expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances", + "default": "3s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "group_rebalance_strategies": { + "title": "Group Rebalance Strategies", + "description": "Priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader. Options are: `range`, `roundrobin`, and `sticky`", + "default": [ "range" ], + "type": "array", + "items": { + "enum": [ "range", "roundrobin", "sticky" ] + } + }, + "group_rebalance_timeout": { + "title": "Group Rebalance Timeout", + "description": "Maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. 
If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.", + "default": "60s", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1timeunits.json/$defs/timeunit", + "type": "string" + }, + "group_instance_id": { + "title": "Group Instance ID", + "description": "Support KIP-345", + "type": "string" + }, + "fetch_default": { + "title": "Fetch Default", + "description": "The default number of message bytes to fetch from the broker in each request (default 1MB). This should be larger than the majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar to the JVM's `fetch.message.max.bytes`", + "type": "number" + }, + "isolation_level": { + "title": "Isolation Level", + "description": "Supports 2 modes: `read_commited` to consume and return all messages in message channel, and `read_uncommited` to hide messages that are part of an aborted transaction", + "default": "read_commited", + "enum": [ "read_commited", "read_uncommited" ] + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false + }, + "https://www.krakend.io/schema/v2.13/messaging/kafka/reader.json": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "Kafka Reader", + "description": "Enterprise only. 
Defines how to read messages from a Kafka cluster", + "type": "object", + "required": [ "connection", "topics" ], + "properties": { + "connection": { + "title": "Connection details", + "description": "Settings to stablish the connection to the kafka services", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1messaging~1kafka~1connection.json", + "type": "object" + }, + "consumer": { + "title": "Consumer options", + "description": "Details about how to read from the topic", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1messaging~1kafka~1consumer.json", + "type": "object" + }, + "topics": { + "title": "Topics", + "description": "List of topics to read from", + "type": "array", + "items": { + "type": "string" + } + }, + "group_id": { + "title": "Consumer Group ID", + "description": "Name of the consumer group to use", + "type": "string" + }, + "key_meta": { + "title": "Key Meta Name", + "description": "Name of the header where the kafka message key value is written", + "type": "string" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false + }, + "https://www.krakend.io/schema/v2.13/backend/pubsub/kafka/subscriber.json": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "PubSub Kafka Subscriber EE", + "description": "Enterprise only. 
Allows for fine-grained control over a Kafka subscription connection
In addition, you can place other middlewares to modify the request (message) or the response, apply logic or any other endpoint middleware, but adding the driver is mandatory.\n\nSee: https://www.krakend.io/docs/async/", - "required": [ "async/amqp" ], + "oneOf": [ + { + "required": [ "async/amqp" ] + }, + { + "required": [ "async/kafka" ] + } + ], "properties": { "async/amqp": { "title": "Async Agent extra configuration", "description": "[See the configuration for async/amqp](https://www.krakend.io/docs/async/amqp/)", "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1async~1amqp.json" + }, + "async/kafka": { + "title": "Kafka driver", + "description": "config details to consume from kafka topics", + "$ref": "#/$defs/https:~1~1www.krakend.io~1schema~1v2.13~1messaging~1kafka~1reader.json" } } }, diff --git a/v2.13/messaging/kafka/connection.json b/v2.13/messaging/kafka/connection.json new file mode 100644 index 0000000..0daf3d1 --- /dev/null +++ b/v2.13/messaging/kafka/connection.json @@ -0,0 +1,81 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://www.krakend.io/schema/v2.13/messaging/kafka/connection.json", + "title": "Kafka Reader Connection", + "description": "Enterprise only. Defines how to connect to a Kafka cluster", + "type": "object", + "required": [ "brokers" ], + "properties": { + "brokers": { + "title": "Brokers available to connect to", + "description": "", + "type": "array", + "items": { + "type": "string" + } + }, + "client_tls": { + "title": "TLS client options", + "description": "Enables specific TLS connection options when connecting to the Kafke brokers. 
Supports all options under [TLS client settings](https://www.krakend.io/docs/service-settings/tls/#client-tls-settings).\n\nSee: https://www.krakend.io/docs/enterprise/backends/grpc/", + "$ref": "../../client_tls.json", + "type": "object" + }, + "sasl": { + "title": "SASL Connection options", + "description": "Enables to authenticate to the Kafka service using user and password", + "$ref": "./sasl.json", + "type": "object" + }, + "dial_timeout": { + "title": "Dial Timeout", + "description": "Dial timeout for establishing new connections", + "default": "30s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "read_timeout": { + "title": "Read Timeout", + "description": "Maximum time allowed to read from broker", + "default": "30s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "write_timeout": { + "title": "Write Timeout", + "description": "Maximum time allowed to read from broker", + "default": "30s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "keep_alive": { + "title": "Keep Alive", + "description": "Time to maintain an idle connection open", + "default": "0s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "client_id": { + "title": "Client ID", + "description": "A name to give to the client stablishing the connection", + "default": "KrakenD v[X].[Y].[Z]", + "type": "string" + }, + "rack_id": { + "title": "Rack ID", + "description": "A name to identify the rack we are connecting from", + "default": "", + "type": "string" + }, + "channel_buffer_size": { + "title": "Channel Buffer Size", + "description": "The number of events to buffer in internal and external channels. 
This permits the producer and consumer to continue processing some messages in the background while user code is working, greatly improving throughput", + "default": 256, + "type": "number", + "minimum": 0 + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false +} diff --git a/v2.13/messaging/kafka/consumer.json b/v2.13/messaging/kafka/consumer.json new file mode 100644 index 0000000..80ba74e --- /dev/null +++ b/v2.13/messaging/kafka/consumer.json @@ -0,0 +1,58 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://www.krakend.io/schema/v2.13/messaging/kafka/consumer.json", + "title": "Kafka Consumer Details", + "description": "Enterprise only. Defines details for reading messages from a Kafka cluster", + "type": "object", + "properties": { + "group_session_timeout": { + "description": "Timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by `group.min.session.timeout.ms` and `group.max.session.timeout.ms`", + "default": "10s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "group_heartbeat_interval": { + "title": "Grup Heartbeat Interval", + "description": "Expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no higher than 1/3 of that value. 
It can be adjusted even lower to control the expected time for normal rebalances", + "default": "3s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "group_rebalance_strategies": { + "title": "Group Rebalance Strategies", + "description": "Priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader. Options are: `range`, `roundrobin`, and `sticky`", + "default": [ "range" ], + "type": "array", + "items": { + "enum": [ "range", "roundrobin", "sticky" ] + } + }, + "group_rebalance_timeout": { + "title": "Group Rebalance Timeout", + "description": "Maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.", + "default": "60s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "group_instance_id": { + "title": "Group Instance ID", + "description": "Support KIP-345", + "type": "string" + }, + "fetch_default": { + "title": "Fetch Default", + "description": "The default number of message bytes to fetch from the broker in each request (default 1MB). This should be larger than the majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. 
Similar to the JVM's `fetch.message.max.bytes`", + "type": "number" + }, + "isolation_level": { + "title": "Isolation Level", + "description": "Supports 2 modes: `read_commited` to consume and return all messages in message channel, and `read_uncommited` to hide messages that are part of an aborted transaction", + "default": "read_commited", + "enum": [ "read_commited", "read_uncommited" ] + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false +} diff --git a/v2.13/messaging/kafka/producer.json b/v2.13/messaging/kafka/producer.json new file mode 100644 index 0000000..7c36ec6 --- /dev/null +++ b/v2.13/messaging/kafka/producer.json @@ -0,0 +1,69 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://www.krakend.io/schema/v2.13/messaging/kafka/producer.json", + "title": "Kafka Producer Details", + "description": "Enterprise only. Defines details about how to write messages to a Kafka cluster", + "type": "object", + "properties": { + "max_message_bytes": { + "title": "Max Message Bytes", + "description": "Maximum permitted size of a message. Should be set equal to or smaller than the broker's `message.max.bytes`.", + "type": "number", + "minimum": 0 + }, + "required_acks": { + "title": "Required Acks", + "description": "Level of acknowledgement reliability needed from the broker. Equivalent to the `request.required.acks` setting of the JVM producer. Can be a positibe number (as a string), or one of hte following values: `no_response` (no required acks), `wait_for_local` (waits for only the local commit to succeed before responding), `wait_for_all` (waits for all in-sync replicas to commit before responding).", + "default": "wait_for_local", + "type": "string" + }, + "required_acks_timeout": { + "title": "Required Acks Timeout", + "description": "Maximum duration the broker will wait the receipt of the number of `required_acks`. 
This is only relevant when `required_acks` is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.", + "default": "10s", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + }, + "compression_codec": { + "title": "Compression Codec", + "description": "Type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.", + "default": "none", + "enum": [ "none", "gzip", "snappy", "lz4", "zstd" ] + }, + "compression_level": { + "title": "Compression Level", + "description": "Level of compression to use on messages. The meaning depends on the actual compression type used and defaults to default compression level for the codec.", + "type": "string" + }, + "partitioner": { + "title": "Partitioner", + "description": "Select behaviour for choosing the partition to send messages (similar to the `partitioner.class` setting for the JVM producer). The options are:\n- `sarama`: **DEPRECATED** uses a Partitioner which behaves as follows: If the message's key is nil then a random partition is chosen. Otherwise the `FNV-1a` hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.\n- `standard` is like `sarama` except that it handles absolute values in the same way as the reference Java implementation. `sarama` was supposed to do that but it had a mistake and now there are people depending on both behaviours. 
This will all go away on the next major version bump.\n- `random` uses a Partitioner which chooses a random partition each time.\n- `roundrobin` uses a Partitioner which walks through the available partitions one at a time.", + "default": "standard", + "enum": [ "sarama", "standard", "random", "roundrobin" ] + }, + "idempotent": { + "title": "Idempotent", + "description": "If enabled, the producer will ensure that exactly one copy of each message is written", + "default": false, + "type": "boolean" + }, + "retry_max": { + "title": "Retry Max", + "description": "The total number of times to retry sending a message. Similar to the `message.send.max.retries` setting of the JVM producer.", + "default": 3, + "type": "number", + "minimum": 0 + }, + "retry_backoff": { + "title": "Retry Backoff", + "description": "How long to wait for the cluster to settle between retries (similar to the `retry.backoff.ms` setting of the JVM producer.", + "default": "100ms", + "$ref": "../../timeunits.json#/$defs/timeunit", + "type": "string" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false +} diff --git a/v2.13/messaging/kafka/reader.json b/v2.13/messaging/kafka/reader.json new file mode 100644 index 0000000..2d557a8 --- /dev/null +++ b/v2.13/messaging/kafka/reader.json @@ -0,0 +1,44 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://www.krakend.io/schema/v2.13/messaging/kafka/reader.json", + "title": "Kafka Reader", + "description": "Enterprise only. 
Defines how to read messages from a Kafka cluster", + "type": "object", + "required": [ "connection", "topics" ], + "properties": { + "connection": { + "title": "Connection details", + "description": "Settings to stablish the connection to the kafka services", + "$ref": "./connection.json", + "type": "object" + }, + "consumer": { + "title": "Consumer options", + "description": "Details about how to read from the topic", + "$ref": "./consumer.json", + "type": "object" + }, + "topics": { + "title": "Topics", + "description": "List of topics to read from", + "type": "array", + "items": { + "type": "string" + } + }, + "group_id": { + "title": "Consumer Group ID", + "description": "Name of the consumer group to use", + "type": "string" + }, + "key_meta": { + "title": "Key Meta Name", + "description": "Name of the header where the kafka message key value is written", + "type": "string" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false +} diff --git a/v2.13/messaging/kafka/sasl.json b/v2.13/messaging/kafka/sasl.json new file mode 100644 index 0000000..bba7cc1 --- /dev/null +++ b/v2.13/messaging/kafka/sasl.json @@ -0,0 +1,51 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://www.krakend.io/schema/v2.13/messaging/kafka/sasl.json", + "title": "Kafka SASL", + "description": "Enterprise only. 
SASL-based authentication with the broker: there are multiple SASL authentication methods
See RFC4616 for details", + "type": "string" + }, + "user": { + "title": "User", + "description": "Authentication identity (authcid) to present for SASL/PLAIN or SASL/SCRAM authentication", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Password for SASL/PLAIN authentication", + "type": "string" + }, + "scram_auth_id": { + "title": "SCRAM Auth ID", + "description": "Authz id used for SASL/SCRAM authentication", + "type": "string" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false +} diff --git a/v2.13/messaging/kafka/writer.json b/v2.13/messaging/kafka/writer.json new file mode 100644 index 0000000..49d7466 --- /dev/null +++ b/v2.13/messaging/kafka/writer.json @@ -0,0 +1,36 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://www.krakend.io/schema/v2.13/messaging/kafka/writer.json", + "title": "Kafka Writer", + "description": "Enterprise only. Defines how to write messages to a Kafka cluster", + "type": "object", + "required": [ "connection", "topic" ], + "properties": { + "connection": { + "title": "Connection details", + "description": "Settings to establish the connection to the kafka service", + "$ref": "./connection.json", + "type": "object" + }, + "producer": { + "title": "Producer options", + "description": "Details about how to write to a topic", + "$ref": "./producer.json", + "type": "object" + }, + "topic": { + "title": "Topic", + "description": "Topic to write to", + "type": "string" + }, + "key_meta": { + "title": "Key Meta Name", + "description": "Name of the header where the kafka message key value is written", + "type": "string" + } + }, + "patternProperties": { + "^[@$_#]": true + }, + "additionalProperties": false +}