diff --git a/pom.xml b/pom.xml index e6cf749..e879bf5 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.microsoft.azure.functions azure-functions-java-library - 3.2.2 + 3.2.3 jar com.microsoft.maven diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java index 829b097..fde2bc9 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaOutput.java @@ -7,6 +7,8 @@ import com.microsoft.azure.functions.BrokerAuthenticationMode; import com.microsoft.azure.functions.BrokerProtocol; +import com.microsoft.azure.functions.OAuthBearerMethod; +import com.microsoft.azure.functions.KafkaMessageKeyType; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -208,6 +210,62 @@ */ String avroSchema() default ""; + /** + * Gets or sets the Avro schema of message key. + * Should be used only if a generic record should be generated. + * default "" + * + * @return the avro schema for message key + */ + String keyAvroSchema() default ""; + + /** + * Specifies the data type of the message key. + * This data type will be used to serialize the key before sending it to the Kafka topic. + * If KeyAvroSchema is set, this value is ignored and the key will be serialized using Avro. + * The default type is STRING. + * Default: STRING + * + * @return the data type of the message key + */ + KafkaMessageKeyType keyDataType() default KafkaMessageKeyType.STRING; + + /** + * Client certificate in PEM format. + * ssl.certificate.pem in librdkafka + * default "" + * + * @return the ssl certificate PEM + */ + String sslCertificatePEM() default ""; + + /** + * Client Private Key in PEM format. + * ssl.key.pem in librdkafka + * default "" + * + * @return the ssl key PEM + */ + String sslKeyPEM() default ""; + + /** + * CA certificate for verifying the broker's certificate in PEM format + * ssl.ca.pem in librdkafka + * default "" + * + * @return the ssl CA PEM + */ + String sslCaPEM() default ""; + + /** + * Client certificate and key in PEM format. + * Additional Configuration for extension as KeyVault supports uploading certificate only with private key. + * default "" + * + * @return the ssl certificate and key PEM + */ + String sslCertificateandKeyPEM() default ""; + /** * URL for the Avro Schema Registry * default "" @@ -232,4 +290,66 @@ */ String schemaRegistryPassword() default ""; + /** + * OAuth Bearer method. + * Either 'default' or 'oidc' + * sasl.oauthbearer in librdkafka + * default "" + * + * @return the OAuth Bearer method + */ + OAuthBearerMethod oAuthBearerMethod() default OAuthBearerMethod.Default; + + /** + * OAuth Bearer Client Id + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.id in librdkafka + * default "" + * + * @return the OAuth Bearer client id + */ + String oAuthBearerClientId() default ""; + + /** + * OAuth Bearer Client Secret + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.secret in librdkafka + * default "" + * + * @return the OAuth Bearer client secret + */ + String oAuthBearerClientSecret() default ""; + + /** + * OAuth Bearer scope. + * Client use this to specify the scope of the access request to the broker. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer scope + */ + String oAuthBearerScope() default ""; + + /** + * OAuth Bearer token endpoint url. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.token.endpoint.url in librdkafka + * default "" + * + * @return the OAuth Bearer token endpoint url + */ + String oAuthBearerTokenEndpointUrl() default ""; + + /** + * OAuth Bearer extensions. + * Allow additional information to be provided to the broker. + * Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea" + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer extensions + */ + String oAuthBearerExtensions() default ""; + } diff --git a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java index 5656d3c..8b332b2 100644 --- a/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java +++ b/src/main/java/com/microsoft/azure/functions/annotation/KafkaTrigger.java @@ -7,6 +7,8 @@ import com.microsoft.azure.functions.BrokerAuthenticationMode; import com.microsoft.azure.functions.BrokerProtocol; +import com.microsoft.azure.functions.OAuthBearerMethod; +import com.microsoft.azure.functions.KafkaMessageKeyType; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -182,10 +184,130 @@ */ String avroSchema() default ""; - /*** + /** + * Gets or sets the Avro schema of message key. + * Should be used only if a generic record should be generated. + * default "" + * + * @return the avro schema for message key + */ + String keyAvroSchema() default ""; + + /** + * Specifies the data type of the message key that will be deserialized from the Kafka topic. + * If KeyAvroSchema is set, this value is ignored and the key will be generated as a generic record. + * The default type is STRING. + * Default: STRING + * + * @return the data type of the message key + */ + KafkaMessageKeyType keyDataType() default KafkaMessageKeyType.STRING; + + /** + * Client certificate in PEM format. + * ssl.certificate.pem in librdkafka + * default "" + * + * @return the ssl certificate PEM + */ + String sslCertificatePEM() default ""; + + /** + * Client Private Key in PEM format. + * ssl.key.pem in librdkafka + * default "" + * + * @return the ssl key PEM + */ + String sslKeyPEM() default ""; + + /** + * CA certificate for verifying the broker's certificate in PEM format + * ssl.ca.pem in librdkafka + * default "" + * + * @return the ssl CA PEM + */ + String sslCaPEM() default ""; + + /** + * Client certificate and key in PEM format. + * Additional Configuration for extension as KeyVault supports uploading certificate only with private key. + * default "" + * + * @return the ssl certificate and key PEM + */ + String sslCertificateandKeyPEM() default ""; + + /** + * OAuth Bearer method. + * Either 'default' or 'oidc' + * sasl.oauthbearer in librdkafka + * default "" + * + * @return the OAuth Bearer method + */ + OAuthBearerMethod oAuthBearerMethod() default OAuthBearerMethod.Default; + + /** + * OAuth Bearer Client Id + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.id in librdkafka + * default "" + * + * @return the OAuth Bearer client id + */ + String oAuthBearerClientId() default ""; + + /** + * OAuth Bearer Client Secret + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.client.secret in librdkafka + * default "" + * + * @return the OAuth Bearer client secret + */ + String oAuthBearerClientSecret() default ""; + + /** + * OAuth Bearer scope. + * Client use this to specify the scope of the access request to the broker. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer scope + */ + String oAuthBearerScope() default ""; + + /** + * OAuth Bearer token endpoint url. + * Specify only when OAuthBearerMethod is 'oidc' + * sasl.oauthbearer.token.endpoint.url in librdkafka + * default "" + * + * @return the OAuth Bearer token endpoint url + */ + String oAuthBearerTokenEndpointUrl() default ""; + + /** + * OAuth Bearer extensions. + * Allow additional information to be provided to the broker. + * Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea" + * sasl.oauthbearer.extensions in librdkafka + * default "" + * + * @return the OAuth Bearer extensions + */ + String oAuthBearerExtensions() default ""; + + /** + * Maximum number of unprocessed messages a worker is expected to have at an instance. + * When target-based scaling is not disabled, this is used to divide total unprocessed event count to determine the number of worker instances, which will then be rounded up to a worker instance count that creates a balanced partition distribution. + * Default: 1000 * - * @return - */ + * @return the lag threshold + */ int lagThreshold() default 1000; /**