From b858b46942ff355436b52623779f3b7dc4ddcc1f Mon Sep 17 00:00:00 2001 From: Bolke de Bruin Date: Fri, 12 Sep 2025 11:34:46 +0200 Subject: [PATCH 1/4] feat: add onelake support OneLake uses (yet) another protocol. This adds support for that protocol through onelake:// as well as abfss://...@onelake.dfs.fabric... URLs. Backwards compatibility is preserved so that traditional Azure Storage URLs are still routed to AzureBlobFileSystem. --- adlfs/__init__.py | 3 +- adlfs/onelake.py | 455 +++++++++++++++++++++++++ adlfs/spec.py | 6 + adlfs/tests/test_onelake.py | 592 +++++++++++++++++++++++++++++++++ adlfs/tests/test_uri_format.py | 54 +++ pyproject.toml | 12 + requirements/latest.txt | 3 +- 7 files changed, 1123 insertions(+), 2 deletions(-) create mode 100644 adlfs/onelake.py create mode 100644 adlfs/tests/test_onelake.py diff --git a/adlfs/__init__.py b/adlfs/__init__.py index 54cf1a9b..2fc8a8b8 100644 --- a/adlfs/__init__.py +++ b/adlfs/__init__.py @@ -1,5 +1,6 @@ from .gen1 import AzureDatalakeFileSystem +from .onelake import OneLakeFile, OneLakeFileSystem from .spec import AzureBlobFile, AzureBlobFileSystem from .utils import __version__, version_tuple # noqa: F401 -__all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem"] +__all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem", "OneLakeFileSystem", "OneLakeFile"] diff --git a/adlfs/onelake.py b/adlfs/onelake.py new file mode 100644 index 00000000..1a5c5966 --- /dev/null +++ b/adlfs/onelake.py @@ -0,0 +1,455 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, print_function + +import logging +import os + +from azure.core.exceptions import ResourceNotFoundError +from azure.storage.filedatalake.aio import DataLakeServiceClient as AIODataLakeServiceClient +from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper +from fsspec.spec import AbstractBufferedFile +from fsspec.utils import infer_storage_options + +logger = logging.getLogger(__name__) + + +class OneLakeFileSystem(AsyncFileSystem): + """ + Access Microsoft OneLake using Azure Data Lake Storage Gen2 API. + + OneLake is the built-in, multi-cloud data lake for Microsoft Fabric that's automatically provisioned + with every Microsoft Fabric tenant. It's built on top of Azure Data Lake Storage Gen2 and provides + a unified data layer for analytics. + + Parameters + ---------- + account_name : str + OneLake account name (defaults to "onelake", which yields the endpoint onelake.dfs.fabric.microsoft.com) + workspace_name : str, optional + The name of the Fabric workspace + lakehouse_name : str, optional + The name of the lakehouse within the workspace + tenant_id : str + Azure tenant ID for authentication + client_id : str + Azure AD application client ID + client_secret : str + Azure AD application client secret + credential : azure.core.credentials.TokenCredential, optional + Azure credential object for authentication + anon : bool, optional + Use anonymous access (default: False) + + Examples + -------- + >>> from adlfs import OneLakeFileSystem + >>> fs = OneLakeFileSystem( + ... account_name="onelake", + ... tenant_id="your-tenant-id", + ... client_id="your-client-id", + ... client_secret="your-client-secret" + ... 
) + >>> fs.ls("") + """ + + protocol = ("onelake", "abfss") + + def __init__( + self, + account_name: str = None, + workspace_name: str = None, + lakehouse_name: str = None, + tenant_id: str = None, + client_id: str = None, + client_secret: str = None, + credential=None, + anon: bool = False, + loop=None, + asynchronous: bool = False, + **kwargs + ): + # Import here to avoid circular imports + from fsspec.asyn import get_loop + + super().__init__( + asynchronous=asynchronous, + loop=loop or get_loop(), + **kwargs + ) + + self.account_name = account_name or "onelake" + self.workspace_name = workspace_name + self.lakehouse_name = lakehouse_name + self.tenant_id = tenant_id or os.getenv("AZURE_TENANT_ID") + self.client_id = client_id or os.getenv("AZURE_CLIENT_ID") + self.client_secret = client_secret or os.getenv("AZURE_CLIENT_SECRET") + self.credential = credential + self.anon = anon + + # OneLake uses a specific endpoint format + self.account_url = f"https://{self.account_name}.dfs.fabric.microsoft.com" + + self._setup_credentials() + self.do_connect() + + def _setup_credentials(self): + """Setup authentication credentials for OneLake access.""" + if not self.anon and not self.credential and self.client_id and self.client_secret and self.tenant_id: + from azure.identity import ClientSecretCredential + from azure.identity.aio import ClientSecretCredential as AIOClientSecretCredential + + self.credential = AIOClientSecretCredential( + tenant_id=self.tenant_id, + client_id=self.client_id, + client_secret=self.client_secret + ) + self.sync_credential = ClientSecretCredential( + tenant_id=self.tenant_id, + client_id=self.client_id, + client_secret=self.client_secret + ) + + def do_connect(self): + """Establish connection to OneLake service.""" + import weakref + + if self.credential: + self.service_client = AIODataLakeServiceClient( + account_url=self.account_url, + credential=self.credential + ) + elif self.anon: + self.service_client = AIODataLakeServiceClient( + account_url=self.account_url + ) + else: + raise ValueError("OneLake requires authentication. 
Provide credentials or set anon=True") + + # Setup cleanup for service client + weakref.finalize(self, self._cleanup_service_client, self.service_client) + + @staticmethod + def _cleanup_service_client(service_client): + """Cleanup service client resources""" + try: + if hasattr(service_client, 'close'): + # For sync cleanup, we need to use asyncio + import asyncio + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # Can't run cleanup in same loop, schedule it + loop.create_task(service_client.close()) + else: + loop.run_until_complete(service_client.close()) + except Exception: + # If we can't clean up properly, at least try to close the session + pass + except Exception: + pass + + @classmethod + def _strip_protocol(cls, path: str): + """Remove the protocol from the path.""" + # Handle both onelake:// and abfss:// protocols + if path.startswith("onelake://"): + path = path[10:] + elif path.startswith("abfss://"): + # For abfss URLs, we need to parse the URL and extract the path + from urllib.parse import urlparse + parsed = urlparse(path) + if "onelake.dfs.fabric.microsoft.com" in parsed.netloc: + # Extract workspace from username part and combine with path + if "@" in parsed.netloc: + workspace = parsed.netloc.split("@")[0] + path = f"{workspace}{parsed.path}" + else: + path = parsed.path + else: + # Not a OneLake URL, return as-is without protocol + path = path[8:] # Remove "abfss://" + return path.lstrip("/") + + @staticmethod + def _get_kwargs_from_urls(urlpath): + """Extract parameters from OneLake URLs.""" + ops = infer_storage_options(urlpath) + out = {} + + # Parse OneLake-specific URL structure + host = ops.get("host", "") + if "onelake.dfs.fabric.microsoft.com" in host: + out["account_name"] = "onelake" + + # Check if we have username (from abfss://username@host format) + username = ops.get("username") + if username: + # For abfss URLs, workspace is the username part + out["workspace_name"] = username + + # Lakehouse is the first part of the path + path_parts = ops.get("path", "").strip("/").split("/") + if len(path_parts) >= 1 and path_parts[0]: + out["lakehouse_name"] = path_parts[0] + else: + # For regular onelake:// URLs, extract from path + path_parts = ops.get("path", "").strip("/").split("/") + if len(path_parts) >= 1 and path_parts[0]: + out["workspace_name"] = path_parts[0] + if len(path_parts) >= 2 and path_parts[1]: + out["lakehouse_name"] = path_parts[1] + + return out + + def split_path(self, path: str): + """Split OneLake path into workspace, lakehouse, and file path components.""" + path = self._strip_protocol(path).strip("/") + + if not path: + return "", "", "" + + parts = path.split("/", 2) + workspace = parts[0] if len(parts) > 0 else "" + lakehouse = parts[1] if len(parts) > 1 else "" + file_path = parts[2] if len(parts) > 2 else "" + + return workspace, lakehouse, file_path + + async def _ls(self, path: str = "", detail: bool = True, **kwargs): + """List files and directories in OneLake.""" + path = self._strip_protocol(path).strip("/") + workspace, lakehouse, file_path = self.split_path(path) + + if not workspace: + # List workspaces (this would require Fabric API, simplified for now) + if self.workspace_name: + return [{"name": self.workspace_name, "type": "directory", "size": None}] + else: + raise NotImplementedError("Listing all workspaces requires Fabric API access") + + if not lakehouse: + # List lakehouses in workspace (simplified) + if self.lakehouse_name: + full_path = f"{workspace}/{self.lakehouse_name}" + return [{"name": 
full_path, "type": "directory", "size": None}] + else: + raise NotImplementedError("Listing lakehouses requires Fabric API access") + + # For OneLake, the file system is the workspace, and lakehouse is part of the path + file_system_name = workspace + lakehouse_path = f"{lakehouse}/{file_path}" if file_path else lakehouse + + try: + async with self.service_client.get_file_system_client(file_system_name) as file_system_client: + paths = file_system_client.get_paths(path=lakehouse_path, recursive=False) + + results = [] + async for path_item in paths: + # Construct the full path correctly based on the Azure response + full_name = f"{workspace}/{path_item.name}" + results.append({ + "name": full_name, + "type": "directory" if path_item.is_directory else "file", + "size": path_item.content_length if hasattr(path_item, 'content_length') else None, + "last_modified": path_item.last_modified if hasattr(path_item, 'last_modified') else None + }) + + return results if detail else [r["name"] for r in results] + + except ResourceNotFoundError: + raise FileNotFoundError(f"Path not found: {path}") + + async def _info(self, path: str, **kwargs): + """Get information about a file or directory.""" + path = self._strip_protocol(path).strip("/") + workspace, lakehouse, file_path = self.split_path(path) + + if not workspace or not lakehouse: + return {"name": path, "type": "directory", "size": None} + + # For OneLake, the file system is the workspace, and lakehouse is part of the path + file_system_name = workspace + lakehouse_file_path = f"{lakehouse}/{file_path}" if file_path else lakehouse + + try: + async with self.service_client.get_file_system_client(file_system_name) as file_system_client: + if file_path: + async with file_system_client.get_file_client(lakehouse_file_path) as file_client: + properties = await file_client.get_file_properties() + return { + "name": path, + "type": "file", + "size": properties.size, + "last_modified": properties.last_modified + } + else: + # Directory + return {"name": path, "type": "directory", "size": None} + + except ResourceNotFoundError: + raise FileNotFoundError(f"Path not found: {path}") + + async def _cat_file(self, path: str, start: int = None, end: int = None, **kwargs): + """Read file content from OneLake.""" + path = self._strip_protocol(path).strip("/") + workspace, lakehouse, file_path = self.split_path(path) + + if not workspace or not lakehouse or not file_path: + raise ValueError("Invalid path format for OneLake file") + + # For OneLake, the file system is the workspace, and lakehouse is part of the path + file_system_name = workspace + lakehouse_file_path = f"{lakehouse}/{file_path}" + + try: + async with self.service_client.get_file_system_client(file_system_name) as file_system_client: + async with file_system_client.get_file_client(lakehouse_file_path) as file_client: + download_stream = await file_client.download_file(offset=start, length=end-start if end else None) + return await download_stream.readall() + + except ResourceNotFoundError: + raise FileNotFoundError(f"File not found: {path}") + + async def _pipe_file(self, path: str, data: bytes, **kwargs): + """Write data to a file in OneLake.""" + path = self._strip_protocol(path).strip("/") + workspace, lakehouse, file_path = self.split_path(path) + + if not workspace or not lakehouse or not file_path: + raise ValueError("Invalid path format for OneLake file") + + # For OneLake, the file system is the workspace, and lakehouse is part of the path + file_system_name = workspace + lakehouse_file_path 
= f"{lakehouse}/{file_path}" + + try: + async with self.service_client.get_file_system_client(file_system_name) as file_system_client: + async with file_system_client.get_file_client(lakehouse_file_path) as file_client: + await file_client.create_file() + await file_client.append_data(data, offset=0, length=len(data)) + await file_client.flush_data(len(data)) + + except Exception as e: + raise IOError(f"Failed to write file {path}: {e}") + + async def _rm_file(self, path: str, **kwargs): + """Delete a file from OneLake.""" + path = self._strip_protocol(path).strip("/") + workspace, lakehouse, file_path = self.split_path(path) + + if not workspace or not lakehouse or not file_path: + raise ValueError("Invalid path format for OneLake file") + + # For OneLake, the file system is the workspace, and lakehouse is part of the path + file_system_name = workspace + lakehouse_file_path = f"{lakehouse}/{file_path}" + + try: + async with self.service_client.get_file_system_client(file_system_name) as file_system_client: + async with file_system_client.get_file_client(lakehouse_file_path) as file_client: + await file_client.delete_file() + + except ResourceNotFoundError: + pass # File already deleted + + async def _mkdir(self, path: str, create_parents: bool = True, **kwargs): + """Create a directory in OneLake.""" + path = self._strip_protocol(path).strip("/") + workspace, lakehouse, file_path = self.split_path(path) + + if not workspace or not lakehouse: + # Can't create workspaces or lakehouses through this API + raise NotImplementedError("Creating workspaces/lakehouses requires Fabric API") + + # For OneLake, the file system is the workspace, and lakehouse is part of the path + file_system_name = workspace + lakehouse_dir_path = f"{lakehouse}/{file_path}" + + try: + async with self.service_client.get_file_system_client(file_system_name) as file_system_client: + async with file_system_client.get_directory_client(lakehouse_dir_path) as directory_client: + await directory_client.create_directory() + + except Exception as e: + raise IOError(f"Failed to create directory {path}: {e}") + + # Sync wrappers + ls = sync_wrapper(_ls) + info = sync_wrapper(_info) + cat_file = sync_wrapper(_cat_file) + pipe_file = sync_wrapper(_pipe_file) + rm_file = sync_wrapper(_rm_file) + mkdir = sync_wrapper(_mkdir) + + def _open( + self, + path: str, + mode: str = "rb", + block_size: int = None, + autocommit: bool = True, + cache_options: dict = None, + **kwargs + ): + """Open a file for reading or writing.""" + return OneLakeFile( + fs=self, + path=path, + mode=mode, + block_size=block_size, + autocommit=autocommit, + cache_options=cache_options or {}, + **kwargs + ) + + +class OneLakeFile(AbstractBufferedFile): + """File-like operations on OneLake files.""" + + def __init__( + self, + fs: OneLakeFileSystem, + path: str, + mode: str = "rb", + block_size: int = None, + autocommit: bool = True, + cache_options: dict = None, + **kwargs + ): + self.fs = fs + self.path = path + self.mode = mode + self.autocommit = autocommit + + workspace, lakehouse, file_path = fs.split_path(path) + self.workspace = workspace + self.lakehouse = lakehouse + self.file_path = file_path + + if not workspace or not lakehouse or not file_path: + raise ValueError("Invalid OneLake path format") + + self.container_path = f"{workspace}/{lakehouse}" + + super().__init__( + fs=fs, + path=path, + mode=mode, + block_size=block_size, + cache_options=cache_options or {}, + **kwargs + ) + + def _fetch_range(self, start: int, end: int): + """Fetch a range of 
bytes from the file.""" + return sync(self.fs.loop, self.fs._cat_file, self.path, start=start, end=end) + + def _upload_chunk(self, final: bool = False, **kwargs): + """Upload a chunk of data.""" + if self.mode in {"wb", "ab"}: + data = self.buffer.getvalue() + if data or final: + sync(self.fs.loop, self.fs._pipe_file, self.path, data) + self.offset = self.offset + len(data) if self.offset else len(data) + return True + return False \ No newline at end of file diff --git a/adlfs/spec.py b/adlfs/spec.py index 5ff509e1..a7efb138 100644 --- a/adlfs/spec.py +++ b/adlfs/spec.py @@ -436,6 +436,12 @@ def _get_kwargs_from_urls(urlpath): out = {} host = ops.get("host", None) if host: + # Check if this is a OneLake URL (should be routed to OneLakeFileSystem) + if "onelake.dfs.fabric.microsoft.com" in host: + # This is a OneLake URL, don't process it here + # The fsspec registry should route to OneLakeFileSystem + return out + match = re.match( r"(?P<account_name>.+)\.(dfs|blob)\.core\.windows\.net", host ) diff --git a/adlfs/tests/test_onelake.py b/adlfs/tests/test_onelake.py new file mode 100644 index 00000000..a0cb9dca --- /dev/null +++ b/adlfs/tests/test_onelake.py @@ -0,0 +1,592 @@ +import os +import pytest +from unittest import mock +from unittest.mock import AsyncMock, MagicMock, patch + +from adlfs import OneLakeFileSystem, OneLakeFile +from azure.core.exceptions import ResourceNotFoundError + + +def create_async_context_manager_mock(): + """Helper to create a proper async context manager mock.""" + mock_context = AsyncMock() + mock_context.__aenter__ = AsyncMock(return_value=mock_context) + mock_context.__aexit__ = AsyncMock(return_value=False) + return mock_context + + +class TestOneLakeFileSystem: + """Test cases for OneLakeFileSystem""" + + def test_init_with_credentials(self): + """Test initialization with client credentials""" + fs = OneLakeFileSystem( + account_name="onelake", + workspace_name="test_workspace", + lakehouse_name="test_lakehouse", + tenant_id="test-tenant", + client_id="test-client", + client_secret="test-secret" + ) + + assert fs.account_name == "onelake" + assert fs.workspace_name == "test_workspace" + assert fs.lakehouse_name == "test_lakehouse" + assert fs.tenant_id == "test-tenant" + assert fs.client_id == "test-client" + assert fs.client_secret == "test-secret" + assert not fs.anon + assert fs.account_url == "https://onelake.dfs.fabric.microsoft.com" + + def test_init_anonymous(self): + """Test initialization with anonymous access""" + fs = OneLakeFileSystem(anon=True) + + assert fs.account_name == "onelake" + assert fs.anon is True + + def test_init_with_env_vars(self): + """Test initialization using environment variables""" + with mock.patch.dict(os.environ, { + 'AZURE_TENANT_ID': 'env-tenant', + 'AZURE_CLIENT_ID': 'env-client', + 'AZURE_CLIENT_SECRET': 'env-secret' + }): + fs = OneLakeFileSystem() + + assert fs.tenant_id == "env-tenant" + assert fs.client_id == "env-client" + assert fs.client_secret == "env-secret" + + def test_strip_protocol(self): + """Test URL protocol stripping""" + assert OneLakeFileSystem._strip_protocol("onelake://workspace/lakehouse/file.txt") == "workspace/lakehouse/file.txt" + assert OneLakeFileSystem._strip_protocol("workspace/lakehouse/file.txt") == "workspace/lakehouse/file.txt" + assert OneLakeFileSystem._strip_protocol("/workspace/lakehouse/file.txt") == "workspace/lakehouse/file.txt" + + # Test abfss URL stripping for OneLake + assert 
OneLakeFileSystem._strip_protocol("abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/file.txt") == "workspace/lakehouse/Files/file.txt" + + def test_get_kwargs_from_urls(self): + """Test URL parsing for kwargs extraction""" + kwargs = OneLakeFileSystem._get_kwargs_from_urls("onelake://onelake.dfs.fabric.microsoft.com/workspace/lakehouse/file.txt") + + assert kwargs.get("account_name") == "onelake" + assert kwargs.get("workspace_name") == "workspace" + assert kwargs.get("lakehouse_name") == "lakehouse" + + # Test abfss URL parsing for OneLake + abfss_kwargs = OneLakeFileSystem._get_kwargs_from_urls("abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test") + + assert abfss_kwargs.get("account_name") == "onelake" + assert abfss_kwargs.get("workspace_name") == "q_dev_workspace" + assert abfss_kwargs.get("lakehouse_name") == "qdata_dev_lh.Lakehouse" + + def test_split_path(self): + """Test path splitting into components""" + fs = OneLakeFileSystem(anon=True) + + # Test full path + workspace, lakehouse, file_path = fs.split_path("workspace/lakehouse/folder/file.txt") + assert workspace == "workspace" + assert lakehouse == "lakehouse" + assert file_path == "folder/file.txt" + + # Test partial paths + workspace, lakehouse, file_path = fs.split_path("workspace/lakehouse") + assert workspace == "workspace" + assert lakehouse == "lakehouse" + assert file_path == "" + + workspace, lakehouse, file_path = fs.split_path("workspace") + assert workspace == "workspace" + assert lakehouse == "" + assert file_path == "" + + # Test empty path + workspace, lakehouse, file_path = fs.split_path("") + assert workspace == "" + assert lakehouse == "" + assert file_path == "" + + @pytest.mark.asyncio + async def test_ls_with_workspace_and_lakehouse(self): + """Test listing files when workspace and lakehouse are known""" + with patch('adlfs.onelake.AIODataLakeServiceClient'): + fs = OneLakeFileSystem( + workspace_name="test_workspace", + lakehouse_name="test_lakehouse", + anon=True + ) + + # Mock the service client - don't make it an AsyncMock since we need regular return values + mock_service_client = MagicMock() + fs.service_client = mock_service_client + + # Create the file system client as an async context manager + mock_file_system_client = MagicMock() # Changed from AsyncMock to MagicMock for non-async methods + mock_fs_context = create_async_context_manager_mock() + mock_fs_context.__aenter__.return_value = mock_file_system_client + mock_service_client.get_file_system_client.return_value = mock_fs_context + + # Mock path items + mock_path1 = MagicMock() + mock_path1.name = "file1.txt" + mock_path1.is_directory = False + mock_path1.content_length = 1024 + + mock_path2 = MagicMock() + mock_path2.name = "folder1" + mock_path2.is_directory = True + + class MockAsyncIterator: + def __init__(self, items): + self.items = items + self.index = 0 + + def __aiter__(self): + return self + + async def __anext__(self): + if self.index >= len(self.items): + raise StopAsyncIteration + item = self.items[self.index] + self.index += 1 + return item + + mock_file_system_client.get_paths.return_value = MockAsyncIterator([mock_path1, mock_path2]) + + result = await fs._ls("test_workspace/test_lakehouse/") + + assert len(result) == 2 + assert result[0]["name"] == "test_workspace/test_lakehouse/file1.txt" + assert result[0]["type"] == "file" + assert result[0]["size"] == 1024 + assert result[1]["name"] == "test_workspace/test_lakehouse/folder1" + assert result[1]["type"] == 
"directory" + + @pytest.mark.asyncio + async def test_ls_file_not_found(self): + """Test listing when path doesn't exist""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_paths.side_effect = ResourceNotFoundError("Path not found") + + fs.service_client = mock_service_client + + with pytest.raises(FileNotFoundError): + await fs._ls("workspace/lakehouse/nonexistent") + + @pytest.mark.asyncio + async def test_info_file(self): + """Test getting file information""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_file_client = AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_file_client.return_value = mock_file_client + + mock_properties = MagicMock() + mock_properties.size = 2048 + mock_properties.last_modified = "2023-01-01T00:00:00Z" + + mock_file_client.get_file_properties.return_value = mock_properties + fs.service_client = mock_service_client + + result = await fs._info("workspace/lakehouse/file.txt") + + assert result["name"] == "workspace/lakehouse/file.txt" + assert result["type"] == "file" + assert result["size"] == 2048 + + @pytest.mark.asyncio + async def test_info_directory(self): + """Test getting directory information""" + fs = OneLakeFileSystem(anon=True) + + result = await fs._info("workspace/lakehouse") + + assert result["name"] == "workspace/lakehouse" + assert result["type"] == "directory" + assert result["size"] is None + + @pytest.mark.asyncio + async def test_cat_file(self): + """Test reading file content""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_file_client = AsyncMock() + mock_download_stream = AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_file_client.return_value = mock_file_client + mock_file_client.download_file.return_value = mock_download_stream + mock_download_stream.readall.return_value = b"file content" + + fs.service_client = mock_service_client + + result = await fs._cat_file("workspace/lakehouse/file.txt") + + assert result == b"file content" + mock_file_client.download_file.assert_called_once_with(offset=None, length=None) + + @pytest.mark.asyncio + async def test_cat_file_with_range(self): + """Test reading file content with byte range""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_file_client = AsyncMock() + mock_download_stream = AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_file_client.return_value = mock_file_client + mock_file_client.download_file.return_value = mock_download_stream + mock_download_stream.readall.return_value = b"content" + + fs.service_client = mock_service_client + + result = await fs._cat_file("workspace/lakehouse/file.txt", start=10, end=20) + + assert result == b"content" + mock_file_client.download_file.assert_called_once_with(offset=10, length=10) + + @pytest.mark.asyncio + async def test_cat_file_not_found(self): + """Test reading non-existent file""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_file_client = 
AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_file_client.return_value = mock_file_client + mock_file_client.download_file.side_effect = ResourceNotFoundError("File not found") + + fs.service_client = mock_service_client + + with pytest.raises(FileNotFoundError): + await fs._cat_file("workspace/lakehouse/nonexistent.txt") + + @pytest.mark.asyncio + async def test_pipe_file(self): + """Test writing file content""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_file_client = AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_file_client.return_value = mock_file_client + + fs.service_client = mock_service_client + + test_data = b"test data" + await fs._pipe_file("workspace/lakehouse/newfile.txt", test_data) + + mock_file_client.create_file.assert_called_once() + mock_file_client.append_data.assert_called_once_with(test_data, offset=0, length=len(test_data)) + mock_file_client.flush_data.assert_called_once_with(len(test_data)) + + @pytest.mark.asyncio + async def test_rm_file(self): + """Test deleting a file""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_file_client = AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_file_client.return_value = mock_file_client + + fs.service_client = mock_service_client + + await fs._rm_file("workspace/lakehouse/file.txt") + + mock_file_client.delete_file.assert_called_once() + + @pytest.mark.asyncio + async def test_rm_file_not_found(self): + """Test deleting non-existent file (should not raise)""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_file_client = AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_file_client.return_value = mock_file_client + mock_file_client.delete_file.side_effect = ResourceNotFoundError("File not found") + + fs.service_client = mock_service_client + + # Should not raise an exception + await fs._rm_file("workspace/lakehouse/nonexistent.txt") + + @pytest.mark.asyncio + async def test_mkdir(self): + """Test creating a directory""" + fs = OneLakeFileSystem(anon=True) + + mock_service_client = AsyncMock() + mock_file_system_client = AsyncMock() + mock_directory_client = AsyncMock() + + mock_service_client.get_file_system_client.return_value = mock_file_system_client + mock_file_system_client.get_directory_client.return_value = mock_directory_client + + fs.service_client = mock_service_client + + await fs._mkdir("workspace/lakehouse/newfolder") + + mock_directory_client.create_directory.assert_called_once() + + def test_mkdir_invalid_path(self): + """Test creating directory with invalid path""" + fs = OneLakeFileSystem(anon=True) + + with pytest.raises(NotImplementedError): + fs.mkdir("workspace") # Can't create workspace + + def test_invalid_path_formats(self): + """Test handling of invalid path formats""" + fs = OneLakeFileSystem(anon=True) + + # Test invalid paths for file operations + with pytest.raises(ValueError): + OneLakeFile(fs, "invalid_path", mode="rb") + + with pytest.raises(ValueError): + OneLakeFile(fs, "workspace", mode="rb") # Missing lakehouse and file + + with pytest.raises(ValueError): + 
OneLakeFile(fs, "workspace/lakehouse", mode="rb") # Missing file + + +class TestOneLakeFile: + """Test cases for OneLakeFile""" + + def test_init_valid_path(self): + """Test file initialization with valid path""" + fs = OneLakeFileSystem(anon=True) + + file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="rb") + + assert file_obj.workspace == "workspace" + assert file_obj.lakehouse == "lakehouse" + assert file_obj.file_path == "file.txt" + assert file_obj.container_path == "workspace/lakehouse" + + def test_init_invalid_path(self): + """Test file initialization with invalid path""" + fs = OneLakeFileSystem(anon=True) + + with pytest.raises(ValueError): + OneLakeFile(fs, "invalid", mode="rb") + + @patch('adlfs.onelake.sync') + def test_fetch_range(self, mock_sync): + """Test fetching byte range from file""" + fs = OneLakeFileSystem(anon=True) + file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="rb") + + mock_sync.return_value = b"test data" + + result = file_obj._fetch_range(0, 10) + + assert result == b"test data" + mock_sync.assert_called_once() + + @patch('adlfs.onelake.sync') + def test_upload_chunk(self, mock_sync): + """Test uploading chunk of data""" + fs = OneLakeFileSystem(anon=True) + file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="wb") + + # Mock the buffer + file_obj.buffer = MagicMock() + file_obj.buffer.getvalue.return_value = b"test data" + file_obj.offset = 0 + + result = file_obj._upload_chunk(final=True) + + assert result is True + mock_sync.assert_called_once() + + def test_protocols(self): + """Test that OneLake protocols are registered""" + assert "onelake" in OneLakeFileSystem.protocol + assert "abfss" in OneLakeFileSystem.protocol + + +class TestOneLakeURLRouting: + """Test URL routing between AzureBlobFileSystem and OneLakeFileSystem""" + + def test_onelake_url_routing(self): + """Test that OneLake URLs are properly parsed and routed.""" + # OneLake URLs should be handled by OneLakeFileSystem + onelake_url = "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test" + + # Test OneLakeFileSystem can handle both protocols + assert "abfss" in OneLakeFileSystem.protocol + assert "onelake" in OneLakeFileSystem.protocol + + # Test URL parsing for OneLake + kwargs = OneLakeFileSystem._get_kwargs_from_urls(onelake_url) + assert kwargs.get("account_name") == "onelake" + assert kwargs.get("workspace_name") == "q_dev_workspace" + assert kwargs.get("lakehouse_name") == "qdata_dev_lh.Lakehouse" + + # Test path stripping for OneLake URLs + stripped = OneLakeFileSystem._strip_protocol(onelake_url) + assert stripped == "q_dev_workspace/qdata_dev_lh.Lakehouse/Files/Upload_Test" + + def test_azure_blob_url_routing(self): + """Test that regular Azure Storage URLs are handled by AzureBlobFileSystem.""" + from adlfs import AzureBlobFileSystem + + # Regular Azure Storage URL + azure_url = "abfss://container@storageaccount.dfs.core.windows.net/path/to/file" + + # Test that AzureBlobFileSystem doesn't process OneLake URLs + onelake_url = "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/path" + onelake_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(onelake_url) + assert not onelake_kwargs # Should return empty dict for OneLake URLs + + # Test that AzureBlobFileSystem handles regular Azure URLs + azure_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(azure_url) + assert azure_kwargs.get("account_name") == "storageaccount" + + def test_onelake_strip_protocol_variations(self): + """Test OneLake 
URL stripping with different URL formats.""" + + test_cases = [ + # (input_url, expected_stripped_path) + ("abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/test.txt", + "workspace/lakehouse/Files/test.txt"), + ("onelake://workspace/lakehouse/Files/test.txt", + "workspace/lakehouse/Files/test.txt"), + ("workspace/lakehouse/Files/test.txt", + "workspace/lakehouse/Files/test.txt"), + ] + + for url, expected in test_cases: + result = OneLakeFileSystem._strip_protocol(url) + assert result == expected, f"Failed for URL: {url}" + + def test_onelake_get_kwargs_variations(self): + """Test OneLake URL parameter extraction with different formats.""" + + test_cases = [ + # abfss format with workspace in host part + { + "url": "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test", + "expected": { + "account_name": "onelake", + "workspace_name": "q_dev_workspace", + "lakehouse_name": "qdata_dev_lh.Lakehouse" + } + }, + # onelake format with workspace in path + { + "url": "onelake://onelake.dfs.fabric.microsoft.com/workspace/lakehouse/Files/test.txt", + "expected": { + "account_name": "onelake", + "workspace_name": "workspace", + "lakehouse_name": "lakehouse" + } + } + ] + + for test_case in test_cases: + kwargs = OneLakeFileSystem._get_kwargs_from_urls(test_case["url"]) + for key, expected_value in test_case["expected"].items(): + assert kwargs.get(key) == expected_value, \ + f"Failed for URL: {test_case['url']}, key: {key}, got: {kwargs.get(key)}, expected: {expected_value}" + + def test_azure_blob_ignores_onelake_domains(self): + """Test that AzureBlobFileSystem ignores OneLake domain URLs.""" + from adlfs import AzureBlobFileSystem + + onelake_urls = [ + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + "abfs://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + ] + + for url in onelake_urls: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + # Should return empty dict (no account_name extracted) + assert kwargs == {}, f"AzureBlobFileSystem should ignore OneLake URL: {url}" + + def test_protocol_overlap_handling(self): + """Test that protocol overlap between filesystems is handled correctly.""" + from adlfs import AzureBlobFileSystem + + # Both filesystems support abfss protocol + assert "abfss" in AzureBlobFileSystem.protocol + assert "abfss" in OneLakeFileSystem.protocol + + # But they should handle different domains + azure_url = "abfss://container@account.dfs.core.windows.net/file" + onelake_url = "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file" + + # Azure should handle core.windows.net, ignore fabric.microsoft.com + azure_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(azure_url) + onelake_kwargs_from_azure = AzureBlobFileSystem._get_kwargs_from_urls(onelake_url) + + assert azure_kwargs.get("account_name") == "account" + assert onelake_kwargs_from_azure == {} # Should be empty + + # OneLake should handle fabric.microsoft.com URLs + onelake_kwargs = OneLakeFileSystem._get_kwargs_from_urls(onelake_url) + assert onelake_kwargs.get("account_name") == "onelake" + assert onelake_kwargs.get("workspace_name") == "workspace" + + +class TestOneLakeIntegration: + """Integration tests for OneLake functionality""" + + def test_fsspec_integration(self): + """Test that OneLake can be used with fsspec.open""" + import fsspec + + # Register the protocol + fsspec.register_implementation("onelake", OneLakeFileSystem) + + # Test that the protocol is registered + assert "onelake" in 
fsspec.available_protocols() + + # Test URL parsing + with mock.patch('adlfs.onelake.OneLakeFileSystem.do_connect'): + fs = fsspec.filesystem("onelake", anon=True) + assert isinstance(fs, OneLakeFileSystem) + + def test_sync_methods(self): + """Test that sync wrapper methods work""" + fs = OneLakeFileSystem(anon=True) + + # These should be callable (though they might raise without proper mocking) + assert hasattr(fs, 'ls') + assert hasattr(fs, 'info') + assert hasattr(fs, 'cat_file') + assert hasattr(fs, 'pipe_file') + assert hasattr(fs, 'rm_file') + assert hasattr(fs, 'mkdir') + assert callable(fs.ls) + assert callable(fs.info) + assert callable(fs.cat_file) + assert callable(fs.pipe_file) + assert callable(fs.rm_file) + assert callable(fs.mkdir) \ No newline at end of file diff --git a/adlfs/tests/test_uri_format.py b/adlfs/tests/test_uri_format.py index 5f42ef03..e752e187 100644 --- a/adlfs/tests/test_uri_format.py +++ b/adlfs/tests/test_uri_format.py @@ -54,3 +54,57 @@ def test_account_name_from_url(): "abfs://test@some_account_name.dfs.core.windows.net/some_file" ) assert kwargs["account_name"] == "some_account_name" + + +def test_azure_storage_url_routing(): + """Test that AzureBlobFileSystem correctly handles Azure Storage URLs""" + + # Test various Azure Storage URL formats + azure_urls_and_expected = [ + ("abfss://container@account.dfs.core.windows.net/file", "account"), + ("abfs://container@account.dfs.core.windows.net/file", "account"), + ("abfss://container@account.blob.core.windows.net/file", "account"), + ("az://container@account.blob.core.windows.net/file", "account"), + ] + + for url, expected_account in azure_urls_and_expected: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + assert kwargs.get("account_name") == expected_account, f"Failed for URL: {url}" + + +def test_onelake_url_ignored_by_azure_blob_fs(): + """Test that AzureBlobFileSystem ignores OneLake URLs""" + + # OneLake URLs should be ignored by AzureBlobFileSystem + onelake_urls = [ + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + "abfs://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + ] + + for url in onelake_urls: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + # Should return empty dict (no account_name extracted) + assert kwargs == {}, f"AzureBlobFileSystem should ignore OneLake URL: {url}" + + +def test_azure_vs_onelake_domain_routing(): + """Test that domain-based routing works correctly""" + + # Azure Storage domains should be handled by AzureBlobFileSystem + azure_domains = [ + "abfss://container@account.dfs.core.windows.net/file", + "abfss://container@account.blob.core.windows.net/file", + ] + + for url in azure_domains: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + assert kwargs.get("account_name") == "account", f"Azure domain not handled correctly: {url}" + + # OneLake domains should be ignored by AzureBlobFileSystem + onelake_domains = [ + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", + ] + + for url in onelake_domains: + kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) + assert kwargs == {}, f"OneLake domain should be ignored: {url}" diff --git a/pyproject.toml b/pyproject.toml index efe3b32a..4457c6d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "azure-datalake-store>=0.0.53,<0.1", "azure-identity", "azure-storage-blob>=12.17.0", + "azure-storage-file-datalake>=12.17.0", "fsspec>=2023.12.0", "aiohttp>=3.7.0", ] @@ -44,6 +45,17 @@ tests = ["pytest", "docker", 
"pytest-mock", "arrow", "dask[dataframe]"] [project.entry-points."fsspec.specs"] abfss = "adlfs:AzureBlobFileSystem" +onelake = "adlfs:OneLakeFileSystem" + +[dependency-groups] +dev = [ + "arrow>=1.3.0", + "dask[dataframe]>=2024.8.0", + "docker>=7.1.0", + "pytest>=8.4.2", + "pytest-asyncio>=1.2.0", + "pytest-mock>=3.15.0", +] [tool.setuptools.packages.find] include = ["adlfs*"] diff --git a/requirements/latest.txt b/requirements/latest.txt index 0ef2232b..c4b7f4d1 100644 --- a/requirements/latest.txt +++ b/requirements/latest.txt @@ -2,4 +2,5 @@ fsspec azure-core<2.0.0 azure-datalake-store -azure-storage-blob \ No newline at end of file +azure-storage-blob +azure-storage-file-datalake \ No newline at end of file From de48a927d22251013d548707413afb400fbc4bed Mon Sep 17 00:00:00 2001 From: Bolke de Bruin Date: Fri, 12 Sep 2025 13:43:54 +0200 Subject: [PATCH 2/4] fixes --- adlfs/onelake.py | 2 +- pyproject.toml | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/adlfs/onelake.py b/adlfs/onelake.py index 1a5c5966..bb149df8 100644 --- a/adlfs/onelake.py +++ b/adlfs/onelake.py @@ -452,4 +452,4 @@ def _upload_chunk(self, final: bool = False, **kwargs): sync(self.fs.loop, self.fs._pipe_file, self.path, data) self.offset = self.offset + len(data) if self.offset else len(data) return True - return False \ No newline at end of file + return False diff --git a/pyproject.toml b/pyproject.toml index 4457c6d7..4c60f5df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,16 +47,6 @@ tests = ["pytest", "docker", "pytest-mock", "arrow", "dask[dataframe]"] abfss = "adlfs:AzureBlobFileSystem" onelake = "adlfs:OneLakeFileSystem" -[dependency-groups] -dev = [ - "arrow>=1.3.0", - "dask[dataframe]>=2024.8.0", - "docker>=7.1.0", - "pytest>=8.4.2", - "pytest-asyncio>=1.2.0", - "pytest-mock>=3.15.0", -] - [tool.setuptools.packages.find] include = ["adlfs*"] exclude = ["tests"] From ba7da88d0ea49826be1f0f9c9f494454fc3181e0 Mon Sep 17 00:00:00 2001 From: Bolke de Bruin Date: Fri, 12 Sep 2025 13:58:45 +0200 Subject: [PATCH 3/4] shorten --- adlfs/onelake.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/adlfs/onelake.py b/adlfs/onelake.py index bb149df8..68233c1d 100644 --- a/adlfs/onelake.py +++ b/adlfs/onelake.py @@ -18,9 +18,8 @@ class OneLakeFileSystem(AsyncFileSystem): """ Access Microsoft OneLake using Azure Data Lake Storage Gen2 API. - OneLake is the built-in, multi-cloud data lake for Microsoft Fabric that's automatically provisioned - with every Microsoft Fabric tenant. It's built on top of Azure Data Lake Storage Gen2 and provides - a unified data layer for analytics. + OneLake is the built-in, data lake for Microsoft Fabric that's automatically provisioned + with every Microsoft Fabric tenant. 
Parameters ---------- From 2bd12d430aed79fed0ab2483bd416e41d701dcf1 Mon Sep 17 00:00:00 2001 From: Bolke de Bruin Date: Fri, 12 Sep 2025 16:57:31 +0200 Subject: [PATCH 4/4] docs and linting --- adlfs/__init__.py | 8 +- adlfs/onelake.py | 275 ++++++++++++--------- adlfs/spec.py | 2 +- adlfs/tests/test_onelake.py | 424 +++++++++++++++++++-------------- adlfs/tests/test_uri_format.py | 20 +- docs/api.md | 28 ++- docs/index.md | 91 ++++++- 7 files changed, 543 insertions(+), 305 deletions(-) diff --git a/adlfs/__init__.py b/adlfs/__init__.py index 2fc8a8b8..f55776e6 100644 --- a/adlfs/__init__.py +++ b/adlfs/__init__.py @@ -3,4 +3,10 @@ from .spec import AzureBlobFile, AzureBlobFileSystem from .utils import __version__, version_tuple # noqa: F401 -__all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem", "OneLakeFileSystem", "OneLakeFile"] +__all__ = [ + "AzureBlobFileSystem", + "AzureBlobFile", + "AzureDatalakeFileSystem", + "OneLakeFileSystem", + "OneLakeFile", +] diff --git a/adlfs/onelake.py b/adlfs/onelake.py index 68233c1d..5cb2551d 100644 --- a/adlfs/onelake.py +++ b/adlfs/onelake.py @@ -6,7 +6,9 @@ import os from azure.core.exceptions import ResourceNotFoundError -from azure.storage.filedatalake.aio import DataLakeServiceClient as AIODataLakeServiceClient +from azure.storage.filedatalake.aio import ( + DataLakeServiceClient as AIODataLakeServiceClient, +) from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper from fsspec.spec import AbstractBufferedFile from fsspec.utils import infer_storage_options @@ -17,10 +19,10 @@ class OneLakeFileSystem(AsyncFileSystem): """ Access Microsoft OneLake using Azure Data Lake Storage Gen2 API. - + OneLake is the built-in data lake for Microsoft Fabric that's automatically provisioned with every Microsoft Fabric tenant. - + Parameters ---------- @@ -39,21 +41,21 @@ class OneLakeFileSystem(AsyncFileSystem): Azure credential object for authentication anon : bool, optional Use anonymous access (default: False) - + Examples -------- >>> from adlfs import OneLakeFileSystem >>> fs = OneLakeFileSystem( ... account_name="onelake", ... tenant_id="your-tenant-id", - ... client_id="your-client-id", + ... client_id="your-client-id", ... client_secret="your-client-secret" ... 
) >>> fs.ls("") """ - + protocol = ("onelake", "abfss") - + def __init__( self, account_name: str = None, @@ -66,17 +68,13 @@ def __init__( anon: bool = False, loop=None, asynchronous: bool = False, - **kwargs + **kwargs, ): # Import here to avoid circular imports from fsspec.asyn import get_loop - - super().__init__( - asynchronous=asynchronous, - loop=loop or get_loop(), - **kwargs - ) - + + super().__init__(asynchronous=asynchronous, loop=loop or get_loop(), **kwargs) + self.account_name = account_name or "onelake" self.workspace_name = workspace_name self.lakehouse_name = lakehouse_name @@ -85,56 +83,64 @@ def __init__( self.client_secret = client_secret or os.getenv("AZURE_CLIENT_SECRET") self.credential = credential self.anon = anon - + # OneLake uses a specific endpoint format self.account_url = f"https://{self.account_name}.dfs.fabric.microsoft.com" - + self._setup_credentials() self.do_connect() - + def _setup_credentials(self): """Setup authentication credentials for OneLake access.""" - if not self.anon and not self.credential and self.client_id and self.client_secret and self.tenant_id: + if ( + not self.anon + and not self.credential + and self.client_id + and self.client_secret + and self.tenant_id + ): from azure.identity import ClientSecretCredential - from azure.identity.aio import ClientSecretCredential as AIOClientSecretCredential - + from azure.identity.aio import ( + ClientSecretCredential as AIOClientSecretCredential, + ) + self.credential = AIOClientSecretCredential( tenant_id=self.tenant_id, client_id=self.client_id, - client_secret=self.client_secret + client_secret=self.client_secret, ) self.sync_credential = ClientSecretCredential( tenant_id=self.tenant_id, client_id=self.client_id, - client_secret=self.client_secret + client_secret=self.client_secret, ) - + def do_connect(self): """Establish connection to OneLake service.""" import weakref - + if self.credential: self.service_client = AIODataLakeServiceClient( - account_url=self.account_url, - credential=self.credential + account_url=self.account_url, credential=self.credential ) elif self.anon: - self.service_client = AIODataLakeServiceClient( - account_url=self.account_url - ) + self.service_client = AIODataLakeServiceClient(account_url=self.account_url) else: - raise ValueError("OneLake requires authentication. Provide credentials or set anon=True") - + raise ValueError( + "OneLake requires authentication. 
Provide credentials or set anon=True" + ) + # Setup cleanup for service client weakref.finalize(self, self._cleanup_service_client, self.service_client) - + @staticmethod def _cleanup_service_client(service_client): """Cleanup service client resources""" try: - if hasattr(service_client, 'close'): + if hasattr(service_client, "close"): # For sync cleanup, we need to use asyncio import asyncio + try: loop = asyncio.get_event_loop() if loop.is_running(): @@ -147,7 +153,7 @@ def _cleanup_service_client(service_client): pass except Exception: pass - + @classmethod def _strip_protocol(cls, path: str): """Remove the protocol from the path.""" @@ -157,6 +163,7 @@ def _strip_protocol(cls, path: str): elif path.startswith("abfss://"): # For abfss URLs, we need to parse the URL and extract the path from urllib.parse import urlparse + parsed = urlparse(path) if "onelake.dfs.fabric.microsoft.com" in parsed.netloc: # Extract workspace from username part and combine with path @@ -169,24 +176,24 @@ def _strip_protocol(cls, path: str): # Not a OneLake URL, return as-is without protocol path = path[8:] # Remove "abfss://" return path.lstrip("/") - + @staticmethod def _get_kwargs_from_urls(urlpath): """Extract parameters from OneLake URLs.""" ops = infer_storage_options(urlpath) out = {} - + # Parse OneLake-specific URL structure host = ops.get("host", "") if "onelake.dfs.fabric.microsoft.com" in host: out["account_name"] = "onelake" - + # Check if we have username (from abfss://username@host format) username = ops.get("username") if username: # For abfss URLs, workspace is the username part out["workspace_name"] = username - + # Lakehouse is the first part of the path path_parts = ops.get("path", "").strip("/").split("/") if len(path_parts) >= 1 and path_parts[0]: @@ -198,181 +205,221 @@ def _get_kwargs_from_urls(urlpath): out["workspace_name"] = path_parts[0] if len(path_parts) >= 2 and path_parts[1]: out["lakehouse_name"] = path_parts[1] - + return out - + def split_path(self, path: str): """Split OneLake path into workspace, lakehouse, and file path components.""" path = self._strip_protocol(path).strip("/") - + if not path: return "", "", "" - + parts = path.split("/", 2) workspace = parts[0] if len(parts) > 0 else "" lakehouse = parts[1] if len(parts) > 1 else "" file_path = parts[2] if len(parts) > 2 else "" - + return workspace, lakehouse, file_path - + async def _ls(self, path: str = "", detail: bool = True, **kwargs): """List files and directories in OneLake.""" path = self._strip_protocol(path).strip("/") workspace, lakehouse, file_path = self.split_path(path) - + if not workspace: # List workspaces (this would require Fabric API, simplified for now) if self.workspace_name: - return [{"name": self.workspace_name, "type": "directory", "size": None}] + return [ + {"name": self.workspace_name, "type": "directory", "size": None} + ] else: - raise NotImplementedError("Listing all workspaces requires Fabric API access") - + raise NotImplementedError( + "Listing all workspaces requires Fabric API access" + ) + if not lakehouse: # List lakehouses in workspace (simplified) if self.lakehouse_name: full_path = f"{workspace}/{self.lakehouse_name}" return [{"name": full_path, "type": "directory", "size": None}] else: - raise NotImplementedError("Listing lakehouses requires Fabric API access") - + raise NotImplementedError( + "Listing lakehouses requires Fabric API access" + ) + # For OneLake, the file system is the workspace, and lakehouse is part of the path file_system_name = workspace lakehouse_path = 
f"{lakehouse}/{file_path}" if file_path else lakehouse - + try: - async with self.service_client.get_file_system_client(file_system_name) as file_system_client: - paths = file_system_client.get_paths(path=lakehouse_path, recursive=False) - + async with self.service_client.get_file_system_client( + file_system_name + ) as file_system_client: + paths = file_system_client.get_paths( + path=lakehouse_path, recursive=False + ) + results = [] async for path_item in paths: # Construct the full path correctly based on the Azure response full_name = f"{workspace}/{path_item.name}" - results.append({ - "name": full_name, - "type": "directory" if path_item.is_directory else "file", - "size": path_item.content_length if hasattr(path_item, 'content_length') else None, - "last_modified": path_item.last_modified if hasattr(path_item, 'last_modified') else None - }) - + results.append( + { + "name": full_name, + "type": "directory" if path_item.is_directory else "file", + "size": path_item.content_length + if hasattr(path_item, "content_length") + else None, + "last_modified": path_item.last_modified + if hasattr(path_item, "last_modified") + else None, + } + ) + return results if detail else [r["name"] for r in results] - + except ResourceNotFoundError: raise FileNotFoundError(f"Path not found: {path}") - + async def _info(self, path: str, **kwargs): """Get information about a file or directory.""" path = self._strip_protocol(path).strip("/") workspace, lakehouse, file_path = self.split_path(path) - + if not workspace or not lakehouse: return {"name": path, "type": "directory", "size": None} - + # For OneLake, the file system is the workspace, and lakehouse is part of the path file_system_name = workspace lakehouse_file_path = f"{lakehouse}/{file_path}" if file_path else lakehouse - + try: - async with self.service_client.get_file_system_client(file_system_name) as file_system_client: + async with self.service_client.get_file_system_client( + file_system_name + ) as file_system_client: if file_path: - async with file_system_client.get_file_client(lakehouse_file_path) as file_client: + async with file_system_client.get_file_client( + lakehouse_file_path + ) as file_client: properties = await file_client.get_file_properties() return { "name": path, "type": "file", "size": properties.size, - "last_modified": properties.last_modified + "last_modified": properties.last_modified, } else: # Directory return {"name": path, "type": "directory", "size": None} - + except ResourceNotFoundError: raise FileNotFoundError(f"Path not found: {path}") - + async def _cat_file(self, path: str, start: int = None, end: int = None, **kwargs): """Read file content from OneLake.""" path = self._strip_protocol(path).strip("/") workspace, lakehouse, file_path = self.split_path(path) - + if not workspace or not lakehouse or not file_path: raise ValueError("Invalid path format for OneLake file") - + # For OneLake, the file system is the workspace, and lakehouse is part of the path file_system_name = workspace lakehouse_file_path = f"{lakehouse}/{file_path}" - + try: - async with self.service_client.get_file_system_client(file_system_name) as file_system_client: - async with file_system_client.get_file_client(lakehouse_file_path) as file_client: - download_stream = await file_client.download_file(offset=start, length=end-start if end else None) + async with self.service_client.get_file_system_client( + file_system_name + ) as file_system_client: + async with file_system_client.get_file_client( + lakehouse_file_path + ) as file_client: 
+ download_stream = await file_client.download_file( + offset=start, length=end - start if end else None + ) return await download_stream.readall() - + except ResourceNotFoundError: raise FileNotFoundError(f"File not found: {path}") - + async def _pipe_file(self, path: str, data: bytes, **kwargs): """Write data to a file in OneLake.""" path = self._strip_protocol(path).strip("/") workspace, lakehouse, file_path = self.split_path(path) - + if not workspace or not lakehouse or not file_path: raise ValueError("Invalid path format for OneLake file") - + # For OneLake, the file system is the workspace, and lakehouse is part of the path file_system_name = workspace lakehouse_file_path = f"{lakehouse}/{file_path}" - + try: - async with self.service_client.get_file_system_client(file_system_name) as file_system_client: - async with file_system_client.get_file_client(lakehouse_file_path) as file_client: + async with self.service_client.get_file_system_client( + file_system_name + ) as file_system_client: + async with file_system_client.get_file_client( + lakehouse_file_path + ) as file_client: await file_client.create_file() await file_client.append_data(data, offset=0, length=len(data)) await file_client.flush_data(len(data)) - + except Exception as e: raise IOError(f"Failed to write file {path}: {e}") - + async def _rm_file(self, path: str, **kwargs): """Delete a file from OneLake.""" path = self._strip_protocol(path).strip("/") workspace, lakehouse, file_path = self.split_path(path) - + if not workspace or not lakehouse or not file_path: raise ValueError("Invalid path format for OneLake file") - + # For OneLake, the file system is the workspace, and lakehouse is part of the path file_system_name = workspace lakehouse_file_path = f"{lakehouse}/{file_path}" - + try: - async with self.service_client.get_file_system_client(file_system_name) as file_system_client: - async with file_system_client.get_file_client(lakehouse_file_path) as file_client: + async with self.service_client.get_file_system_client( + file_system_name + ) as file_system_client: + async with file_system_client.get_file_client( + lakehouse_file_path + ) as file_client: await file_client.delete_file() - + except ResourceNotFoundError: pass # File already deleted - + async def _mkdir(self, path: str, create_parents: bool = True, **kwargs): """Create a directory in OneLake.""" path = self._strip_protocol(path).strip("/") workspace, lakehouse, file_path = self.split_path(path) - + if not workspace or not lakehouse: # Can't create workspaces or lakehouses through this API - raise NotImplementedError("Creating workspaces/lakehouses requires Fabric API") - + raise NotImplementedError( + "Creating workspaces/lakehouses requires Fabric API" + ) + # For OneLake, the file system is the workspace, and lakehouse is part of the path file_system_name = workspace lakehouse_dir_path = f"{lakehouse}/{file_path}" - + try: - async with self.service_client.get_file_system_client(file_system_name) as file_system_client: - async with file_system_client.get_directory_client(lakehouse_dir_path) as directory_client: + async with self.service_client.get_file_system_client( + file_system_name + ) as file_system_client: + async with file_system_client.get_directory_client( + lakehouse_dir_path + ) as directory_client: await directory_client.create_directory() - + except Exception as e: raise IOError(f"Failed to create directory {path}: {e}") - + # Sync wrappers ls = sync_wrapper(_ls) info = sync_wrapper(_info) @@ -380,7 +427,7 @@ async def _mkdir(self, path: 
@@ -380,7 +427,7 @@ async def _mkdir(self, path: str, create_parents: bool = True, **kwargs):
     pipe_file = sync_wrapper(_pipe_file)
     rm_file = sync_wrapper(_rm_file)
     mkdir = sync_wrapper(_mkdir)
-
+
     def _open(
         self,
         path: str,
@@ -388,7 +435,7 @@ def _open(
         block_size: int = None,
         autocommit: bool = True,
         cache_options: dict = None,
-        **kwargs
+        **kwargs,
     ):
         """Open a file for reading or writing."""
         return OneLakeFile(
@@ -398,13 +445,13 @@ def _open(
             block_size=block_size,
             autocommit=autocommit,
             cache_options=cache_options or {},
-            **kwargs
+            **kwargs,
         )


 class OneLakeFile(AbstractBufferedFile):
     """File-like operations on OneLake files."""
-
+
     def __init__(
         self,
         fs: OneLakeFileSystem,
@@ -413,36 +460,36 @@ def __init__(
         block_size: int = None,
         autocommit: bool = True,
         cache_options: dict = None,
-        **kwargs
+        **kwargs,
     ):
         self.fs = fs
         self.path = path
         self.mode = mode
         self.autocommit = autocommit
-
+
         workspace, lakehouse, file_path = fs.split_path(path)
         self.workspace = workspace
         self.lakehouse = lakehouse
         self.file_path = file_path
-
+
         if not workspace or not lakehouse or not file_path:
             raise ValueError("Invalid OneLake path format")
-
+
         self.container_path = f"{workspace}/{lakehouse}"
-
+
         super().__init__(
             fs=fs,
             path=path,
             mode=mode,
             block_size=block_size,
             cache_options=cache_options or {},
-            **kwargs
+            **kwargs,
         )
-
+
     def _fetch_range(self, start: int, end: int):
         """Fetch a range of bytes from the file."""
         return sync(self.fs.loop, self.fs._cat_file, self.path, start=start, end=end)
-
+
     def _upload_chunk(self, final: bool = False, **kwargs):
         """Upload a chunk of data."""
         if self.mode in {"wb", "ab"}:
diff --git a/adlfs/spec.py b/adlfs/spec.py
index a7efb138..c4b104fb 100644
--- a/adlfs/spec.py
+++ b/adlfs/spec.py
@@ -441,7 +441,7 @@ def _get_kwargs_from_urls(urlpath):
             # This is a OneLake URL, don't process it here
             # The fsspec registry should route to OneLakeFileSystem
             return out
-
+
     match = re.match(
         r"(?P<account_name>.+)\.(dfs|blob)\.core\.windows\.net", host
     )
diff --git a/adlfs/tests/test_onelake.py b/adlfs/tests/test_onelake.py
index a0cb9dca..67f9f384 100644
--- a/adlfs/tests/test_onelake.py
+++ b/adlfs/tests/test_onelake.py
@@ -1,11 +1,12 @@
 import os
-import pytest
 from unittest import mock
 from unittest.mock import AsyncMock, MagicMock, patch

-from adlfs import OneLakeFileSystem, OneLakeFile
+import pytest
 from azure.core.exceptions import ResourceNotFoundError

+from adlfs import OneLakeFile, OneLakeFileSystem
+

 def create_async_context_manager_mock():
     """Helper to create a proper async context manager mock."""
@@ -22,13 +23,13 @@ def test_init_with_credentials(self):
         """Test initialization with client credentials"""
         fs = OneLakeFileSystem(
             account_name="onelake",
-            workspace_name="test_workspace",
+            workspace_name="test_workspace",
             lakehouse_name="test_lakehouse",
             tenant_id="test-tenant",
             client_id="test-client",
-            client_secret="test-secret"
+            client_secret="test-secret",
         )
-
+
         assert fs.account_name == "onelake"
         assert fs.workspace_name == "test_workspace"
         assert fs.lakehouse_name == "test_lakehouse"
@@ -41,43 +42,64 @@ def test_init_with_credentials(self):
     def test_init_anonymous(self):
         """Test initialization with anonymous access"""
         fs = OneLakeFileSystem(anon=True)
-
+
         assert fs.account_name == "onelake"
         assert fs.anon is True

     def test_init_with_env_vars(self):
         """Test initialization using environment variables"""
-        with mock.patch.dict(os.environ, {
-            'AZURE_TENANT_ID': 'env-tenant',
-            'AZURE_CLIENT_ID': 'env-client',
-            'AZURE_CLIENT_SECRET': 'env-secret'
-        }):
+        with mock.patch.dict(
+            os.environ,
+            {
"env-tenant", + "AZURE_CLIENT_ID": "env-client", + "AZURE_CLIENT_SECRET": "env-secret", + }, + ): fs = OneLakeFileSystem() - + assert fs.tenant_id == "env-tenant" assert fs.client_id == "env-client" assert fs.client_secret == "env-secret" def test_strip_protocol(self): """Test URL protocol stripping""" - assert OneLakeFileSystem._strip_protocol("onelake://workspace/lakehouse/file.txt") == "workspace/lakehouse/file.txt" - assert OneLakeFileSystem._strip_protocol("workspace/lakehouse/file.txt") == "workspace/lakehouse/file.txt" - assert OneLakeFileSystem._strip_protocol("/workspace/lakehouse/file.txt") == "workspace/lakehouse/file.txt" - + assert ( + OneLakeFileSystem._strip_protocol("onelake://workspace/lakehouse/file.txt") + == "workspace/lakehouse/file.txt" + ) + assert ( + OneLakeFileSystem._strip_protocol("workspace/lakehouse/file.txt") + == "workspace/lakehouse/file.txt" + ) + assert ( + OneLakeFileSystem._strip_protocol("/workspace/lakehouse/file.txt") + == "workspace/lakehouse/file.txt" + ) + # Test abfss URL stripping for OneLake - assert OneLakeFileSystem._strip_protocol("abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/file.txt") == "workspace/lakehouse/Files/file.txt" + assert ( + OneLakeFileSystem._strip_protocol( + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/file.txt" + ) + == "workspace/lakehouse/Files/file.txt" + ) def test_get_kwargs_from_urls(self): """Test URL parsing for kwargs extraction""" - kwargs = OneLakeFileSystem._get_kwargs_from_urls("onelake://onelake.dfs.fabric.microsoft.com/workspace/lakehouse/file.txt") - + kwargs = OneLakeFileSystem._get_kwargs_from_urls( + "onelake://onelake.dfs.fabric.microsoft.com/workspace/lakehouse/file.txt" + ) + assert kwargs.get("account_name") == "onelake" - assert kwargs.get("workspace_name") == "workspace" + assert kwargs.get("workspace_name") == "workspace" assert kwargs.get("lakehouse_name") == "lakehouse" - + # Test abfss URL parsing for OneLake - abfss_kwargs = OneLakeFileSystem._get_kwargs_from_urls("abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test") - + abfss_kwargs = OneLakeFileSystem._get_kwargs_from_urls( + "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test" + ) + assert abfss_kwargs.get("account_name") == "onelake" assert abfss_kwargs.get("workspace_name") == "q_dev_workspace" assert abfss_kwargs.get("lakehouse_name") == "qdata_dev_lh.Lakehouse" @@ -85,79 +107,85 @@ def test_get_kwargs_from_urls(self): def test_split_path(self): """Test path splitting into components""" fs = OneLakeFileSystem(anon=True) - + # Test full path - workspace, lakehouse, file_path = fs.split_path("workspace/lakehouse/folder/file.txt") + workspace, lakehouse, file_path = fs.split_path( + "workspace/lakehouse/folder/file.txt" + ) assert workspace == "workspace" - assert lakehouse == "lakehouse" + assert lakehouse == "lakehouse" assert file_path == "folder/file.txt" - + # Test partial paths workspace, lakehouse, file_path = fs.split_path("workspace/lakehouse") assert workspace == "workspace" assert lakehouse == "lakehouse" assert file_path == "" - + workspace, lakehouse, file_path = fs.split_path("workspace") assert workspace == "workspace" assert lakehouse == "" assert file_path == "" - + # Test empty path workspace, lakehouse, file_path = fs.split_path("") assert workspace == "" assert lakehouse == "" assert file_path == "" - @pytest.mark.asyncio + @pytest.mark.asyncio async def 
     async def test_ls_with_workspace_and_lakehouse(self):
         """Test listing files when workspace and lakehouse are known"""
-        with patch('adlfs.onelake.AIODataLakeServiceClient'):
+        with patch("adlfs.onelake.AIODataLakeServiceClient"):
             fs = OneLakeFileSystem(
                 workspace_name="test_workspace",
-                lakehouse_name="test_lakehouse",
-                anon=True
+                lakehouse_name="test_lakehouse",
+                anon=True,
             )
-
+
             # Mock the service client - don't make it an AsyncMock since we need regular return values
             mock_service_client = MagicMock()
             fs.service_client = mock_service_client
-
+
             # Create the file system client as an async context manager
-            mock_file_system_client = MagicMock()  # Changed from AsyncMock to MagicMock for non-async methods
+            mock_file_system_client = (
+                MagicMock()
+            )  # Changed from AsyncMock to MagicMock for non-async methods
             mock_fs_context = create_async_context_manager_mock()
             mock_fs_context.__aenter__.return_value = mock_file_system_client
             mock_service_client.get_file_system_client.return_value = mock_fs_context
-
+
             # Mock path items
             mock_path1 = MagicMock()
             mock_path1.name = "file1.txt"
             mock_path1.is_directory = False
             mock_path1.content_length = 1024
-
+
             mock_path2 = MagicMock()
             mock_path2.name = "folder1"
             mock_path2.is_directory = True
-
+
             class MockAsyncIterator:
                 def __init__(self, items):
                     self.items = items
                     self.index = 0
-
+
                 def __aiter__(self):
                     return self
-
+
                 async def __anext__(self):
                     if self.index >= len(self.items):
                         raise StopAsyncIteration
                     item = self.items[self.index]
                     self.index += 1
                     return item
-
-            mock_file_system_client.get_paths.return_value = MockAsyncIterator([mock_path1, mock_path2])
-
+
+            mock_file_system_client.get_paths.return_value = MockAsyncIterator(
+                [mock_path1, mock_path2]
+            )
+
             result = await fs._ls("test_workspace/test_lakehouse/")
-
+
             assert len(result) == 2
             assert result[0]["name"] == "test_workspace/test_lakehouse/file1.txt"
             assert result[0]["type"] == "file"
@@ -169,14 +197,18 @@ async def __anext__(self):
     async def test_ls_file_not_found(self):
         """Test listing when path doesn't exist"""
         fs = OneLakeFileSystem(anon=True)
-
+
         mock_service_client = AsyncMock()
         mock_file_system_client = AsyncMock()
-        mock_service_client.get_file_system_client.return_value = mock_file_system_client
-        mock_file_system_client.get_paths.side_effect = ResourceNotFoundError("Path not found")
-
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.get_paths.side_effect = ResourceNotFoundError(
+            "Path not found"
+        )
+
         fs.service_client = mock_service_client
-
+
         with pytest.raises(FileNotFoundError):
             await fs._ls("workspace/lakehouse/nonexistent")

@@ -184,23 +216,25 @@ async def test_ls_file_not_found(self):
     async def test_info_file(self):
         """Test getting file information"""
         fs = OneLakeFileSystem(anon=True)
-
+
         mock_service_client = AsyncMock()
         mock_file_system_client = AsyncMock()
         mock_file_client = AsyncMock()
-
-        mock_service_client.get_file_system_client.return_value = mock_file_system_client
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
         mock_file_system_client.get_file_client.return_value = mock_file_client
-
+
         mock_properties = MagicMock()
         mock_properties.size = 2048
         mock_properties.last_modified = "2023-01-01T00:00:00Z"
-
+
         mock_file_client.get_file_properties.return_value = mock_properties
         fs.service_client = mock_service_client
-
+
         result = await fs._info("workspace/lakehouse/file.txt")
-
+
         assert result["name"] == "workspace/lakehouse/file.txt"
         assert result["type"] == "file"
assert result["size"] == 2048 @@ -209,9 +243,9 @@ async def test_info_file(self): async def test_info_directory(self): """Test getting directory information""" fs = OneLakeFileSystem(anon=True) - + result = await fs._info("workspace/lakehouse") - + assert result["name"] == "workspace/lakehouse" assert result["type"] == "directory" assert result["size"] is None @@ -220,21 +254,23 @@ async def test_info_directory(self): async def test_cat_file(self): """Test reading file content""" fs = OneLakeFileSystem(anon=True) - + mock_service_client = AsyncMock() mock_file_system_client = AsyncMock() mock_file_client = AsyncMock() mock_download_stream = AsyncMock() - - mock_service_client.get_file_system_client.return_value = mock_file_system_client + + mock_service_client.get_file_system_client.return_value = ( + mock_file_system_client + ) mock_file_system_client.get_file_client.return_value = mock_file_client mock_file_client.download_file.return_value = mock_download_stream mock_download_stream.readall.return_value = b"file content" - + fs.service_client = mock_service_client - + result = await fs._cat_file("workspace/lakehouse/file.txt") - + assert result == b"file content" mock_file_client.download_file.assert_called_once_with(offset=None, length=None) @@ -242,21 +278,23 @@ async def test_cat_file(self): async def test_cat_file_with_range(self): """Test reading file content with byte range""" fs = OneLakeFileSystem(anon=True) - + mock_service_client = AsyncMock() mock_file_system_client = AsyncMock() mock_file_client = AsyncMock() mock_download_stream = AsyncMock() - - mock_service_client.get_file_system_client.return_value = mock_file_system_client + + mock_service_client.get_file_system_client.return_value = ( + mock_file_system_client + ) mock_file_system_client.get_file_client.return_value = mock_file_client mock_file_client.download_file.return_value = mock_download_stream mock_download_stream.readall.return_value = b"content" - + fs.service_client = mock_service_client - + result = await fs._cat_file("workspace/lakehouse/file.txt", start=10, end=20) - + assert result == b"content" mock_file_client.download_file.assert_called_once_with(offset=10, length=10) @@ -264,17 +302,21 @@ async def test_cat_file_with_range(self): async def test_cat_file_not_found(self): """Test reading non-existent file""" fs = OneLakeFileSystem(anon=True) - + mock_service_client = AsyncMock() mock_file_system_client = AsyncMock() mock_file_client = AsyncMock() - - mock_service_client.get_file_system_client.return_value = mock_file_system_client + + mock_service_client.get_file_system_client.return_value = ( + mock_file_system_client + ) mock_file_system_client.get_file_client.return_value = mock_file_client - mock_file_client.download_file.side_effect = ResourceNotFoundError("File not found") - + mock_file_client.download_file.side_effect = ResourceNotFoundError( + "File not found" + ) + fs.service_client = mock_service_client - + with pytest.raises(FileNotFoundError): await fs._cat_file("workspace/lakehouse/nonexistent.txt") @@ -282,56 +324,66 @@ async def test_cat_file_not_found(self): async def test_pipe_file(self): """Test writing file content""" fs = OneLakeFileSystem(anon=True) - + mock_service_client = AsyncMock() mock_file_system_client = AsyncMock() mock_file_client = AsyncMock() - - mock_service_client.get_file_system_client.return_value = mock_file_system_client + + mock_service_client.get_file_system_client.return_value = ( + mock_file_system_client + ) mock_file_system_client.get_file_client.return_value 
         mock_file_system_client.get_file_client.return_value = mock_file_client
-
+
         fs.service_client = mock_service_client
-
+
         test_data = b"test data"
         await fs._pipe_file("workspace/lakehouse/newfile.txt", test_data)
-
+
         mock_file_client.create_file.assert_called_once()
-        mock_file_client.append_data.assert_called_once_with(test_data, offset=0, length=len(test_data))
+        mock_file_client.append_data.assert_called_once_with(
+            test_data, offset=0, length=len(test_data)
+        )
         mock_file_client.flush_data.assert_called_once_with(len(test_data))

     @pytest.mark.asyncio
     async def test_rm_file(self):
         """Test deleting a file"""
         fs = OneLakeFileSystem(anon=True)
-
+
         mock_service_client = AsyncMock()
         mock_file_system_client = AsyncMock()
         mock_file_client = AsyncMock()
-
-        mock_service_client.get_file_system_client.return_value = mock_file_system_client
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
         mock_file_system_client.get_file_client.return_value = mock_file_client
-
+
         fs.service_client = mock_service_client
-
+
         await fs._rm_file("workspace/lakehouse/file.txt")
-
+
         mock_file_client.delete_file.assert_called_once()

     @pytest.mark.asyncio
     async def test_rm_file_not_found(self):
         """Test deleting non-existent file (should not raise)"""
         fs = OneLakeFileSystem(anon=True)
-
+
         mock_service_client = AsyncMock()
         mock_file_system_client = AsyncMock()
         mock_file_client = AsyncMock()
-
-        mock_service_client.get_file_system_client.return_value = mock_file_system_client
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
         mock_file_system_client.get_file_client.return_value = mock_file_client
-        mock_file_client.delete_file.side_effect = ResourceNotFoundError("File not found")
-
+        mock_file_client.delete_file.side_effect = ResourceNotFoundError(
+            "File not found"
+        )
+
         fs.service_client = mock_service_client
-
+
         # Should not raise an exception
         await fs._rm_file("workspace/lakehouse/nonexistent.txt")

@@ -339,38 +391,42 @@ async def test_rm_file_not_found(self):
     async def test_mkdir(self):
         """Test creating a directory"""
         fs = OneLakeFileSystem(anon=True)
-
+
         mock_service_client = AsyncMock()
         mock_file_system_client = AsyncMock()
         mock_directory_client = AsyncMock()
-
-        mock_service_client.get_file_system_client.return_value = mock_file_system_client
-        mock_file_system_client.get_directory_client.return_value = mock_directory_client
-
+
+        mock_service_client.get_file_system_client.return_value = (
+            mock_file_system_client
+        )
+        mock_file_system_client.get_directory_client.return_value = (
+            mock_directory_client
+        )
+
         fs.service_client = mock_service_client
-
+
         await fs._mkdir("workspace/lakehouse/newfolder")
-
+
         mock_directory_client.create_directory.assert_called_once()

     def test_mkdir_invalid_path(self):
         """Test creating directory with invalid path"""
         fs = OneLakeFileSystem(anon=True)
-
+
         with pytest.raises(NotImplementedError):
             fs.mkdir("workspace")  # Can't create workspace

     def test_invalid_path_formats(self):
         """Test handling of invalid path formats"""
         fs = OneLakeFileSystem(anon=True)
-
+
         # Test invalid paths for file operations
         with pytest.raises(ValueError):
             OneLakeFile(fs, "invalid_path", mode="rb")
-
+
         with pytest.raises(ValueError):
             OneLakeFile(fs, "workspace", mode="rb")  # Missing lakehouse and file
-
+
         with pytest.raises(ValueError):
             OneLakeFile(fs, "workspace/lakehouse", mode="rb")  # Missing file

@@ -381,9 +437,9 @@ class TestOneLakeFile:
     def test_init_valid_path(self):
         """Test file initialization with valid path"""
         fs = OneLakeFileSystem(anon=True)
-
+
"workspace/lakehouse/file.txt", mode="rb") - + assert file_obj.workspace == "workspace" assert file_obj.lakehouse == "lakehouse" assert file_obj.file_path == "file.txt" @@ -392,36 +448,36 @@ def test_init_valid_path(self): def test_init_invalid_path(self): """Test file initialization with invalid path""" fs = OneLakeFileSystem(anon=True) - + with pytest.raises(ValueError): OneLakeFile(fs, "invalid", mode="rb") - @patch('adlfs.onelake.sync') + @patch("adlfs.onelake.sync") def test_fetch_range(self, mock_sync): """Test fetching byte range from file""" fs = OneLakeFileSystem(anon=True) file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="rb") - + mock_sync.return_value = b"test data" - + result = file_obj._fetch_range(0, 10) - + assert result == b"test data" mock_sync.assert_called_once() - @patch('adlfs.onelake.sync') + @patch("adlfs.onelake.sync") def test_upload_chunk(self, mock_sync): """Test uploading chunk of data""" fs = OneLakeFileSystem(anon=True) file_obj = OneLakeFile(fs, "workspace/lakehouse/file.txt", mode="wb") - + # Mock the buffer file_obj.buffer = MagicMock() file_obj.buffer.getvalue.return_value = b"test data" file_obj.offset = 0 - + result = file_obj._upload_chunk(final=True) - + assert result is True mock_sync.assert_called_once() @@ -437,118 +493,140 @@ class TestOneLakeURLRouting: def test_onelake_url_routing(self): """Test that OneLake URLs are properly parsed and routed.""" # OneLake URLs should be handled by OneLakeFileSystem - onelake_url = "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test" - + onelake_url = ( + "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/" + "qdata_dev_lh.Lakehouse/Files/Upload_Test" + ) + # Test OneLakeFileSystem can handle both protocols assert "abfss" in OneLakeFileSystem.protocol assert "onelake" in OneLakeFileSystem.protocol - + # Test URL parsing for OneLake kwargs = OneLakeFileSystem._get_kwargs_from_urls(onelake_url) assert kwargs.get("account_name") == "onelake" assert kwargs.get("workspace_name") == "q_dev_workspace" assert kwargs.get("lakehouse_name") == "qdata_dev_lh.Lakehouse" - + # Test path stripping for OneLake URLs stripped = OneLakeFileSystem._strip_protocol(onelake_url) assert stripped == "q_dev_workspace/qdata_dev_lh.Lakehouse/Files/Upload_Test" - + def test_azure_blob_url_routing(self): """Test that regular Azure Storage URLs are handled by AzureBlobFileSystem.""" from adlfs import AzureBlobFileSystem - + # Regular Azure Storage URL azure_url = "abfss://container@storageaccount.dfs.core.windows.net/path/to/file" - + # Test that AzureBlobFileSystem doesn't process OneLake URLs - onelake_url = "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/path" + onelake_url = ( + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/path" + ) onelake_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(onelake_url) assert not onelake_kwargs # Should return empty dict for OneLake URLs - + # Test that AzureBlobFileSystem handles regular Azure URLs - azure_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(azure_url) + azure_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(azure_url) assert azure_kwargs.get("account_name") == "storageaccount" - + def test_onelake_strip_protocol_variations(self): """Test OneLake URL stripping with different URL formats.""" - + test_cases = [ # (input_url, expected_stripped_path) - ("abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/test.txt", - "workspace/lakehouse/Files/test.txt"), - 
("onelake://workspace/lakehouse/Files/test.txt", - "workspace/lakehouse/Files/test.txt"), - ("workspace/lakehouse/Files/test.txt", - "workspace/lakehouse/Files/test.txt"), + ( + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/test.txt", + "workspace/lakehouse/Files/test.txt", + ), + ( + "onelake://workspace/lakehouse/Files/test.txt", + "workspace/lakehouse/Files/test.txt", + ), + ( + "workspace/lakehouse/Files/test.txt", + "workspace/lakehouse/Files/test.txt", + ), ] - + for url, expected in test_cases: result = OneLakeFileSystem._strip_protocol(url) assert result == expected, f"Failed for URL: {url}" - + def test_onelake_get_kwargs_variations(self): """Test OneLake URL parameter extraction with different formats.""" - + test_cases = [ # abfss format with workspace in host part { - "url": "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/qdata_dev_lh.Lakehouse/Files/Upload_Test", + "url": ( + "abfss://q_dev_workspace@onelake.dfs.fabric.microsoft.com/" + "qdata_dev_lh.Lakehouse/Files/Upload_Test" + ), "expected": { "account_name": "onelake", - "workspace_name": "q_dev_workspace", - "lakehouse_name": "qdata_dev_lh.Lakehouse" - } + "workspace_name": "q_dev_workspace", + "lakehouse_name": "qdata_dev_lh.Lakehouse", + }, }, # onelake format with workspace in path { - "url": "onelake://onelake.dfs.fabric.microsoft.com/workspace/lakehouse/Files/test.txt", + "url": ( + "onelake://onelake.dfs.fabric.microsoft.com/" + "workspace/lakehouse/Files/test.txt" + ), "expected": { "account_name": "onelake", "workspace_name": "workspace", - "lakehouse_name": "lakehouse" - } - } + "lakehouse_name": "lakehouse", + }, + }, ] - + for test_case in test_cases: kwargs = OneLakeFileSystem._get_kwargs_from_urls(test_case["url"]) for key, expected_value in test_case["expected"].items(): - assert kwargs.get(key) == expected_value, \ - f"Failed for URL: {test_case['url']}, key: {key}, got: {kwargs.get(key)}, expected: {expected_value}" - + assert ( + kwargs.get(key) == expected_value + ), f"Failed for URL: {test_case['url']}, key: {key}, got: {kwargs.get(key)}, expected: {expected_value}" + def test_azure_blob_ignores_onelake_domains(self): """Test that AzureBlobFileSystem ignores OneLake domain URLs.""" from adlfs import AzureBlobFileSystem - + onelake_urls = [ "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", "abfs://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file", ] - + for url in onelake_urls: kwargs = AzureBlobFileSystem._get_kwargs_from_urls(url) # Should return empty dict (no account_name extracted) assert kwargs == {}, f"AzureBlobFileSystem should ignore OneLake URL: {url}" - + def test_protocol_overlap_handling(self): """Test that protocol overlap between filesystems is handled correctly.""" from adlfs import AzureBlobFileSystem - + # Both filesystems support abfss protocol assert "abfss" in AzureBlobFileSystem.protocol assert "abfss" in OneLakeFileSystem.protocol - + # But they should handle different domains azure_url = "abfss://container@account.dfs.core.windows.net/file" - onelake_url = "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file" - + onelake_url = ( + "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/file" + ) + # Azure should handle core.windows.net, ignore fabric.microsoft.com azure_kwargs = AzureBlobFileSystem._get_kwargs_from_urls(azure_url) - onelake_kwargs_from_azure = AzureBlobFileSystem._get_kwargs_from_urls(onelake_url) - + onelake_kwargs_from_azure = AzureBlobFileSystem._get_kwargs_from_urls( + onelake_url 

 ## Authentication

@@ -60,6 +93,29 @@ Additionally, some methods will include the account URL and authentication crede
 >>> fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
 ```

+### OneLake Authentication
+
+OneLake requires Azure Active Directory authentication. You can authenticate using:
+
+```{code-block} python
+>>> from adlfs import OneLakeFileSystem
+>>> fs = OneLakeFileSystem(
+...     tenant_id="your-tenant-id",
+...     client_id="your-client-id",
+...     client_secret="your-client-secret"
+... )
+```
+
+Or using environment variables:
+
+```{code-block} python
+>>> import os
+>>> os.environ["AZURE_TENANT_ID"] = "your-tenant-id"
+>>> os.environ["AZURE_CLIENT_ID"] = "your-client-id"
+>>> os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret"
+>>> fs = OneLakeFileSystem()
+```
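+
+If you already hold a credential object, you can pass it directly via the
+`credential` parameter. A sketch using `DefaultAzureCredential` (note the async
+variant from `azure.identity.aio`, since `OneLakeFileSystem` is built on the
+asynchronous Data Lake client):
+
+```{code-block} python
+>>> from azure.identity.aio import DefaultAzureCredential
+>>> fs = OneLakeFileSystem(credential=DefaultAzureCredential())
+```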
+
 ## Usage

 See the [fsspec documentation] on usage.

@@ -76,6 +132,28 @@ to list all the files or directories in the top-level of a storage container, yo
 ['gbif/occurrence']
 ```

+### OneLake Usage
+
+OneLake paths follow the structure `workspace/lakehouse/path/to/file`. You can use both `onelake://` and `abfss://` protocols:
+
+```{code-block} python
+>>> from adlfs import OneLakeFileSystem
+>>> fs = OneLakeFileSystem(tenant_id="...", client_id="...", client_secret="...")
+
+# List contents of a lakehouse
+>>> fs.ls("my_workspace/my_lakehouse")
+['my_workspace/my_lakehouse/Files', 'my_workspace/my_lakehouse/Tables']
+
+# Using with fsspec
+>>> import fsspec
+>>> with fsspec.open("onelake://my_workspace/my_lakehouse/Files/data.parquet") as f:
+...     data = f.read()
+
+# Using abfss protocol (automatically routes to OneLake)
+>>> with fsspec.open("abfss://my_workspace@onelake.dfs.fabric.microsoft.com/my_lakehouse/Files/data.parquet") as f:
+...     data = f.read()
+```
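+
+Because `OneLakeFileSystem` is `fsspec`-compatible, libraries that accept
+`storage_options` should work with OneLake paths as well. A sketch with pandas
+(assuming pandas and a parquet engine such as pyarrow are installed; the path is
+a placeholder):
+
+```{code-block} python
+>>> import pandas as pd
+>>> df = pd.read_parquet(
+...     "onelake://my_workspace/my_lakehouse/Files/data.parquet",
+...     storage_options={
+...         "tenant_id": "your-tenant-id",
+...         "client_id": "your-client-id",
+...         "client_secret": "your-client-secret",
+...     },
+... )
+```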
+
 ```{toctree}
 :maxdepth: 2

@@ -94,5 +172,6 @@ api.md
 [Azure Blob storage]: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction
 [Azure Data Lake Storage Gen2]: https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction
 [Azure Data Lake Storage Gen1]: https://docs.microsoft.com/en-us/azure/data-lake-store/
+[Microsoft OneLake]: https://docs.microsoft.com/en-us/fabric/onelake/
 [`azure.storage.blob`]: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python
 [fsspec documentation]: https://filesystem-spec.readthedocs.io/en/latest/usage.html