23 changes: 12 additions & 11 deletions README.md
@@ -54,17 +54,18 @@ query = stream.writeStream.format("console").start()

## Available Data Sources

| Data Source | Type | Description | Install |
|-------------|------|-------------|---------|
| `fake` | Batch/Stream | Generate synthetic test data using Faker | `pip install pyspark-data-sources[faker]` |
| `github` | Batch | Read GitHub pull requests | Built-in |
| `googlesheets` | Batch | Read public Google Sheets | Built-in |
| `huggingface` | Batch | Load Hugging Face datasets | `[huggingface]` |
| `stock` | Batch | Fetch stock market data (Alpha Vantage) | Built-in |
| `opensky` | Batch/Stream | Live flight tracking data | Built-in |
| `kaggle` | Batch | Load Kaggle datasets | `[kaggle]` |
| `arrow` | Batch | Read Apache Arrow files | `[arrow]` |
| `lance` | Batch Write | Write Lance vector format | `[lance]` |
| Data Source | Type | Description | Install |
|-------------|--------------|--------------------------------------------|-------------------------------------------|
| `fake` | Batch/Stream | Generate synthetic test data using Faker | `pip install pyspark-data-sources[faker]` |
| `github` | Batch | Read GitHub pull requests | Built-in |
| `googlesheets` | Batch | Read public Google Sheets | Built-in |
| `huggingface` | Batch | Load Hugging Face datasets | `[huggingface]` |
| `stock` | Batch | Fetch stock market data (Alpha Vantage) | Built-in |
| `opensky` | Batch/Stream | Live flight tracking data | Built-in |
| `kaggle` | Batch | Load Kaggle datasets | `[kaggle]` |
| `arrow` | Batch | Read Apache Arrow files | `[arrow]` |
| `lance` | Batch Write | Write Lance vector format | `[lance]` |
| `pyspark.datasource.sharepoint` | Stream Write | Write streaming data to Sharepoint objects | `[sharepoint]` |

📚 **[See detailed examples for all data sources →](docs/data-sources-guide.md)**

53 changes: 53 additions & 0 deletions docs/data-sources-guide.md
@@ -12,6 +12,7 @@ This guide provides detailed examples and usage patterns for all available data
7. [KaggleDataSource - Load Kaggle Datasets](#7-kaggledatasource---load-kaggle-datasets)
8. [ArrowDataSource - Read Apache Arrow Files](#8-arrowdatasource---read-apache-arrow-files)
9. [LanceDataSource - Vector Database Format](#9-lancedatasource---vector-database-format)
10. [SharepointDataSource - Write to Sharepoint](#10-sharepointdatasource---write-to-sharepoint)

## 1. FakeDataSource - Generate Synthetic Data

@@ -422,6 +423,58 @@ lance_df.printSchema()
- Fast random access
- Version control built-in

## 10. SharepointDataSource - Write to Sharepoint

Write streaming data to Sharepoint Lists.

### Installation
```bash
pip install pyspark-data-sources[sharepoint]
```

### Write Data
```python
import json
from pyspark.sql.functions import col, lit
from pyspark_datasources import SharepointDataSource

spark.dataSource.register(SharepointDataSource)

# Prepare your DataFrame
df = (
spark.readStream.format("rate").option("rowsPerSecond", 10).load()
.select(
col("value").cast("string").alias("name"),
lit("Technology").alias("industry"),
(col("value") * 100000).cast("double").alias("annual_revenue")
)
)

# Write DataFrame to Sharepoint
query = (
df.writeStream
.format("pyspark.datasource.sharepoint")
.option("pyspark.datasource.sharepoint.auth.tenant_id", "<>")
.option("pyspark.datasource.sharepoint.auth.client_id", "<>")
.option("pyspark.datasource.sharepoint.auth.client_secret", "<>")
.option("pyspark.datasource.sharepoint.resource", "list")
.option("pyspark.datasource.sharepoint.site_id", "<>")
.option("pyspark.datasource.sharepoint.list.list_id", "<>")
.option("pyspark.datasource.sharepoint.list.fields", json.dumps({"name": "Name", "industry": "Industry", "annual_revenue": "AnnualRevenue"}))
.option("pyspark.datasource.sharepoint.batch_size", "200")
.option("pyspark.datasource.sharepoint.fail_fast", "true")
.option("checkpointLocation", "/Volumes/bu_1/default/test/chk/sharepoint")
.start()
)
```

### Features
- **Write-only datasource**: Designed specifically for writing data to Sharepoint
- **Stream processing**: Uses the Microsoft Graph API to write records concurrently, in batches sized by the configurable `batch_size` option
- **Exactly-once semantics**: Integrates with Spark's checkpoint mechanism
- **Error handling**: The `fail_fast` option controls whether the write operation fails when a record cannot be written (see the sketch below)
- **Extensible resources**: Designed to support multiple resource types, though currently only `list` is implemented
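
If you would rather keep the stream running when individual records are rejected, you can disable fail-fast behavior. The following is a minimal, hypothetical sketch based on the example above; it assumes `df`, `json`, and the `SharepointDataSource` registration from that example, and it assumes that with `fail_fast` set to `false` failed records are reported without stopping the query (the exact behavior is implementation-dependent).

```python
# Hedged sketch: same sink as above, but tolerant of per-record failures.
# Assumes `df`, `json`, and SharepointDataSource registration from the previous example.
tolerant_query = (
    df.writeStream
    .format("pyspark.datasource.sharepoint")
    .option("pyspark.datasource.sharepoint.auth.tenant_id", "<>")
    .option("pyspark.datasource.sharepoint.auth.client_id", "<>")
    .option("pyspark.datasource.sharepoint.auth.client_secret", "<>")
    .option("pyspark.datasource.sharepoint.resource", "list")
    .option("pyspark.datasource.sharepoint.site_id", "<>")
    .option("pyspark.datasource.sharepoint.list.list_id", "<>")
    .option("pyspark.datasource.sharepoint.list.fields", json.dumps({"name": "Name"}))
    # Smaller batches mean more Graph API calls but smaller units of failure.
    .option("pyspark.datasource.sharepoint.batch_size", "50")
    # Assumption: with fail_fast disabled, the query continues past failed records.
    .option("pyspark.datasource.sharepoint.fail_fast", "false")
    .option("checkpointLocation", "/tmp/chk/sharepoint_tolerant")  # any durable path
    .start()
)
```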

## Common Patterns

### Error Handling
6 changes: 6 additions & 0 deletions docs/datasources/sharepoint.md
@@ -0,0 +1,6 @@
# SharepointDataSource

> Requires the `azure-identity` and `msgraph-sdk` libraries. You can install them manually: `pip install azure-identity msgraph-sdk`
> or use `pip install pyspark-data-sources[sharepoint]`.

::: pyspark_datasources.sharepoint.SharepointDataSource
25 changes: 13 additions & 12 deletions docs/index.md
@@ -31,15 +31,16 @@ spark.readStream.format("fake").load().writeStream.format("console").start()

## Data Sources

| Data Source | Short Name | Description | Dependencies |
| ------------------------------------------------------- | -------------- | --------------------------------------------- | --------------------- |
| [GithubDataSource](./datasources/github.md) | `github` | Read pull requests from a Github repository | None |
| [FakeDataSource](./datasources/fake.md) | `fake` | Generate fake data using the `Faker` library | `faker` |
| [HuggingFaceDatasets](./datasources/huggingface.md) | `huggingface` | Read datasets from the HuggingFace Hub | `datasets` |
| [StockDataSource](./datasources/stock.md) | `stock` | Read stock data from Alpha Vantage | None |
| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None |
| [RobinhoodDataSource](./datasources/robinhood.md) | `robinhood` | Read cryptocurrency market data from Robinhood API | `pynacl` |
| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
| Data Source | Short Name | Description | Dependencies |
|---------------------------------------------------------|---------------------------------|----------------------------------------------------|---------------------------------|
| [GithubDataSource](./datasources/github.md) | `github` | Read pull requests from a Github repository | None |
| [FakeDataSource](./datasources/fake.md) | `fake` | Generate fake data using the `Faker` library | `faker` |
| [HuggingFaceDatasets](./datasources/huggingface.md) | `huggingface` | Read datasets from the HuggingFace Hub | `datasets` |
| [StockDataSource](./datasources/stock.md) | `stock` | Read stock data from Alpha Vantage | None |
| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects | `simple-salesforce` |
| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None |
| [RobinhoodDataSource](./datasources/robinhood.md) | `robinhood` | Read cryptocurrency market data from Robinhood API | `pynacl` |
| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects | `simple-salesforce` |
| [SharepointDataSource](./datasources/sharepoint.md) | `sharepoint` | Write streaming data to Sharepoint Lists | `azure-identity`, `msgraph-sdk` |
2 changes: 1 addition & 1 deletion examples/salesforce_example.py
@@ -279,7 +279,7 @@ def example_3_checkpoint_demonstration():
col("industry").alias("Industry"),
col("revenue").alias("AnnualRevenue"),
)

query1 = (
account_df1.writeStream.format("pyspark.datasource.salesforce")
.option("username", username)
7 changes: 7 additions & 0 deletions pyproject.toml
@@ -37,13 +37,19 @@ robinhood = [
salesforce = [
"simple-salesforce>=1.12.0,<2.0.0",
]
sharepoint = [
"azure-identity>=1.25.1",
"msgraph-sdk>=1.52.0",
]
all = [
"faker>=23.1.0,<24.0.0",
"datasets>=2.17.0,<3.0.0",
"databricks-sdk>=0.28.0,<0.29.0",
"kagglehub[pandas-datasets]>=0.3.10,<0.4.0",
"pynacl>=1.5.0,<2.0.0",
"simple-salesforce>=1.12.0,<2.0.0",
"azure-identity>=1.25.1",
"msgraph-sdk>=1.52.0",
]

[tool.uv]
@@ -56,6 +62,7 @@ dev-dependencies = [
"mkdocs-material>=9.5.40,<10.0.0",
"pyspark==4.0.0",
"ruff>=0.6.0,<0.7.0",
"pre-commit>=4.3.0",
]

[tool.ruff]
1 change: 1 addition & 0 deletions pyspark_datasources/__init__.py
@@ -10,3 +10,4 @@
from .simplejson import SimpleJsonDataSource
from .stock import StockDataSource
from .jsonplaceholder import JSONPlaceholderDataSource
from .sharepoint import SharepointResource, SharepointDataSource
36 changes: 17 additions & 19 deletions pyspark_datasources/jsonplaceholder.py
@@ -76,18 +76,20 @@ def __init__(self, options=None):
self.options = options or {}

def schema(self) -> str:
""" Returns the schema for the selected endpoint."""
"""Returns the schema for the selected endpoint."""
schemas = {
"posts": "userId INT, id INT, title STRING, body STRING",
"users": ("id INT, name STRING, username STRING, email STRING, phone STRING, "
"website STRING, address_street STRING, address_suite STRING, "
"address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
"address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
"company_bs STRING"),
"users": (
"id INT, name STRING, username STRING, email STRING, phone STRING, "
"website STRING, address_street STRING, address_suite STRING, "
"address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
"address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
"company_bs STRING"
),
"todos": "userId INT, id INT, title STRING, completed BOOLEAN",
"comments": "postId INT, id INT, name STRING, email STRING, body STRING",
"albums": "userId INT, id INT, title STRING",
"photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING"
"photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING",
}

endpoint = self.options.get("endpoint", "posts")
@@ -152,7 +154,7 @@ def _process_posts(item):
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", ""),
body=item.get("body", "")
body=item.get("body", ""),
)

def _process_users(item):
@@ -175,15 +177,15 @@ def _process_users(item):
address_geo_lng=geo.get("lng", ""),
company_name=company.get("name", ""),
company_catchPhrase=company.get("catchPhrase", ""),
company_bs=company.get("bs", "")
company_bs=company.get("bs", ""),
)

def _process_todos(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", ""),
completed=item.get("completed", False)
completed=item.get("completed", False),
)

def _process_comments(item):
@@ -192,23 +194,19 @@ def _process_comments(item):
id=item.get("id"),
name=item.get("name", ""),
email=item.get("email", ""),
body=item.get("body", "")
body=item.get("body", ""),
)

def _process_albums(item):
return Row(
userId=item.get("userId"),
id=item.get("id"),
title=item.get("title", "")
)
return Row(userId=item.get("userId"), id=item.get("id"), title=item.get("title", ""))

def _process_photos(item):
return Row(
albumId=item.get("albumId"),
id=item.get("id"),
title=item.get("title", ""),
url=item.get("url", ""),
thumbnailUrl=item.get("thumbnailUrl", "")
thumbnailUrl=item.get("thumbnailUrl", ""),
)

processors = {
@@ -217,8 +215,8 @@ def _process_photos(item):
"todos": _process_todos,
"comments": _process_comments,
"albums": _process_albums,
"photos": _process_photos
"photos": _process_photos,
}

processor = processors.get(self.endpoint, _process_posts)
return processor(item)
return processor(item)