Skip to content

Commit 322dcb3

Browse files
authored
Add msgraphfs provider (apache#55454)
This enables integration with ObjectStoragePath for onedrive, sharepoint, and teams files. It extends schema support for these as well, so in addition to msgd:// it also accepts sharepoint://, onedrive:// .
1 parent 018ff3a commit 322dcb3

File tree

10 files changed

+491
-1
lines changed

10 files changed

+491
-1
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
Filesystems
19+
===========
20+
21+
.. toctree::
22+
:maxdepth: 1
23+
:caption: Filesystem Providers
24+
:glob:
25+
26+
*
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
Microsoft Graph Filesystem
19+
===========================
20+
21+
The Microsoft Graph filesystem provides access to OneDrive, SharePoint, and Teams document libraries through Airflow's ObjectStoragePath interface.
22+
23+
Supported URL formats:
24+
25+
* ``msgraph://connection_id/drive_id/path/to/file``
26+
* ``sharepoint://connection_id/drive_id/path/to/file``
27+
* ``onedrive://connection_id/drive_id/path/to/file``
28+
* ``msgd://connection_id/drive_id/path/to/file``
29+
30+
Connection Configuration
31+
------------------------
32+
33+
Create a Microsoft Graph connection in Airflow with the following parameters:
34+
35+
* **Connection Type**: msgraph
36+
* **Host**: Tenant ID
37+
* **Login**: Client ID
38+
* **Password**: Client Secret
39+
40+
The connection form provides additional configuration fields:
41+
42+
* **Tenant ID**: Azure AD tenant identifier
43+
* **Drive ID**: Specific drive to access (optional - leave empty for general access)
44+
* **Scopes**: OAuth2 scopes (default: https://graph.microsoft.com/.default)
45+
46+
Additional OAuth2 parameters supported via connection extras:
47+
48+
* **scope**: OAuth2 access scope
49+
* **token_endpoint**: Custom token endpoint URL
50+
* **redirect_uri**: OAuth2 redirect URI for authorization code flow
51+
* **token_endpoint_auth_method**: Client authentication method (default: client_secret_basic)
52+
* **code_challenge_method**: PKCE code challenge method (e.g., 'S256')
53+
* **username**: Username for password grant flow
54+
* **password**: Password for password grant flow
55+
56+
Connection extra field configuration example:
57+
58+
.. code-block:: json
59+
60+
{
61+
"drive_id": "b!abc123...",
62+
"scope": "https://graph.microsoft.com/.default",
63+
"token_endpoint": "https://login.microsoftonline.com/your-tenant/oauth2/v2.0/token",
64+
"redirect_uri": "http://localhost:8080/callback",
65+
"token_endpoint_auth_method": "client_secret_post"
66+
}
67+
68+
Usage Examples
69+
--------------
70+
71+
Reading Files
72+
^^^^^^^^^^^^^
73+
74+
.. code-block:: python
75+
76+
from airflow.sdk.io.path import ObjectStoragePath
77+
78+
# Access a file in OneDrive
79+
path = ObjectStoragePath("onedrive://my_conn/drive123/Documents/data.csv")
80+
81+
# Read file content
82+
with path.open("r") as f:
83+
content = f.read()
84+
85+
Directory Operations
86+
^^^^^^^^^^^^^^^^^^^^
87+
88+
.. code-block:: python
89+
90+
# List directory contents in SharePoint
91+
sharepoint_path = ObjectStoragePath("sharepoint://sp_conn/site_drive/Shared Documents/")
92+
93+
for item in sharepoint_path.iterdir():
94+
print(f"Found: {item.name}")
95+
if item.is_file():
96+
print(f" Size: {item.stat().st_size} bytes")
97+
98+
File Operations
99+
^^^^^^^^^^^^^^^
100+
101+
.. code-block:: python
102+
103+
# Copy file between drives
104+
source = ObjectStoragePath("msgraph://conn1/drive1/source.txt")
105+
target = ObjectStoragePath("msgraph://conn2/drive2/backup/source.txt")
106+
source.copy(target)
107+
108+
# Move file
109+
old_path = ObjectStoragePath("onedrive://conn/drive/temp/file.txt")
110+
new_path = ObjectStoragePath("onedrive://conn/drive/archive/file.txt")
111+
old_path.move(new_path)
112+
113+
# Delete file
114+
file_to_delete = ObjectStoragePath("msgraph://conn/drive/old_data.csv")
115+
file_to_delete.unlink()
116+
117+
Writing Files
118+
^^^^^^^^^^^^^
119+
120+
.. code-block:: python
121+
122+
# Write new file
123+
output_path = ObjectStoragePath("sharepoint://sp_conn/docs/reports/report.txt")
124+
125+
with output_path.open("w") as f:
126+
f.write("Generated report data\n")
127+
f.write(f"Created at: {datetime.now()}\n")
128+
129+
Drive Discovery
130+
^^^^^^^^^^^^^^^
131+
132+
When you need to find the correct drive ID for your URLs, you can use the Microsoft Graph API operators:
133+
134+
.. code-block:: python
135+
136+
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
137+
138+
# List all drives for a user
139+
list_drives = MSGraphAsyncOperator(
140+
task_id="list_drives",
141+
conn_id="msgraph_conn",
142+
url="me/drives",
143+
result_processor=lambda response: [
144+
{"id": drive["id"], "name": drive["name"]} for drive in response["value"]
145+
],
146+
)
147+
148+
URL Scheme Mapping
149+
------------------
150+
151+
The different URL schemes map to specific Microsoft Graph endpoints:
152+
153+
* ``msgraph://`` - General Microsoft Graph access
154+
* ``onedrive://`` - OneDrive personal and business drives
155+
* ``sharepoint://`` - SharePoint document libraries
156+
* ``msgd://`` - Shortened form of msgraph://
157+
158+
All schemes use the same underlying Microsoft Graph API and authentication.
159+
160+
Requirements
161+
------------
162+
163+
The Microsoft Graph filesystem requires:
164+
165+
* ``msgraphfs`` Python package
166+
* Valid Azure AD application registration with appropriate permissions
167+
* Microsoft Graph API access for your tenant
168+
169+
Required Microsoft Graph permissions:
170+
171+
* ``Files.Read`` - To read files
172+
* ``Files.ReadWrite`` - To read and write files
173+
* ``Sites.Read.All`` - To access SharePoint sites (if using ``sharepoint://`` URLs)
174+
175+
Cross-References
176+
----------------
177+
178+
* :doc:`Microsoft Graph API Operators </operators/msgraph>` - For API operations and drive discovery
179+
180+
Reference
181+
---------
182+
183+
For further information, look at:
184+
185+
* `Microsoft Graph Files API <https://learn.microsoft.com/en-us/graph/api/resources/onedrive>`__
186+
* `msgraphfs Python package <https://pypi.org/project/msgraphfs/>`__
187+
* `Use the Microsoft Graph API <https://learn.microsoft.com/en-us/graph/use-the-api/>`__

providers/microsoft/azure/docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
Connection types <connections/index>
3838
Operators <operators/index>
3939
Transfers <transfer/index>
40+
Filesystems <filesystems/index>
4041
Secrets backends <secrets-backends/azure-key-vault>
4142
Logging for Tasks <logging/index>
4243
Sensors <sensors/index>

providers/microsoft/azure/docs/operators/msgraph.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ Below is an example of using this operator to create an item schedule in Fabric.
8080
:start-after: [START howto_operator_ms_fabric_create_item_schedule]
8181
:end-before: [END howto_operator_ms_fabric_create_item_schedule]
8282

83+
Cross-References
84+
----------------
85+
86+
* :doc:`Microsoft Graph Filesystem </filesystems/msgraph>` - For file operations using ObjectStoragePath
8387

8488
Reference
8589
---------

providers/microsoft/azure/provider.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ sensors:
226226

227227
filesystems:
228228
- airflow.providers.microsoft.azure.fs.adls
229+
- airflow.providers.microsoft.azure.fs.msgraphfs
229230

230231
hooks:
231232
- integration-name: Microsoft Azure Container Instances

providers/microsoft/azure/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ dependencies = [
8181
"azure-mgmt-containerregistry>=8.0.0",
8282
"azure-mgmt-containerinstance>=10.1.0",
8383
"msgraph-core>=1.3.3",
84+
"msgraphfs>=0.3.0",
8485
"microsoft-kiota-http>=1.9.4,<2.0.0",
8586
"microsoft-kiota-serialization-json>=1.9.4",
8687
"microsoft-kiota-serialization-text>=1.9.4",
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from typing import TYPE_CHECKING, Any
21+
22+
from airflow.providers.microsoft.azure.utils import get_field
23+
from airflow.providers.microsoft.azure.version_compat import BaseHook
24+
25+
if TYPE_CHECKING:
26+
from fsspec import AbstractFileSystem
27+
28+
schemes = ["msgraph", "sharepoint", "onedrive", "msgd"]
29+
30+
31+
def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
32+
from msgraphfs import MSGDriveFS
33+
34+
if conn_id is None:
35+
return MSGDriveFS({})
36+
37+
conn = BaseHook.get_connection(conn_id)
38+
extras = conn.extra_dejson
39+
conn_type = conn.conn_type or "msgraph"
40+
41+
options: dict[str, Any] = {}
42+
43+
# Get authentication parameters with fallback handling
44+
client_id = conn.login or get_field(
45+
conn_id=conn_id, conn_type=conn_type, extras=extras, field_name="client_id"
46+
)
47+
client_secret = conn.password or get_field(
48+
conn_id=conn_id, conn_type=conn_type, extras=extras, field_name="client_secret"
49+
)
50+
tenant_id = conn.host or get_field(
51+
conn_id=conn_id, conn_type=conn_type, extras=extras, field_name="tenant_id"
52+
)
53+
54+
if client_id:
55+
options["client_id"] = client_id
56+
if client_secret:
57+
options["client_secret"] = client_secret
58+
if tenant_id:
59+
options["tenant_id"] = tenant_id
60+
61+
# Process additional fields from extras
62+
fields = [
63+
"drive_id",
64+
"scope",
65+
"token_endpoint",
66+
"redirect_uri",
67+
"token_endpoint_auth_method",
68+
"code_challenge_method",
69+
"update_token",
70+
"username",
71+
"password",
72+
]
73+
for field in fields:
74+
value = get_field(conn_id=conn_id, conn_type=conn_type, extras=extras, field_name=field)
75+
if value is not None:
76+
if value == "":
77+
options.pop(field, "")
78+
else:
79+
options[field] = value
80+
81+
# Update with storage options
82+
options.update(storage_options or {})
83+
84+
# Create oauth2 client parameters if authentication is provided
85+
oauth2_client_params = {}
86+
if options.get("client_id") and options.get("client_secret") and options.get("tenant_id"):
87+
oauth2_client_params = {
88+
"client_id": options["client_id"],
89+
"client_secret": options["client_secret"],
90+
"tenant_id": options["tenant_id"],
91+
}
92+
93+
# Add additional oauth2 parameters supported by authlib
94+
oauth2_params = [
95+
"scope",
96+
"token_endpoint",
97+
"redirect_uri",
98+
"token_endpoint_auth_method",
99+
"code_challenge_method",
100+
"update_token",
101+
"username",
102+
"password",
103+
]
104+
for param in oauth2_params:
105+
if param in options:
106+
oauth2_client_params[param] = options[param]
107+
108+
# Determine which filesystem to return based on drive_id
109+
drive_id = options.get("drive_id")
110+
111+
return MSGDriveFS(drive_id=drive_id, oauth2_client_params=oauth2_client_params)

providers/microsoft/azure/src/airflow/providers/microsoft/azure/get_provider_info.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,10 @@ def get_provider_info():
191191
"python-modules": ["airflow.providers.microsoft.azure.sensors.msgraph"],
192192
},
193193
],
194-
"filesystems": ["airflow.providers.microsoft.azure.fs.adls"],
194+
"filesystems": [
195+
"airflow.providers.microsoft.azure.fs.adls",
196+
"airflow.providers.microsoft.azure.fs.msgraphfs",
197+
],
195198
"hooks": [
196199
{
197200
"integration-name": "Microsoft Azure Container Instances",

providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]:
152152

153153
return {
154154
"tenant_id": StringField(lazy_gettext("Tenant ID"), widget=BS3TextFieldWidget()),
155+
"drive_id": StringField(lazy_gettext("Drive ID"), widget=BS3TextFieldWidget()),
155156
"api_version": StringField(
156157
lazy_gettext("API Version"), widget=BS3TextFieldWidget(), default=APIVersion.v1.value
157158
),

0 commit comments

Comments
 (0)