This guide provides general guidance for integrating LaunchDarkly experiment data with Databricks using Auto Loader.
Note: This guide applies to data from both the Python and PHP implementations, as they produce the same S3 data format. The examples use PySpark (Python) because that is the standard language for Databricks notebooks, but the data structure is identical regardless of which SDK implementation you use.
This integration guide is provided as a starting point and has not been tested in a live Databricks environment. Please test thoroughly and adapt the queries to your specific setup before using in production.
The LaunchDarkly experiment data is stored in S3 with the following structure:
s3://your-bucket/experiments/year=2024/month=01/day=15/hour=HH/experiment-data.json
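For scripts outside Spark, the partition hour can be recovered directly from an object key. A minimal sketch using only the standard library (the `partition_datetime` helper is illustrative, not part of the integration):

```python
import re
from datetime import datetime, timezone

def partition_datetime(s3_key: str) -> datetime:
    """Extract the partition hour from a hive-style S3 key."""
    m = re.search(r"year=(\d{4})/month=(\d{2})/day=(\d{2})/hour=(\d{2})", s3_key)
    if m is None:
        raise ValueError(f"no partition segments in {s3_key!r}")
    year, month, day, hour = map(int, m.groups())
    return datetime(year, month, day, hour, tzinfo=timezone.utc)

key = "s3://your-bucket/experiments/year=2024/month=01/day=15/hour=14/experiment-data.json"
print(partition_datetime(key))  # 2024-01-15 14:00:00+00:00
```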
Each JSON file contains one experiment event per line, with this schema:

```json
{
  "timestamp": "2024-01-15T14:30:00.123456+00:00",
  "flag_key": "example-experiment-flag",
  "evaluation_context": {
    "key": "user-123",
    "kind": "user",
    "tier": "premium",
    "country": "US"
  },
  "flag_value": "treatment",
  "variation_index": 1,
  "reason_kind": "FALLTHROUGH",
  "metadata": {
    "source": "launchdarkly-python-hook",
    "version": "1.0"
  }
}
```

The `metadata.source` field is `launchdarkly-python-hook` or `launchdarkly-php-wrapper`, depending on which SDK implementation produced the event.
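Because each file is newline-delimited JSON, any JSON parser can read an event line. A quick sanity check of the schema above using only the standard library:

```python
import json

# One event line, shaped like the schema above
line = ('{"timestamp": "2024-01-15T14:30:00.123456+00:00", '
        '"flag_key": "example-experiment-flag", '
        '"evaluation_context": {"key": "user-123", "kind": "user", '
        '"tier": "premium", "country": "US"}, '
        '"flag_value": "treatment", "variation_index": 1, '
        '"reason_kind": "FALLTHROUGH", '
        '"metadata": {"source": "launchdarkly-python-hook", "version": "1.0"}}')

event = json.loads(line)
ctx = event["evaluation_context"]
print(event["flag_key"], event["flag_value"], ctx["key"], ctx.get("tier"))
# example-experiment-flag treatment user-123 premium
```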
Configure Auto Loader to read from S3:

```python
# Configure Auto Loader to read from S3
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/tmp/launchdarkly-schema") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load("s3://your-launchdarkly-experiments-bucket/experiments/")
```
Extract and flatten the fields you need:

```python
from pyspark.sql.functions import col, current_timestamp

# Extract and transform experiment data
experiment_events = df.select(
    col("timestamp"),
    col("flag_key"),
    col("flag_value"),
    col("evaluation_context.key").alias("user_key"),
    col("evaluation_context.kind").alias("context_kind"),
    col("evaluation_context.tier").alias("user_tier"),
    col("evaluation_context.country").alias("user_country"),
    col("variation_index"),
    current_timestamp().alias("processed_at")
)
```
Write the streaming data to a Delta table:

```python
experiment_events.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/launchdarkly-checkpoint") \
    .table("launchdarkly_experiments")
```
Count participants and unique users per variation:

```sql
SELECT
  flag_key,
  flag_value,
  COUNT(*) AS participant_count,
  COUNT(DISTINCT user_key) AS unique_users
FROM launchdarkly_experiments
GROUP BY flag_key, flag_value
ORDER BY flag_key, flag_value;
```
Break down participation by user tier:

```sql
SELECT
  flag_key,
  user_tier,
  COUNT(*) AS count
FROM launchdarkly_experiments
WHERE user_tier IS NOT NULL
GROUP BY flag_key, user_tier
ORDER BY flag_key, count DESC;
```
Break down participation by country:

```sql
SELECT
  flag_key,
  user_country,
  COUNT(*) AS participant_count
FROM launchdarkly_experiments
WHERE user_country IS NOT NULL
GROUP BY flag_key, user_country
ORDER BY flag_key, participant_count DESC;
```
Track daily participation over time:

```sql
SELECT
  DATE(timestamp) AS experiment_date,
  flag_key,
  flag_value,
  COUNT(*) AS daily_participants
FROM launchdarkly_experiments
GROUP BY DATE(timestamp), flag_key, flag_value
ORDER BY experiment_date DESC, flag_key, flag_value;
```
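The GROUP BY rollups above reduce to counting `(flag_key, flag_value)` pairs and distinct user keys. A plain-Python sketch of the same logic on a few hypothetical sample rows:

```python
from collections import Counter

# Hypothetical sample events, shaped like rows of launchdarkly_experiments
events = [
    {"flag_key": "checkout-cta", "flag_value": "control", "user_key": "u1"},
    {"flag_key": "checkout-cta", "flag_value": "control", "user_key": "u1"},
    {"flag_key": "checkout-cta", "flag_value": "treatment", "user_key": "u2"},
]

# COUNT(*) per (flag_key, flag_value)
participants = Counter((e["flag_key"], e["flag_value"]) for e in events)

# COUNT(DISTINCT user_key) per (flag_key, flag_value)
unique_users = {
    group: len({e["user_key"] for e in events
                if (e["flag_key"], e["flag_value"]) == group})
    for group in participants
}

print(participants[("checkout-cta", "control")])   # 2
print(unique_users[("checkout-cta", "control")])   # 1
```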
For historical data analysis, read the full dataset in a single batch job. Because the files are newline-delimited (one JSON object per line), do not set the `multiline` option:

```python
# Read all historical data
historical_df = spark.read \
    .json("s3://your-launchdarkly-experiments-bucket/experiments/")

# Perform batch analysis
experiment_summary = historical_df.groupBy("flag_key", "flag_value") \
    .count() \
    .orderBy("flag_key", "flag_value")
```
To calculate conversion rates, join the experiment data with your own conversion events (`your_conversions_table` is a placeholder for your table):

```sql
-- Calculate conversion rates (requires additional conversion data)
WITH experiment_data AS (
  SELECT
    flag_key,
    flag_value,
    user_key,
    timestamp
  FROM launchdarkly_experiments
),
conversions AS (
  -- Join with your conversion events table
  SELECT user_key, conversion_timestamp
  FROM your_conversions_table
)
SELECT
  e.flag_key,
  e.flag_value,
  COUNT(DISTINCT e.user_key) AS participants,
  COUNT(DISTINCT c.user_key) AS conversions,
  ROUND(COUNT(DISTINCT c.user_key) * 100.0 / COUNT(DISTINCT e.user_key), 2) AS conversion_rate
FROM experiment_data e
LEFT JOIN conversions c
  ON e.user_key = c.user_key
  AND c.conversion_timestamp >= e.timestamp
GROUP BY e.flag_key, e.flag_value;
```
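The `conversion_rate` column is simple arithmetic; a sketch of the same calculation in Python (the function name is ours, not part of the integration):

```python
def conversion_rate(participants: int, conversions: int) -> float:
    """Percentage of distinct participants who converted, rounded to
    2 places, mirroring ROUND(conversions * 100.0 / participants, 2)."""
    if participants == 0:
        return 0.0
    return round(conversions * 100.0 / participants, 2)

print(conversion_rate(1200, 87))  # 7.25
```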
Analyze user behavior by experiment group:

```sql
SELECT
  flag_key,
  flag_value,
  user_tier,
  COUNT(DISTINCT user_key) AS users,
  AVG(CAST(variation_index AS FLOAT)) AS avg_variation
FROM launchdarkly_experiments
WHERE user_tier IS NOT NULL
GROUP BY flag_key, flag_value, user_tier
ORDER BY flag_key, flag_value, user_tier;
```

The S3 data is already partitioned by date and hour (`year=YYYY/month=MM/day=DD/hour=HH`). For better query performance:
- Use partition pruning in your queries
- Consider additional partitioning by `flag_key` if you have many flags
- Use Delta table partitioning for frequently queried columns
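Partition pruning works by reading only the S3 prefixes a query needs. A sketch that enumerates day-level prefixes for a date range using only the standard library (the `day_prefixes` helper is ours, for illustration):

```python
from datetime import date, timedelta

def day_prefixes(bucket: str, start: date, end: date) -> list:
    """S3 prefixes for each day in [start, end], matching the
    year=/month=/day= partition layout."""
    out = []
    d = start
    while d <= end:
        out.append(f"s3://{bucket}/experiments/year={d:%Y}/month={d:%m}/day={d:%d}/")
        d += timedelta(days=1)
    return out

prefixes = day_prefixes("your-launchdarkly-experiments-bucket",
                        date(2024, 1, 1), date(2024, 1, 3))
print(prefixes[0])
# s3://your-launchdarkly-experiments-bucket/experiments/year=2024/month=01/day=01/
```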
Query a specific date range and flag:

```sql
SELECT *
FROM launchdarkly_experiments
WHERE flag_key = 'your-specific-flag'
  AND DATE(timestamp) BETWEEN '2024-01-01' AND '2024-01-31';
```
- Schema Evolution: Set `cloudFiles.schemaEvolutionMode` (for example, to `addNewColumns`) so new context attributes are picked up automatically
- Data Format: Ensure JSON files are newline-delimited (one JSON object per line)
- Permissions: Verify S3 bucket access permissions for Databricks
- Checkpointing: Use unique checkpoint locations for different streams
- Monitor Auto Loader metrics in Databricks
- Set up alerts for failed data loads
- Check S3 access logs for data arrival
For issues with this integration:
- Check Databricks documentation
- Verify S3 permissions and data format
- Test with small data samples first
- Contact your Databricks administrator for platform-specific issues