diff --git a/product_demos/streaming-sessionization/01-Delta-session-BRONZE.py b/product_demos/streaming-sessionization/01-Delta-session-BRONZE.py
index c1944c00..52a18d62 100644
--- a/product_demos/streaming-sessionization/01-Delta-session-BRONZE.py
+++ b/product_demos/streaming-sessionization/01-Delta-session-BRONZE.py
@@ -97,15 +97,40 @@
 # DBTITLE 1,Read messages from Kafka and save them as events_raw
 # NOTE: the demo runs with Kafka, and dbdemos doesn't publically expose its demo kafka servers. Use your own IPs to run the demo properly
 kafka_bootstrap_servers_tls = "b-1.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-2.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-3.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092"
-#kafka_bootstrap_servers_tls = "" # Also make sure to have the proper instance profile to allow the access if you're on AWS.
+kafka_auth_options = {
+  "kafka.bootstrap.servers": kafka_bootstrap_servers_tls,
+  "kafka.security.protocol": "PLAINTEXT"
+}
+
+# Alternative: Azure Event Hubs with Kafka support, using SPN (service principal) authentication:
+# 1: Create the SPN and note its tenant_id, client_id and secret:
+#    az ad sp create-for-rbac -n spn-databricks-to-eventhub
+# 2: Assign the SPN the "Azure Event Hubs Data Owner" role on the Event Hubs namespace
+# 3: (Optional) Store the SPN credentials in a secret scope using the Databricks CLI
+you_have_setup_eventhub = False
+
+if you_have_setup_eventhub:
+  event_hubs_server = "your-eventhub-namespace.servicebus.windows.net"
+  tenant_id = ""
+  client_id = ""
+  client_secret = "" # Best practice: store in a secret scope and retrieve with dbutils.secrets.get(scope="", key="")
+
+  kafka_auth_options.update({
+    "kafka.bootstrap.servers": f"{event_hubs_server}:9093",
+    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";',
+    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
+    "kafka.security.protocol": "SASL_SSL",
+    "kafka.sasl.mechanism": "OAUTHBEARER",
+    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
+  })
+
 stream = (spark
   .readStream
     #=== Configurations for Kafka streams ===
     .format("kafka")
-    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
-    .option("kafka.security.protocol", "PLAINTEXT") #SSL
+    .options(**kafka_auth_options)
    .option("subscribe", "dbdemos-sessions") #kafka topic
    .option("startingOffsets", "latest") #Consume messages from the end
    .option("maxOffsetsPerTrigger", "10000") # Control ingestion rate - backpressure
diff --git a/product_demos/streaming-sessionization/_00-Delta-session-PRODUCER.py b/product_demos/streaming-sessionization/_00-Delta-session-PRODUCER.py
index 5b235d31..4adb878f 100644
--- a/product_demos/streaming-sessionization/_00-Delta-session-PRODUCER.py
+++ b/product_demos/streaming-sessionization/_00-Delta-session-PRODUCER.py
@@ -32,6 +32,20 @@
   'security.protocol': 'SSL'
 }
 
+# Alternatively, if you have set up an Event Hubs namespace with its Kafka interface, you can use the following configuration.
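+# With the Event Hubs Kafka endpoint, SASL/PLAIN authentication expects the literal username '$ConnectionString'
+# and the namespace connection string (Azure portal > Shared access policies) as the password.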
+you_have_setup_eventhub = False
+
+if you_have_setup_eventhub:
+  conf.update({
+    'bootstrap.servers': '<your-eventhub-namespace>.servicebus.windows.net:9093', # Replace with your Event Hubs Kafka endpoint
+    'security.protocol': 'SASL_SSL',
+    'sasl.mechanism': 'PLAIN',
+    'sasl.username': '$ConnectionString',
+    'sasl.password': '', # Use your Event Hubs connection string (e.g., Endpoint=sb://...SharedAccessKey=...)
+  })
+
 producer = Producer(conf)
 
 def delivery_report(err, msg):