Commit 13edac6

Add Meta Conversions API data source (#34)

* Add Meta Conversions API data source
* Add Meta CAPI documentation
* Update documentation and examples for Meta CAPI
* Refactor Meta CAPI tests to use pytest

1 parent de2e050 commit 13edac6

File tree

9 files changed: +554 -10 lines

README.md

Lines changed: 20 additions & 7 deletions

@@ -54,17 +54,30 @@ query = stream.writeStream.format("console").start()
 
 ## Available Data Sources
 
-| Data Source | Type | Description | Install |
-|-------------|------|-------------|---------|
-| `fake` | Batch/Stream | Generate synthetic test data using Faker | `pip install pyspark-data-sources[faker]` |
-| `github` | Batch | Read GitHub pull requests | Built-in |
-| `googlesheets` | Batch | Read public Google Sheets | Built-in |
+### Sources (Read)
+
+| Data Source | Type | Description | Dependency |
+|-------------|------|-------------|------------|
+| `fake` | Batch/Stream | Generate synthetic test data using Faker | `[faker]` |
+| `github` | Batch | Read GitHub pull requests | None |
+| `googlesheets` | Batch | Read public Google Sheets | None |
 | `huggingface` | Batch | Load Hugging Face datasets | `[huggingface]` |
-| `stock` | Batch | Fetch stock market data (Alpha Vantage) | Built-in |
-| `opensky` | Batch/Stream | Live flight tracking data | Built-in |
+| `stock` | Batch | Fetch stock market data (Alpha Vantage) | None |
+| `opensky` | Batch/Stream | Live flight tracking data | None |
 | `kaggle` | Batch | Load Kaggle datasets | `[kaggle]` |
 | `arrow` | Batch | Read Apache Arrow files | `[arrow]` |
+| `robinhood` | Batch | Read cryptocurrency market data from Robinhood API | `[robinhood]` |
+| `jsonplaceholder` | Batch | Read JSON data for testing | None |
+| `weather` | Batch | Read current weather data (OpenWeatherMap) | None |
+
+### Sinks (Write)
+
+| Data Source | Type | Description | Dependency |
+|-------------|------|-------------|------------|
 | `lance` | Batch Write | Write Lance vector format | `[lance]` |
+| `salesforce` | Stream Write | Write to Salesforce objects | `[salesforce]` |
+| `meta_capi` | Batch/Stream Write | Write to Meta Conversions API | None |
 
 📚 **[See detailed examples for all data sources →](docs/data-sources-guide.md)**

docs/data-sources-guide.md

Lines changed: 1 addition & 2 deletions

@@ -494,5 +494,4 @@ df = spark.read.format("fake") \
 - Use partitioning for large datasets
 - Consider sampling: `df.sample(0.1)`
 - Increase Spark executor memory
-
-For more help, see the [Development Guide](../contributing/DEVELOPMENT.md) or open an issue on GitHub.
+For more help, see the [Development Guide](../contributing/DEVELOPMENT.md) or open an issue on GitHub.

docs/datasources/meta_capi.md

Lines changed: 7 additions & 0 deletions (new file)

@@ -0,0 +1,7 @@
+# MetaCapiDataSource
+
+> Requires the `requests` library. You can install it manually: `pip install requests`
+> or use `pip install pyspark-data-sources`.
+
+::: pyspark_datasources.meta_capi.MetaCapiDataSource
+
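The `:::` line above is a mkdocstrings directive that renders the class's docstring into the docs page. For orientation, the Conversions API accepts batches of events posted as a JSON body; a minimal sketch of assembling such a body (the helper name and exact structure are illustrative assumptions, not this library's API):

```python
def build_capi_payload(events, test_event_code=None):
    """Assemble a Conversions API request body from a list of event dicts."""
    payload = {"data": events}
    if test_event_code:
        # Routes the batch to the "Test Events" tab in Events Manager
        payload["test_event_code"] = test_event_code
    return payload

payload = build_capi_payload(
    [{"event_name": "Purchase", "event_time": 1700000001, "action_source": "website"}],
    test_event_code="TEST12345",
)
```

An actual send would POST this JSON to the Graph API events edge for the configured pixel, which is what the `requests` dependency noted above is for; authentication and endpoint versioning are handled inside the data source.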

docs/index.md

Lines changed: 2 additions & 1 deletion

@@ -42,4 +42,5 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
 | [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
 | [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None |
 | [RobinhoodDataSource](./datasources/robinhood.md) | `robinhood` | Read cryptocurrency market data from Robinhood API | `pynacl` |
-| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
+| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects | `simple-salesforce` |
+| [MetaCapiDataSource](./datasources/meta_capi.md) | `meta_capi` | Write event data to Meta Conversions API | None |

examples/meta_capi_example.py

Lines changed: 156 additions & 0 deletions (new file)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Meta Conversions API (CAPI) Datasource Example

This example demonstrates how to use the MetaCapiDataSource as a datasource
to write event data to Meta for ad optimization.

Requirements:
- PySpark
- requests
- Valid Meta System User Access Token and Pixel ID

Setup:
    pip install pyspark requests

Environment Variables:
    export META_ACCESS_TOKEN="your-access-token"
    export META_PIXEL_ID="your-pixel-id"
"""

import os
import tempfile
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType


def check_credentials():
    """Check if Meta credentials are available"""
    token = os.getenv("META_ACCESS_TOKEN")
    pixel_id = os.getenv("META_PIXEL_ID")

    if not all([token, pixel_id]):
        print("❌ Missing Meta credentials!")
        print("Please set the following environment variables:")
        print("  export META_ACCESS_TOKEN='your-access-token'")
        print("  export META_PIXEL_ID='your-pixel-id'")
        return False, None, None

    print(f"✅ Using Pixel ID: {pixel_id}")
    return True, token, pixel_id


def example_1_rate_source_to_capi():
    """Example 1: Stream simulated purchases to Meta CAPI"""
    print("\n" + "=" * 60)
    print("EXAMPLE 1: Simulated Purchases → Meta CAPI (Streaming)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample1").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource

        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create streaming data (simulating 1 purchase per second)
        streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

        # Transform to CAPI format (Flat Mode)
        # We simulate user data. In production, this comes from your tables.
        events_df = streaming_df.select(
            lit("Purchase").alias("event_name"),
            col("timestamp").alias("event_time"),
            lit("test@example.com").alias("email"),  # Will be auto-hashed
            lit("website").alias("action_source"),
            (col("value") * 10.0 + 5.0).alias("value"),
            lit("USD").alias("currency"),
            lit("TEST12345").alias("test_event_code"),  # For testing in Events Manager
        )

        print("📊 Starting streaming write to Meta CAPI...")
        print("   Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI
        query = (
            events_df.writeStream.format("meta_capi")
            .option("access_token", token)
            .option("pixel_id", pixel_id)
            .option("test_event_code", "TEST12345")  # Optional: direct test code option
            .option("batch_size", "10")
            .option("checkpointLocation", "/tmp/meta_capi_example1_checkpoint")
            .trigger(processingTime="10 seconds")
            .start()
        )

        # Run for 30 seconds then stop
        time.sleep(30)
        query.stop()
        print("✅ Streaming stopped")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()


def example_2_batch_dataframe_to_capi():
    """Example 2: Batch write a static DataFrame to Meta CAPI"""
    print("\n" + "=" * 60)
    print("EXAMPLE 2: Static DataFrame → Meta CAPI (Batch)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample2").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource

        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create sample data
        data = [
            ("Purchase", 1700000001, "user1@example.com", 120.50, "USD"),
            ("Purchase", 1700000002, "user2@example.com", 85.00, "USD"),
            ("AddToCart", 1700000003, "user3@example.com", 25.99, "USD"),
        ]
        columns = ["event_name", "event_time", "email", "value", "currency"]
        df = spark.createDataFrame(data, columns)

        # Add optional fields
        df = df.withColumn("action_source", lit("website")) \
               .withColumn("test_event_code", lit("TEST12345"))

        print(f"📊 Writing {df.count()} records to Meta CAPI in batch mode...")
        print("   Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI (Batch)
        df.write.format("meta_capi") \
            .option("access_token", token) \
            .option("pixel_id", pixel_id) \
            .option("test_event_code", "TEST12345") \
            .option("batch_size", "50") \
            .save()

        print("✅ Batch write completed")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()


def main():
    print("🚀 Meta CAPI Datasource Example")
    example_1_rate_source_to_capi()
    example_2_batch_dataframe_to_capi()


if __name__ == "__main__":
    main()
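The `# Will be auto-hashed` comment in the example refers to Meta's requirement that customer identifiers such as email addresses be normalized and SHA-256 hashed before they are sent to the Conversions API. A minimal sketch of that normalization (the helper name is an illustrative assumption; the data source performs this step internally):

```python
import hashlib


def normalize_and_hash(value):
    # Meta requires identifiers such as email to be whitespace-trimmed
    # and lowercased before SHA-256 hashing (sent as a hex digest)
    return hashlib.sha256(value.strip().lower().encode("utf-8")).hexdigest()


# Different casings/padding of the same address hash to the same digest
hashed = normalize_and_hash("  Test@Example.COM ")
```

Because hashing happens after normalization, upstream tables can store emails in any casing without producing mismatched identifiers.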

mkdocs.yml

Lines changed: 2 additions & 0 deletions

@@ -37,6 +37,8 @@ nav:
     - datasources/lance.md
     - datasources/opensky.md
     - datasources/weather.md
+    - datasources/meta_capi.md
+
 
 markdown_extensions:
   - pymdownx.highlight:

pyspark_datasources/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -10,3 +10,4 @@
 from .simplejson import SimpleJsonDataSource
 from .stock import StockDataSource
 from .jsonplaceholder import JSONPlaceholderDataSource
+from .meta_capi import MetaCapiDataSource

0 commit comments