31 changes: 28 additions & 3 deletions product_demos/streaming-sessionization/01-Delta-session-BRONZE.py
@@ -97,15 +97,40 @@
# DBTITLE 1,Read messages from Kafka and save them as events_raw
# NOTE: the demo runs with Kafka, and dbdemos doesn't publicly expose its demo Kafka servers. Use your own broker IPs to run the demo properly
kafka_bootstrap_servers_tls = "b-1.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-2.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-3.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092"
#kafka_bootstrap_servers_tls = "<Replace by your own kafka servers>"
# Also make sure the cluster has the proper instance profile to allow access if you're on AWS.

kafka_auth_options = {
"kafka.bootstrap.servers": kafka_bootstrap_servers_tls,
"kafka.security.protocol": "PLAINTEXT"
}
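# The demo brokers accept PLAINTEXT; for your own brokers you will typically want
# "kafka.security.protocol": "SSL" or "SASL_SSL" plus the matching credentials.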

# Alternative: Azure Event Hubs with Kafka support, using SPN (service principal) authentication:
# 1: Create the SPN and note its tenant_id, client_id and client_secret:
#    az ad sp create-for-rbac -n spn-databricks-to-eventhub
# 2: Assign the SPN the "Azure Event Hubs Data Owner" role on the Event Hubs namespace
# 3: (Optional) Store the SPN credentials in a secret scope using the Databricks CLI (see the example below)
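# Example, with hypothetical scope/key names (unified Databricks CLI syntax):
#   databricks secrets create-scope demo-eventhub
#   databricks secrets put-secret demo-eventhub spn-client-secret
# Then retrieve the value at runtime with dbutils.secrets.get(scope="demo-eventhub", key="spn-client-secret")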
you_have_setup_eventhub = False

if you_have_setup_eventhub:
    event_hubs_server = "your-eventhub-namespace.servicebus.windows.net"
    tenant_id = ""
    client_id = ""
    client_secret = ""  # best practice: use a secret scope and retrieve with dbutils.secrets.get(scope="...", key="...")

    kafka_auth_options.update({
        "kafka.bootstrap.servers": f"{event_hubs_server}:9093",
        "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";',
        "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "OAUTHBEARER",
        "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
    })
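# Note: the "kafkashaded." prefix on the JAAS and callback-handler classes is needed because
# the Databricks runtime ships a shaded (relocated) Kafka client; on open-source Spark you
# would use the plain org.apache.kafka.common.security.oauthbearer... class names instead.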

stream = (spark
    .readStream
    #=== Configurations for Kafka streams ===
    .format("kafka")
    .options(**kafka_auth_options)
    .option("subscribe", "dbdemos-sessions")   # Kafka topic
    .option("startingOffsets", "latest")       # Consume messages from the end of the topic
    .option("maxOffsetsPerTrigger", "10000")   # Control ingestion rate - backpressure
@@ -32,6 +32,18 @@
'security.protocol': 'SSL'
}

# Alternatively, if you have set up an Event Hub with the Kafka interface, you can use the following configuration.
you_have_setup_eventhub = False

if you_have_setup_eventhub:
    conf.update({
        'bootstrap.servers': '<your-eventhub-namespace>.servicebus.windows.net:9093',  # Replace with your Event Hubs Kafka endpoint
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '',  # Use your Event Hubs connection string (e.g., Endpoint=sb://...SharedAccessKey=...)
    })
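# Example, with a hypothetical secret scope/key — if this producer runs in a Databricks
# notebook, fetch the connection string from a secret scope instead of hardcoding it:
#   conf['sasl.password'] = dbutils.secrets.get(scope="demo-eventhub", key="eventhub-connection-string")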

producer = Producer(conf)

def delivery_report(err, msg):