Skip to content

Commit 311c0cb

Browse files
committed
feat(data-factory): update connection handling to support item-specific connections and enhance documentation
1 parent 1a3ba53 commit 311c0cb

File tree

5 files changed

+76
-53
lines changed

5 files changed

+76
-53
lines changed

metadata-ingestion/docs/sources/fabric-data-factory/fabric-data-factory_pre.md

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ The `fabric-data-factory` module ingests metadata from Microsoft Fabric Data Fac
66

77
1. **Set up authentication** — Configure Azure credentials (see [Prerequisites](#prerequisites))
88
2. **Enable API access** — Ensure a Fabric admin has enabled service principal API access (if using SP or managed identity)
9-
3. **Grant permissions** — Add your identity as a workspace member with Viewer role or higher
9+
3. **Grant permissions** — Add your identity as a workspace **Contributor** (required for pipeline definitions and lineage)
1010
4. **Configure recipe** — Use `fabric-data-factory_recipe.yml` as a template
1111
5. **Run ingestion** — Execute `datahub ingest -c fabric-data-factory_recipe.yml`
1212
:::
@@ -41,33 +41,29 @@ Fabric Data Factory Concepts
4141

4242
#### Required Permissions
4343

44-
The connector requires read access to Fabric workspaces, items, and connections. The specific permissions depend on your authentication method, but the identity you use must have:
45-
46-
- **Workspace access**: The identity must be added as a workspace member (Viewer or above) for each workspace you want to ingest
44+
The connector requires **Contributor** role on each workspace. Contributor is needed to fetch pipeline definitions without it, the connector will list pipelines but fail to read their activities and lineage.
4745

4846
##### Delegated (on behalf of a user) authentication
4947

50-
If using delegated auth (e.g., Azure CLI), request the following Fabric API scopes via Microsoft Entra ID:
48+
If using delegated auth (e.g., Azure CLI), the signed-in user's existing Fabric permissions apply directly. The connector requires the following delegated scopes:
5149

52-
- `Workspace.Read.All` — to list and read workspaces
53-
- `Item.Read.All` — to list and read items (pipelines, activities)
54-
- `Connection.Read.All` — to list and read connections for lineage resolution
55-
For execution history, additionally request:
50+
- `Workspace.Read.All` or `Workspace.ReadWrite.All` — for listing workspaces and items
51+
- `Item.ReadWrite.All` or `DataPipeline.ReadWrite.All` — for Get Item Definition, List Item Connections, and Query Activity Runs (`Item.Read.All` is **not** sufficient for definitions and connections)
52+
- `Item.Read.All` or `DataPipeline.Read.All` — sufficient for List Item Job Instances (execution history)
5653

57-
- `Item.Execute.All` — to query pipeline and activity runs
54+
The Azure CLI token includes the necessary Fabric API scopes by default.
5855

5956
##### Service Principal and Managed Identity authentication
6057

61-
Service principals and managed identities do not use delegated scopes. Instead, you need to:
58+
Service principals and managed identities do not inherit any permissions by default. You need to:
6259

6360
1. **Enable API access**: A Fabric admin must enable the service principal tenant settings (see **Fabric Admin Settings** below)
64-
2. **Grant workspace access**: Add the SP or MI as a workspace member (Viewer or above) for each workspace you want to ingest
65-
3. **Grant connection access**: The SP or MI must have permission on the Fabric connections used by pipelines, so that the connector can read connection details for lineage resolution
61+
2. **Grant workspace access**: Add the SP or MI as a workspace **Contributor** for each workspace you want to ingest
6662

6763
#### Fabric Admin Settings
6864

6965
:::warning
70-
For **service principal** and **managed identity** authentication, a Fabric administrator must enable API access for service principals in the Fabric admin portal. Without this, API calls will fail with 401/403 errors even if permissions are correctly assigned.
66+
For **service principal** and **managed identity** authentication, a Fabric administrator must enable API access for service principals in the Fabric admin portal. Without this, API calls will fail with 401 errors even if workspace permissions are correctly assigned.
7167
:::
7268

7369
As of mid-2025, Microsoft split the original single tenant setting into two separate settings. Configure them as follows:
@@ -76,10 +72,12 @@ As of mid-2025, Microsoft split the original single tenant setting into two sepa
7672
2. Under **Developer settings**, enable the applicable setting(s):
7773
- **Service principals can call Fabric public APIs** — Controls access to CRUD APIs protected by the Fabric permission model (e.g., reading workspaces and items). This is **enabled by default** for new tenants since August 2025.
7874
- **Service principals can create workspaces, connections, and deployment pipelines** — Controls access to global APIs not protected by Fabric permissions. This is **disabled by default**. Enable only if needed.
79-
3. Choose whether to enable for the entire organization or restrict to a specific security group. It is recommended to restrict access to a dedicated security group containing only the service principals that need API access.
75+
3. Restrict access to a dedicated **security group** containing only the service principals that need API access. This is the recommended approach.
8076

8177
> **Note**: If you are on an older tenant where the legacy single setting **Service principals can use Fabric APIs** is still visible, enable that instead. It will be automatically migrated to the two new settings.
8278
79+
> **Note**: Tenant setting changes can take **up to 15 minutes** to propagate. If you receive 401 errors immediately after enabling, wait and retry.
80+
8381
For detailed instructions, see [Developer admin settings](https://learn.microsoft.com/en-us/fabric/admin/service-admin-portal-developer) and [Identity support for Fabric REST APIs](https://learn.microsoft.com/en-us/rest/api/fabric/articles/identity-support).
8482

8583
#### Authentication
@@ -88,7 +86,11 @@ The connector supports four authentication methods via the shared `credential` c
8886

8987
##### Service Principal (recommended for production)
9088

91-
Register an application in [Microsoft Entra ID](https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app) and note the `client_id`, `client_secret`, and `tenant_id`. Then ensure the Fabric admin has enabled service principal API access (see **Fabric Admin Settings** above) and add the service principal as a member of the target workspaces.
89+
Register an application in [Microsoft Entra ID](https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app) and note the `client_id`, `client_secret`, and `tenant_id`. Then:
90+
91+
1. Ensure the Fabric admin has enabled service principal API access (see **Fabric Admin Settings** above)
92+
2. Create a security group in Entra ID and add the service principal as a member
93+
3. Add the security group as **Contributor** in each target workspace (Contributor role grants access to pipeline definitions and item connections for lineage)
9294

9395
```yaml
9496
credential:
@@ -102,7 +104,7 @@ All three fields are required when using this method.
102104
103105
##### Managed Identity (for Azure-hosted deployments)
104106
105-
Use this when running DataHub ingestion on an Azure VM, AKS, App Service, or other Azure compute that supports [managed identities](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview). The managed identity must be added as a workspace member in Fabric. A Fabric admin must also enable the tenant settings described in **Fabric Admin Settings** above — these settings govern API access for both service principals and managed identities, despite the setting name referencing only service principals.
107+
Use this when running DataHub ingestion on an Azure VM, AKS, App Service, or other Azure compute that supports [managed identities](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview). The managed identity must be added as a workspace Contributor in Fabric. A Fabric admin must also enable the tenant settings described in **Fabric Admin Settings** above — these settings govern API access for both service principals and managed identities, despite the setting name referencing only service principals.
106108
107109
```yaml
108110
# System-assigned managed identity (no additional config needed)
@@ -118,15 +120,17 @@ credential:
118120
managed_identity_client_id: "<your-managed-identity-client-id>"
119121
```
120122
121-
##### Azure CLI (for local development)
123+
##### Azure CLI (for local development and testing)
122124
123-
Uses the credentials from your local `az login` session. Run `az login` before starting ingestion. The signed-in user must have workspace access in Fabric.
125+
Uses the credentials from your local `az login` session. The signed-in user's existing Fabric permissions apply directly — no additional setup needed beyond workspace access.
124126

125127
```yaml
126128
credential:
127129
authentication_method: cli
128130
```
129131

132+
Run `az login` before starting ingestion. For remote servers without a browser, use `az login`.
133+
130134
##### DefaultAzureCredential (flexible auto-detection)
131135

132136
Uses Azure's [DefaultAzureCredential](https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential) chain, which tries multiple credential sources in order: environment variables, workload identity, managed identity, shared token cache, Azure CLI, Azure PowerShell, Azure Developer CLI, and more.
@@ -149,6 +153,8 @@ credential:
149153
#### Setup
150154

151155
1. Choose an authentication method from above and configure the `credential` block.
152-
2. If using service principal or managed identity, ensure the Fabric admin has enabled the appropriate developer settings for service principal API access (see **Fabric Admin Settings** above).
153-
3. Add the identity as a member of the target workspaces with Viewer role or above.
156+
2. If using service principal or managed identity:
157+
- Ensure the Fabric admin has enabled the appropriate developer settings (see **Fabric Admin Settings**)
158+
- Create a security group, add your identity, and grant **Contributor** on target workspaces
159+
3. If using Azure CLI, run `az login` (or `az login --use-device-code` on remote servers).
154160
4. Configure the ingestion recipe with optional workspace and pipeline filters.

metadata-ingestion/src/datahub/ingestion/source/fabric/common/core_client.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,24 +132,37 @@ def get_item_definition(
132132

133133
return result
134134

135-
def list_connections(self) -> Iterator[FabricConnection]:
136-
"""List all tenant-scoped connections accessible to the caller.
135+
def list_item_connections(
136+
self, workspace_id: str, item_id: str
137+
) -> Iterator[FabricConnection]:
138+
"""List connections for a specific item.
139+
140+
Returns only connections bound to the given item.
137141
138-
Connections are referenced by pipeline activities via
139-
externalReferences.connection (the connection GUID).
142+
Some connections (e.g. Automatic/SSO) may not have an id —
143+
these are skipped since they cannot be referenced by activities.
140144
141-
Reference: https://learn.microsoft.com/en-us/rest/api/fabric/core/connections/list-connections
145+
Reference: https://learn.microsoft.com/en-us/rest/api/fabric/core/items/list-item-connections
146+
147+
Args:
148+
workspace_id: Workspace GUID
149+
item_id: Item GUID
142150
143151
Yields:
144-
FabricConnection objects
152+
FabricConnection objects (only those with an id)
145153
"""
146-
logger.info("Listing Fabric connections")
147-
for raw in self._paginate("connections"):
154+
endpoint = f"workspaces/{workspace_id}/items/{item_id}/connections"
155+
logger.debug(f"Listing connections for item {item_id}")
156+
for raw in self._paginate(endpoint):
157+
# Skip connections without an id (e.g. Automatic/SSO)
158+
if not raw.get("id"):
159+
continue
148160
try:
149161
yield FabricConnection.from_dict(raw)
150162
except KeyError as e:
151163
self.report.report_parse_failure(
152-
f"Skipping malformed connection: missing required field {e}"
164+
f"Skipping malformed item connection for "
165+
f"item {item_id}: missing required field {e}"
153166
)
154167

155168
def list_item_job_instances(

metadata-ingestion/src/datahub/ingestion/source/fabric/data_factory/source.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,9 @@ def __init__(self, config: FabricDataFactorySourceConfig, ctx: PipelineContext):
190190
self.report.client_report = self.client_report
191191

192192
# Connection cache: connection GUID → FabricConnection
193-
# Populated once before processing pipelines, used by lineage
194-
# resolvers to map activity connection refs to DataHub platforms.
193+
# Populated per-item via list_item_connections during pipeline
194+
# fetching, used by lineage resolvers to map activity connection
195+
# refs to DataHub platforms.
195196
self._connections_cache: dict[str, FabricConnection] = {}
196197

197198
# Pipeline run ID → DPI URN, populated during pipeline run emission,
@@ -209,21 +210,6 @@ def __init__(self, config: FabricDataFactorySourceConfig, ctx: PipelineContext):
209210
# when emitting the child activity's DataJobInputOutput to merge the edge in.
210211
self._cross_pipeline_edges: dict[str, list[str]] = {}
211212

212-
def _cache_connections(self) -> None:
213-
"""Fetch all tenant connections and cache them keyed by ID."""
214-
try:
215-
for conn in self.client.list_connections():
216-
self._connections_cache[conn.id] = conn
217-
logger.info(f"Cached {len(self._connections_cache)} Fabric connection(s)")
218-
except Exception as e:
219-
self.report.report_warning(
220-
title="Failed to Cache Connections",
221-
message="Could not retrieve tenant connections. "
222-
"Copy activity lineage may be incomplete.",
223-
context="",
224-
exc=e,
225-
)
226-
227213
@classmethod
228214
def create(
229215
cls, config_dict: dict, ctx: PipelineContext
@@ -251,9 +237,8 @@ def get_workunits_internal(
251237
logger.info("Starting Fabric Data Factory ingestion")
252238

253239
try:
254-
# Lineage extractors and connection cache are only needed when lineage is enabled
240+
# Lineage extractors are only needed when lineage is enabled
255241
if self.config.include_lineage:
256-
self._cache_connections()
257242
self._copy_lineage_extractor = CopyActivityLineageExtractor(
258243
connections_cache=self._connections_cache,
259244
report=self.report,
@@ -324,7 +309,8 @@ def _fetch_pipeline_activities(self, workspace_id: str) -> list[FabricItem]:
324309
"""Fetch pipelines and their activities for a workspace into cache.
325310
326311
Applies pipeline_pattern filter. Populates
327-
``self._pipeline_activities_cache``.
312+
``self._pipeline_activities_cache``. When lineage is enabled,
313+
also fetches per-item connections into ``self._connections_cache``.
328314
"""
329315
pipeline_items: list[FabricItem] = []
330316
for item in self.client.list_items(
@@ -349,8 +335,26 @@ def _fetch_pipeline_activities(self, workspace_id: str) -> list[FabricItem]:
349335
exc=e,
350336
)
351337
self._pipeline_activities_cache[(workspace_id, item.id)] = []
338+
339+
if self.config.include_lineage:
340+
self._cache_item_connections(workspace_id, item)
352341
return pipeline_items
353342

343+
def _cache_item_connections(self, workspace_id: str, item: FabricItem) -> None:
344+
"""Fetch per-item connections and add to the connections cache."""
345+
try:
346+
for conn in self.client.list_item_connections(workspace_id, item.id):
347+
if conn.id not in self._connections_cache:
348+
self._connections_cache[conn.id] = conn
349+
except Exception as e:
350+
self.report.report_warning(
351+
title="Failed to Fetch Item Connections",
352+
message="Could not retrieve connections for item. "
353+
"Lineage may be incomplete for this pipeline.",
354+
context=f"pipeline={item.name}",
355+
exc=e,
356+
)
357+
354358
def _create_workspace_container(
355359
self, workspace: FabricWorkspace
356360
) -> Iterable[Entity]:

metadata-ingestion/tests/integration/fabric_data_factory/test_fabric_data_factory_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def activities_side_effect(workspace_id: str, pipeline_id: str) -> List[Any]:
201201
),
202202
patch.object(
203203
FabricDataFactoryClient,
204-
"list_connections",
204+
"list_item_connections",
205205
return_value=iter(connections),
206206
),
207207
patch.object(
@@ -266,7 +266,7 @@ def test_no_execution_history(pytestconfig: pytest.Config, tmp_path: Path) -> No
266266
),
267267
patch.object(
268268
FabricDataFactoryClient,
269-
"list_connections",
269+
"list_item_connections",
270270
return_value=iter([]),
271271
),
272272
patch.object(

metadata-ingestion/tests/integration/fabric_data_factory/test_factories.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def create_connection(
5656
connection_type: str,
5757
path: Optional[str] = None,
5858
) -> FabricConnection:
59-
"""Create a FabricConnection matching list_connections() output."""
59+
"""Create a FabricConnection matching list_item_connections() output."""
6060
return FabricConnection(
6161
id=connection_id,
6262
display_name=display_name,

0 commit comments

Comments
 (0)