8
8
import tempfile
9
9
from datetime import datetime , timedelta
10
10
from io import BytesIO
11
+ import os
12
+ import typing
13
+ import pytest
11
14
12
15
import pytest
13
16
from azure .core .exceptions import HttpResponseError , ResourceExistsError , ResourceModifiedError , ResourceNotFoundError
@@ -85,7 +88,13 @@ def _create_source_blob(self, data):
85
88
return blob_client
86
89
87
90
def _get_bearer_token_string (self , resource : str = "https://storage.azure.com/.default" ) -> str :
88
- return "Bearer " + f"{ self .get_credential (BlobServiceClient ).get_token (resource ).token } "
91
+ # In playback mode we don't want to invoke real Azure auth flows. Return a stable fake token
92
+ # so existing recordings (with sanitization) continue to match.
93
+ if not self .is_live :
94
+ return "Bearer FAKE_TOKEN"
95
+ credential = self .get_credential (BlobServiceClient )
96
+ token = credential .get_token (resource )
97
+ return f"Bearer { token .token } "
89
98
90
99
def assertBlobEqual (self , container_name , blob_name , expected_data ):
91
100
blob = self .bsc .get_blob_client (container_name , blob_name )
@@ -131,7 +140,8 @@ def test_upload_from_file_to_blob_with_oauth(self, **kwargs):
131
140
self .get_resource_name ("file" ),
132
141
bearer_token_string ,
133
142
storage_account_name ,
134
- source_data
143
+ source_data ,
144
+ self .is_live
135
145
)
136
146
137
147
# Set up destination blob without data
@@ -156,12 +166,13 @@ def test_upload_from_file_to_blob_with_oauth(self, **kwargs):
156
166
# Assert
157
167
assert destination_blob_data == source_data
158
168
finally :
159
- requests .delete (
160
- url = base_url ,
161
- headers = _build_base_file_share_headers (bearer_token_string , 0 ),
162
- params = {'restype' : 'share' }
163
- )
164
- blob_service_client .delete_container (self .source_container_name )
169
+ if self .is_live :
170
+ requests .delete (
171
+ url = base_url ,
172
+ headers = _build_base_file_share_headers (bearer_token_string , 0 ),
173
+ params = {'restype' : 'share' }
174
+ )
175
+ blob_service_client .delete_container (self .source_container_name )
165
176
166
177
@BlobPreparer ()
167
178
@recorded_by_proxy
@@ -180,14 +191,16 @@ def test_stage_from_file_to_blob_with_oauth(self, **kwargs):
180
191
self .get_resource_name ("file" ),
181
192
bearer_token_string ,
182
193
storage_account_name ,
183
- source_data
194
+ source_data ,
195
+ self .is_live
184
196
)
185
197
186
198
# Set up destination blob without data
187
199
blob_service_client = BlobServiceClient (
188
200
account_url = self .account_url (storage_account_name , "blob" ),
189
201
credential = storage_account_key
190
202
)
203
+
191
204
destination_blob_client = blob_service_client .get_blob_client (
192
205
container = self .source_container_name ,
193
206
blob = self .get_resource_name (TEST_BLOB_PREFIX + "1" )
@@ -209,12 +222,13 @@ def test_stage_from_file_to_blob_with_oauth(self, **kwargs):
209
222
destination_blob_data = destination_blob_client .download_blob ().readall ()
210
223
assert destination_blob_data == source_data
211
224
finally :
212
- requests .delete (
213
- url = base_url ,
214
- headers = _build_base_file_share_headers (bearer_token_string , 0 ),
215
- params = {'restype' : 'share' }
216
- )
217
- blob_service_client .delete_container (self .source_container_name )
225
+ if self .is_live :
226
+ requests .delete (
227
+ url = base_url ,
228
+ headers = _build_base_file_share_headers (bearer_token_string , 0 ),
229
+ params = {'restype' : 'share' }
230
+ )
231
+ blob_service_client .delete_container (self .source_container_name )
218
232
219
233
@BlobPreparer ()
220
234
@recorded_by_proxy
0 commit comments