Skip to content

Commit 972cb28

Browse files
author
Pranava Vedagnya Gaddam
committed
add kafka configs and tests
1 parent 8bd30d3 commit 972cb28

File tree

3 files changed

+165
-23
lines changed

3 files changed

+165
-23
lines changed

azure/functions/decorators/function_app.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
from azure.functions.decorators.http import HttpTrigger, HttpOutput, \
3131
HttpMethod
3232
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
33-
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod
33+
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod, \
34+
KafkaMessageKeyType
3435
from azure.functions.decorators.queue import QueueTrigger, QueueOutput
3536
from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \
3637
ServiceBusQueueOutput, ServiceBusTopicTrigger, \
@@ -1244,12 +1245,18 @@ def kafka_trigger(self,
12441245
event_hub_connection_string: Optional[str] = None,
12451246
consumer_group: Optional[str] = None,
12461247
avro_schema: Optional[str] = None,
1248+
key_avro_schema: Optional[str] = None,
1249+
key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None,
12471250
username: Optional[str] = None,
12481251
password: Optional[str] = None,
12491252
ssl_key_location: Optional[str] = None,
12501253
ssl_ca_location: Optional[str] = None,
12511254
ssl_certificate_location: Optional[str] = None,
12521255
ssl_key_password: Optional[str] = None,
1256+
ssl_certificate_pem: Optional[str] = None,
1257+
ssl_key_pem: Optional[str] = None,
1258+
ssl_ca_pem: Optional[str] = None,
1259+
ssl_certificate_and_key_pem: Optional[str] = None,
12531260
schema_registry_url: Optional[str] = None,
12541261
schema_registry_username: Optional[str] = None,
12551262
schema_registry_password: Optional[str] = None,
@@ -1287,6 +1294,10 @@ def kafka_trigger(self,
12871294
Azure Event Hubs).
12881295
:param consumer_group: Kafka consumer group used by the trigger.
12891296
:param avro_schema: Used only if a generic Avro record should be generated.
1297+
:param key_avro_schema: Avro schema for the message key. Used only if a
1298+
generic Avro record should be generated for the key.
1299+
:param key_data_type: Data type of the message key. Valid values: Int, Long,
1300+
String, Binary. Default is String. Ignored if key_avro_schema is set.
12901301
:param username: SASL username for use with the PLAIN or SASL-SCRAM mechanisms.
12911302
Equivalent to 'sasl.username' in librdkafka. Default is empty string.
12921303
:param password: SASL password for use with the PLAIN or SASL-SCRAM mechanisms.
@@ -1338,12 +1349,19 @@ def decorator():
13381349
event_hub_connection_string=event_hub_connection_string, # noqa: E501
13391350
consumer_group=consumer_group,
13401351
avro_schema=avro_schema,
1352+
key_avro_schema=key_avro_schema,
1353+
key_data_type=parse_singular_param_to_enum(
1354+
key_data_type, KafkaMessageKeyType),
13411355
username=username,
13421356
password=password,
13431357
ssl_key_location=ssl_key_location,
13441358
ssl_ca_location=ssl_ca_location,
13451359
ssl_certificate_location=ssl_certificate_location,
13461360
ssl_key_password=ssl_key_password,
1361+
ssl_certificate_pem=ssl_certificate_pem,
1362+
ssl_key_pem=ssl_key_pem,
1363+
ssl_ca_pem=ssl_ca_pem,
1364+
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
13471365
schema_registry_url=schema_registry_url,
13481366
schema_registry_username=schema_registry_username,
13491367
schema_registry_password=schema_registry_password,
@@ -2588,12 +2606,18 @@ def kafka_output(self,
25882606
topic: str,
25892607
broker_list: str,
25902608
avro_schema: Optional[str] = None,
2609+
key_avro_schema: Optional[str] = None,
2610+
key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None,
25912611
username: Optional[str] = None,
25922612
password: Optional[str] = None,
25932613
ssl_key_location: Optional[str] = None,
25942614
ssl_ca_location: Optional[str] = None,
25952615
ssl_certificate_location: Optional[str] = None,
25962616
ssl_key_password: Optional[str] = None,
2617+
ssl_certificate_pem: Optional[str] = None,
2618+
ssl_key_pem: Optional[str] = None,
2619+
ssl_ca_pem: Optional[str] = None,
2620+
ssl_certificate_and_key_pem: Optional[str] = None,
25972621
schema_registry_url: Optional[str] = None,
25982622
schema_registry_username: Optional[str] = None,
25992623
schema_registry_password: Optional[str] = None,
@@ -2630,6 +2654,10 @@ def kafka_output(self,
26302654
:param topic: The Kafka topic to which messages are published.
26312655
:param broker_list: The list of Kafka brokers to which the producer connects.
26322656
:param avro_schema: Optional. Avro schema to generate a generic record.
2657+
:param key_avro_schema: Avro schema for the message key. Used only if a
2658+
generic Avro record should be generated for the key.
2659+
:param key_data_type: Data type of the message key. Valid values: Int, Long,
2660+
String, Binary. Default is String. Ignored if key_avro_schema is set.
26332661
:param username: SASL username for use with the PLAIN and SASL-SCRAM
26342662
mechanisms. Equivalent to `'sasl.username'` in librdkafka.
26352663
:param password: SASL password for use with the PLAIN and SASL-SCRAM
@@ -2642,6 +2670,14 @@ def kafka_output(self,
26422670
Equivalent to `'ssl.certificate.location'` in librdkafka.
26432671
:param ssl_key_password: Password for the client's SSL key.
26442672
Equivalent to `'ssl.key.password'` in librdkafka.
2673+
:param ssl_certificate_pem: Client certificate in PEM format.
2674+
Equivalent to 'ssl.certificate.pem' in librdkafka.
2675+
:param ssl_key_pem: Client private key in PEM format.
2676+
Equivalent to 'ssl.key.pem' in librdkafka.
2677+
:param ssl_ca_pem: CA certificate for verifying the broker's certificate in PEM format.
2678+
Equivalent to 'ssl.ca.pem' in librdkafka.
2679+
:param ssl_certificate_and_key_pem: Client certificate and key in PEM format.
2680+
Additional configuration for KeyVault support (certificate with private key).
26452681
:param schema_registry_url: URL of the Avro Schema Registry.
26462682
:param schema_registry_username: Username for accessing the Schema Registry.
26472683
:param schema_registry_password: Password for accessing the Schema Registry.
@@ -2695,12 +2731,19 @@ def decorator():
26952731
topic=topic,
26962732
broker_list=broker_list,
26972733
avro_schema=avro_schema,
2734+
key_avro_schema=key_avro_schema,
2735+
key_data_type=parse_singular_param_to_enum(
2736+
key_data_type, KafkaMessageKeyType),
26982737
username=username,
26992738
password=password,
27002739
ssl_key_location=ssl_key_location,
27012740
ssl_ca_location=ssl_ca_location,
27022741
ssl_certificate_location=ssl_certificate_location,
27032742
ssl_key_password=ssl_key_password,
2743+
ssl_certificate_pem=ssl_certificate_pem,
2744+
ssl_key_pem=ssl_key_pem,
2745+
ssl_ca_pem=ssl_ca_pem,
2746+
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
27042747
schema_registry_url=schema_registry_url,
27052748
schema_registry_username=schema_registry_username,
27062749
schema_registry_password=schema_registry_password,

azure/functions/decorators/kafka.py

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ class OAuthBearerMethod(StringifyEnum):
2929
OIDC = 1
3030

3131

32+
class KafkaMessageKeyType(StringifyEnum):
33+
INT = 0
34+
LONG = 1
35+
STRING = 2
36+
BINARY = 3
37+
38+
3239
class KafkaOutput(OutputBinding):
3340
@staticmethod
3441
def get_binding_name() -> str:
@@ -39,15 +46,21 @@ def __init__(self,
3946
topic: str,
4047
broker_list: str,
4148
avro_schema: Optional[str],
42-
username: Optional[str],
43-
password: Optional[str],
44-
ssl_key_location: Optional[str],
45-
ssl_ca_location: Optional[str],
46-
ssl_certificate_location: Optional[str],
47-
ssl_key_password: Optional[str],
48-
schema_registry_url: Optional[str],
49-
schema_registry_username: Optional[str],
50-
schema_registry_password: Optional[str],
49+
key_avro_schema: Optional[str] = None,
50+
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
51+
username: Optional[str] = None,
52+
password: Optional[str] = None,
53+
ssl_key_location: Optional[str] = None,
54+
ssl_ca_location: Optional[str] = None,
55+
ssl_certificate_location: Optional[str] = None,
56+
ssl_key_password: Optional[str] = None,
57+
ssl_certificate_pem: Optional[str] = None,
58+
ssl_key_pem: Optional[str] = None,
59+
ssl_ca_pem: Optional[str] = None,
60+
ssl_certificate_and_key_pem: Optional[str] = None,
61+
schema_registry_url: Optional[str] = None,
62+
schema_registry_username: Optional[str] = None,
63+
schema_registry_password: Optional[str] = None,
5164
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
5265
o_auth_bearer_client_id: Optional[str] = None,
5366
o_auth_bearer_client_secret: Optional[str] = None,
@@ -68,12 +81,18 @@ def __init__(self,
6881
self.topic = topic
6982
self.broker_list = broker_list
7083
self.avro_schema = avro_schema
84+
self.key_avro_schema = key_avro_schema
85+
self.key_data_type = key_data_type
7186
self.username = username
7287
self.password = password
7388
self.ssl_key_location = ssl_key_location
7489
self.ssl_ca_location = ssl_ca_location
7590
self.ssl_certificate_location = ssl_certificate_location
7691
self.ssl_key_password = ssl_key_password
92+
self.ssl_certificate_pem = ssl_certificate_pem
93+
self.ssl_key_pem = ssl_key_pem
94+
self.ssl_ca_pem = ssl_ca_pem
95+
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
7796
self.schema_registry_url = schema_registry_url
7897
self.schema_registry_username = schema_registry_username
7998
self.schema_registry_password = schema_registry_password
@@ -104,18 +123,24 @@ def __init__(self,
104123
name: str,
105124
topic: str,
106125
broker_list: str,
107-
event_hub_connection_string: Optional[str],
108-
consumer_group: Optional[str],
109-
avro_schema: Optional[str],
110-
username: Optional[str],
111-
password: Optional[str],
112-
ssl_key_location: Optional[str],
113-
ssl_ca_location: Optional[str],
114-
ssl_certificate_location: Optional[str],
115-
ssl_key_password: Optional[str],
116-
schema_registry_url: Optional[str],
117-
schema_registry_username: Optional[str],
118-
schema_registry_password: Optional[str],
126+
event_hub_connection_string: Optional[str] = None,
127+
consumer_group: Optional[str] = None,
128+
avro_schema: Optional[str] = None,
129+
key_avro_schema: Optional[str] = None,
130+
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
131+
username: Optional[str] = None,
132+
password: Optional[str] = None,
133+
ssl_key_location: Optional[str] = None,
134+
ssl_ca_location: Optional[str] = None,
135+
ssl_certificate_location: Optional[str] = None,
136+
ssl_key_password: Optional[str] = None,
137+
ssl_certificate_pem: Optional[str] = None,
138+
ssl_key_pem: Optional[str] = None,
139+
ssl_ca_pem: Optional[str] = None,
140+
ssl_certificate_and_key_pem: Optional[str] = None,
141+
schema_registry_url: Optional[str] = None,
142+
schema_registry_username: Optional[str] = None,
143+
schema_registry_password: Optional[str] = None,
119144
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
120145
o_auth_bearer_client_id: Optional[str] = None,
121146
o_auth_bearer_client_secret: Optional[str] = None,
@@ -133,12 +158,18 @@ def __init__(self,
133158
self.event_hub_connection_string = event_hub_connection_string
134159
self.consumer_group = consumer_group
135160
self.avro_schema = avro_schema
161+
self.key_avro_schema = key_avro_schema
162+
self.key_data_type = key_data_type
136163
self.username = username
137164
self.password = password
138165
self.ssl_key_location = ssl_key_location
139166
self.ssl_ca_location = ssl_ca_location
140167
self.ssl_certificate_location = ssl_certificate_location
141168
self.ssl_key_password = ssl_key_password
169+
self.ssl_certificate_pem = ssl_certificate_pem
170+
self.ssl_key_pem = ssl_key_pem
171+
self.ssl_ca_pem = ssl_ca_pem
172+
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
142173
self.schema_registry_url = schema_registry_url
143174
self.schema_registry_username = schema_registry_username
144175
self.schema_registry_password = schema_registry_password

tests/decorators/test_kafka.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from azure.functions.decorators.core import BindingDirection, Cardinality, \
77
DataType
88
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
9-
BrokerAuthenticationMode, BrokerProtocol
9+
BrokerAuthenticationMode, BrokerProtocol, KafkaMessageKeyType
1010

1111

1212
class TestKafka(unittest.TestCase):
@@ -102,3 +102,71 @@ def test_kafka_output_valid_creation(self):
102102
'topic': 'topic',
103103
'type': KAFKA,
104104
'username': 'username'})
105+
106+
def test_kafka_trigger_with_key_data_type_and_pem(self):
107+
trigger = KafkaTrigger(name="arg_name",
108+
topic="topic",
109+
broker_list="broker_list",
110+
key_avro_schema="key_avro_schema",
111+
key_data_type=KafkaMessageKeyType.LONG,
112+
ssl_certificate_pem="cert_pem",
113+
ssl_key_pem="key_pem",
114+
ssl_ca_pem="ca_pem",
115+
ssl_certificate_and_key_pem="cert_and_key_pem",
116+
data_type=DataType.UNDEFINED)
117+
118+
self.assertEqual(trigger.get_binding_name(), "kafkaTrigger")
119+
dict_repr = trigger.get_dict_repr()
120+
self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema")
121+
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.LONG)
122+
self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem")
123+
self.assertEqual(dict_repr["sslKeyPem"], "key_pem")
124+
self.assertEqual(dict_repr["sslCaPem"], "ca_pem")
125+
self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem")
126+
127+
def test_kafka_output_with_key_data_type_and_pem(self):
128+
output = KafkaOutput(name="arg_name",
129+
topic="topic",
130+
broker_list="broker_list",
131+
key_avro_schema="key_avro_schema",
132+
key_data_type=KafkaMessageKeyType.BINARY,
133+
ssl_certificate_pem="cert_pem",
134+
ssl_key_pem="key_pem",
135+
ssl_ca_pem="ca_pem",
136+
ssl_certificate_and_key_pem="cert_and_key_pem",
137+
data_type=DataType.UNDEFINED)
138+
139+
self.assertEqual(output.get_binding_name(), "kafka")
140+
dict_repr = output.get_dict_repr()
141+
self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema")
142+
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.BINARY)
143+
self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem")
144+
self.assertEqual(dict_repr["sslKeyPem"], "key_pem")
145+
self.assertEqual(dict_repr["sslCaPem"], "ca_pem")
146+
self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem")
147+
148+
def test_kafka_message_key_type_enum(self):
149+
"""Test that KafkaMessageKeyType enum has the correct values"""
150+
self.assertEqual(KafkaMessageKeyType.INT, 0)
151+
self.assertEqual(KafkaMessageKeyType.LONG, 1)
152+
self.assertEqual(KafkaMessageKeyType.STRING, 2)
153+
self.assertEqual(KafkaMessageKeyType.BINARY, 3)
154+
155+
def test_kafka_trigger_key_data_type_default(self):
156+
"""Test that key_data_type defaults to STRING"""
157+
trigger = KafkaTrigger(name="arg_name",
158+
topic="topic",
159+
broker_list="broker_list")
160+
161+
dict_repr = trigger.get_dict_repr()
162+
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING)
163+
164+
def test_kafka_output_key_data_type_default(self):
165+
"""Test that key_data_type defaults to STRING"""
166+
output = KafkaOutput(name="arg_name",
167+
topic="topic",
168+
broker_list="broker_list",
169+
avro_schema="schema")
170+
171+
dict_repr = output.get_dict_repr()
172+
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING)

0 commit comments

Comments
 (0)