2
2
from collections .abc import Sequence
3
3
from contextlib import suppress
4
4
from pathlib import Path
5
- from typing import ClassVar , Optional
5
+ from typing import ClassVar
6
6
from urllib .parse import urlparse
7
7
8
8
from ragbits .core .audit import trace , traceable
9
+ from ragbits .core .sources .base import Source , get_local_storage_dir
10
+ from ragbits .core .sources .exceptions import SourceConnectionError , SourceNotFoundError
11
+ from ragbits .core .utils .decorators import requires_dependencies
9
12
10
13
with suppress (ImportError ):
11
14
from azure .core .exceptions import ResourceNotFoundError
12
15
from azure .identity import DefaultAzureCredential
13
- from azure .storage .blob import BlobServiceClient
14
-
15
- from ragbits .core .sources .base import Source , get_local_storage_dir
16
- from ragbits .core .sources .exceptions import SourceConnectionError , SourceNotFoundError
17
- from ragbits .core .utils .decorators import requires_dependencies
16
+ from azure .storage .blob import BlobServiceClient , ExponentialRetry
18
17
19
18
20
19
class AzureBlobStorageSource (Source ):
@@ -26,7 +25,6 @@ class AzureBlobStorageSource(Source):
26
25
account_name : str
27
26
container_name : str
28
27
blob_name : str
29
- _blob_service : Optional ["BlobServiceClient" ] = None
30
28
31
29
@property
32
30
def id (self ) -> str :
@@ -35,49 +33,6 @@ def id(self) -> str:
35
33
"""
36
34
return f"azure://{ self .account_name } /{ self .container_name } /{ self .blob_name } "
37
35
38
- @classmethod
39
- @requires_dependencies (["azure.storage.blob" , "azure.identity" ], "azure" )
40
- async def _get_blob_service (cls , account_name : str ) -> "BlobServiceClient" :
41
- """
42
- Returns an authenticated BlobServiceClient instance.
43
-
44
- Priority:
45
- 1. DefaultAzureCredential (if account_name is set and authentication succeeds).
46
- 2. Connection string (if authentication with DefaultAzureCredential fails).
47
-
48
- If neither method works, an error is raised.
49
-
50
- Args:
51
- account_name: The name of the Azure Blob Storage account.
52
-
53
- Returns:
54
- BlobServiceClient: The authenticated Blob Storage client.
55
-
56
- Raises:
57
- ValueError: If the authentication fails.
58
- """
59
- try :
60
- credential = DefaultAzureCredential ()
61
- account_url = f"https://{ account_name } .blob.core.windows.net"
62
- cls ._blob_service = BlobServiceClient (account_url = account_url , credential = credential )
63
- return cls ._blob_service
64
- except Exception as e :
65
- print (f"Warning: Failed to authenticate using DefaultAzureCredential. \n Error: { str (e )} " )
66
-
67
- connection_string = os .getenv ("AZURE_STORAGE_CONNECTION_STRING" )
68
- if connection_string :
69
- try :
70
- cls ._blob_service = BlobServiceClient .from_connection_string (conn_str = connection_string )
71
- return cls ._blob_service
72
- except Exception as e :
73
- raise ValueError ("Failed to authenticate using connection string." ) from e
74
-
75
- # If neither method works, raise an error
76
- raise ValueError (
77
- "No authentication method available. "
78
- "Provide an account_name for identity-based authentication or a connection string."
79
- )
80
-
81
36
@requires_dependencies (["azure.storage.blob" , "azure.core.exceptions" ], "azure" )
82
37
async def fetch (self ) -> Path :
83
38
"""
@@ -95,7 +50,7 @@ async def fetch(self) -> Path:
95
50
path = container_local_dir / self .blob_name
96
51
with trace (account_name = self .account_name , container = self .container_name , blob = self .blob_name ) as outputs :
97
52
try :
98
- blob_service = await self ._get_blob_service (account_name = self .account_name )
53
+ blob_service = self ._get_blob_service (self .account_name )
99
54
blob_client = blob_service .get_blob_client (container = self .container_name , blob = self .blob_name )
100
55
Path (path ).parent .mkdir (parents = True , exist_ok = True )
101
56
stream = blob_client .download_blob ()
@@ -174,12 +129,11 @@ async def list_sources(
174
129
List of source objects.
175
130
176
131
Raises:
177
- ImportError: If the required 'azure-storage-blob' package is not installed
178
132
SourceConnectionError: If there's an error connecting to Azure
179
133
"""
180
134
with trace (account_name = account_name , container = container , blob_name = blob_name ) as outputs :
181
- blob_service = await cls ._get_blob_service (account_name = account_name )
182
135
try :
136
+ blob_service = cls ._get_blob_service (account_name )
183
137
container_client = blob_service .get_container_client (container )
184
138
blobs = container_client .list_blobs (name_starts_with = blob_name )
185
139
outputs .results = [
@@ -189,3 +143,42 @@ async def list_sources(
189
143
return outputs .results
190
144
except Exception as e :
191
145
raise SourceConnectionError () from e
146
+
147
@staticmethod
def _get_blob_service(account_name: str) -> "BlobServiceClient":
    """
    Returns an authenticated BlobServiceClient instance.

    Each candidate client is verified with a `get_account_information()` call before
    it is returned, so a client that cannot actually reach the account is never handed
    back to the caller. Retries are disabled (`retry_total=0`) on both clients so that a
    failed verification falls through to the next method without waiting on backoff.

    Priority:
        1. DefaultAzureCredential against `https://<account_name>.blob.core.windows.net`.
        2. Connection string read from the `AZURE_STORAGE_CONNECTION_STRING` environment variable.

    Args:
        account_name: The name of the Azure Blob Storage account.

    Returns:
        The authenticated Blob Storage client.

    Raises:
        Exception: The DefaultAzureCredential failure, re-raised unchanged, when no connection
            string is configured; otherwise the connection string failure, chained from the
            DefaultAzureCredential failure.
    """
    try:
        credential = DefaultAzureCredential()
        # The URL must not contain any whitespace, otherwise the host name is invalid.
        account_url = f"https://{account_name}.blob.core.windows.net"
        blob_service = BlobServiceClient(
            account_url=account_url,
            credential=credential,
            retry_policy=ExponentialRetry(retry_total=0),
        )
        # Constructing the client does not prove the credential works; this call does.
        blob_service.get_account_information()
        return blob_service
    except Exception as first_exc:
        # Broad catch is deliberate: any failure of the first method triggers the fallback.
        conn_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING", "")
        if not conn_str:
            # No fallback configured - surface the original failure.
            raise first_exc

        try:
            service = BlobServiceClient.from_connection_string(
                conn_str=conn_str,
                retry_policy=ExponentialRetry(retry_total=0),
            )
            service.get_account_information()
            return service
        except Exception as second_error:
            # Keep both failures visible in the traceback.
            raise second_error from first_exc
0 commit comments