Skip to content

Commit b506460

Browse files
committed
Fix upload and cencellation
1 parent 06270a0 commit b506460

File tree

2 files changed

+74
-52
lines changed

2 files changed

+74
-52
lines changed

src/msgraph_core/models/large_file_upload_session.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,52 @@ def __init__(
1818
is_cancelled: Optional[bool] = False,
1919
next_expected_ranges: Optional[List[str]] = None
2020
):
21-
self.upload_url = upload_url
22-
self.expiration_date_time = expiration_date_time
21+
self._upload_url = upload_url
22+
self._expiration_date_time = expiration_date_time
2323
self.additional_data = additional_data if additional_data is not None else []
2424
self.is_cancelled = is_cancelled
2525
self.next_expected_ranges = next_expected_ranges if next_expected_ranges is not None else []
2626

27+
@property
28+
def upload_url(self):
29+
return self._upload_url
30+
31+
@upload_url.setter
32+
def upload_url(self, value):
33+
self._upload_url = value
34+
35+
@property
36+
def expiration_date_time(self):
37+
return self._expiration_date_time
38+
39+
@expiration_date_time.setter
40+
def expiration_date_time(self, value):
41+
self._expiration_date_time = value
42+
43+
@property
44+
def additional_data(self):
45+
return self._additional_data
46+
47+
@additional_data.setter
48+
def additional_data(self, value):
49+
self._additional_data = value if value is not None else []
50+
51+
@property
52+
def is_cancelled(self):
53+
return self._is_cancelled
54+
55+
@is_cancelled.setter
56+
def is_cancelled(self, value):
57+
self._is_cancelled = value
58+
59+
@property
60+
def next_expected_ranges(self):
61+
return self._next_expected_ranges
62+
63+
@next_expected_ranges.setter
64+
def next_expected_ranges(self, value):
65+
self._next_expected_ranges = value if value is not None else []
66+
2767
@staticmethod
2868
def create_from_discriminator_value(
2969
parse_node: ParseNode

src/msgraph_core/tasks/large_file_upload.py

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from io import BytesIO
33
from asyncio import Future
44
from datetime import datetime, timedelta
5+
import logging
56

67
from kiota_abstractions.serialization.parsable import Parsable
78
from kiota_abstractions.method import Method
@@ -22,8 +23,10 @@ def __init__(
2223
stream: BytesIO, # counter check this
2324
max_chunk_size: int = 1024 # 4 * 1024 * 1024 - use smaller chnuks for testing
2425
):
25-
self._upload_session = upload_session
26-
self._request_adapter = request_adapter
26+
if not isinstance(upload_session, LargeFileUploadSession):
27+
raise TypeError("upload_session must be an instance of LargeFileUploadSession")
28+
self.upload_session = upload_session
29+
self.request_adapter = request_adapter
2730
self.stream = stream
2831
self.file_size = stream.getbuffer().nbytes
2932
self.max_chunk_size = max_chunk_size
@@ -36,15 +39,14 @@ def __init__(
3639

3740
@property
3841
def upload_session(self) -> Parsable:
39-
return self._upload_session
42+
return self.upload_session
4043

4144
@staticmethod
4245
async def create_upload_session(request_adapter: RequestAdapter, request_body, url: str):
4346
request_information = RequestInformation()
4447
base_url = request_adapter.base_url.rstrip('/')
4548
path = url.lstrip('/')
4649
new_url = f"{base_url}/{path}"
47-
print(f"New url {new_url}")
4850
request_information.url = new_url
4951
request_information.http_method = Method.POST
5052
request_information.set_content_from_parsable(
@@ -58,19 +60,18 @@ async def create_upload_session(request_adapter: RequestAdapter, request_body, u
5860

5961
@property
6062
def request_adapter(self) -> RequestAdapter:
61-
return self._request_adapter
63+
return self.request_adapter
6264

6365
@property
6466
def chunks(self) -> int:
6567
return self._chunks
6668

6769
def upload_session_expired(self, upload_session: LargeFileUploadSession = None) -> bool:
6870
now = datetime.now()
69-
upload_session = upload_session or self._upload_session
71+
upload_session = upload_session or self.upload_session
7072
if not hasattr(upload_session, "expiration_date_time"):
7173
raise ValueError("Upload session does not have an expiration date time")
7274
expiry = upload_session.expiration_date_time
73-
print(expiry)
7475
if expiry is None:
7576
raise ValueError("Expiry is None")
7677
if isinstance(expiry, str):
@@ -86,66 +87,53 @@ def upload_session_expired(self, upload_session: LargeFileUploadSession = None)
8687
return True
8788
return False
8889

89-
async def upload(self, after_chunk_upload=None):
90+
async def upload(self, after_chunk_upload: Optional[Callable] = None):
9091
# Rewind at this point to take care of failures.
9192
self.stream.seek(0)
9293
if self.upload_session_expired(self.upload_session):
9394
raise RuntimeError('The upload session is expired.')
9495

95-
self.on_chunk_upload_complete = after_chunk_upload if self.on_chunk_upload_complete is None else self.on_chunk_upload_complete
96+
self.on_chunk_upload_complete = after_chunk_upload or self.on_chunk_upload_complete
9697
session = await self.next_chunk(
9798
self.stream, 0, max(0, min(self.max_chunk_size - 1, self.file_size - 1))
9899
)
99100
process_next = session
100101
# determine the range to be uploaded
101102
# even when resuming existing upload sessions.
102-
range_parts = self.next_range[0].split("-") if self.next_range else ['0', '0']
103+
range_parts = self.next_range[0].split("-") if self.next_range else ['0']
103104
end = min(int(range_parts[0]) + self.max_chunk_size - 1, self.file_size)
104105
uploaded_range = [range_parts[0], end]
105-
print(f"File size {self.file_size}")
106106
while self.chunks > 0:
107107
session = process_next
108-
future = Future()
109-
110-
def success_callback(large_file_uploasd_session):
111-
nonlocal process_next, uploaded_range
112-
113-
if large_file_uploasd_session is None:
114-
return large_file_uploasd_session
115-
116-
next_range = large_file_uploasd_session.next_expected_ranges
108+
try:
109+
lfu_session: Optional[LargeFileUploadSession] = await session
110+
if lfu_session is None:
111+
continue
112+
next_range = lfu_session.next_expected_ranges
117113
old_url = self.get_validated_upload_url(self.upload_session)
118-
large_file_uploasd_session.upload_url = old_url
119-
114+
lfu_session.upload_url = old_url
120115
if self.on_chunk_upload_complete is not None:
121116
self.on_chunk_upload_complete(uploaded_range)
122-
123117
if not next_range:
124-
return large_file_uploasd_session
125-
126-
range_parts = next_range[0].split("-")
118+
continue
119+
range_parts = str(next_range[0]).split("-")
127120
end = min(int(range_parts[0]) + self.max_chunk_size, self.file_size)
128121
uploaded_range = [range_parts[0], end]
129-
self.set_next_range(next_range[0] + "-")
122+
self.next_range = next_range[0] + "-"
130123
process_next = self.next_chunk(self.stream)
131-
132-
return large_file_uploasd_session
133-
134-
def failure_callback(error):
135-
raise error
136-
137-
future.add_done_callback(success_callback)
138-
future.set_exception_callback(failure_callback)
139-
140-
if future is not None:
141-
future.result() # This will block until the future is resolved
142-
143-
self.chunks -= 1
144-
124+
except Exception as error:
125+
logging.error(f"Error uploading chunk {error}")
126+
raise # remove after manual testing
127+
self.chunks -= 1
145128
return session
146129

147-
def set_next_range(self, next_range: Optional[str]) -> None:
148-
self.next_range = next_range
130+
@property
131+
def next_range(self):
132+
return self._next_range
133+
134+
@next_range.setter
135+
def next_range(self, value: Optional[str]) -> None:
136+
self._next_range = value
149137

150138
async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int = 0) -> Future:
151139
upload_url = self.get_validated_upload_url(self.upload_session)
@@ -156,7 +144,7 @@ async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int =
156144
info.url = upload_url
157145
info.http_method = Method.PUT
158146
if not self.next_range:
159-
self.set_next_range(f'{range_start}-{range_end}')
147+
self.next_range = f'{range_start}-{range_end}'
160148
range_parts = self.next_range.split('-') if self.next_range else ['-']
161149
start = int(range_parts[0])
162150
end = int(range_parts[1]) if len(range_parts) > 1 else 0
@@ -184,12 +172,6 @@ async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int =
184172

185173
parsable_factory: LargeFileUploadSession[Any] = self.upload_session
186174

187-
print(f"Body {LargeFileUploadSession.create_from_discriminator_value}")
188-
print(f"Upload session url {self.upload_session.upload_url}")
189-
print(f"Expiraton date {self.upload_session.expiration_date_time}")
190-
print(f"Additional data {self.upload_session.additional_data}")
191-
print(f"Session is canceled {self.upload_session.is_cancelled}")
192-
print(f"Next expected ranges {self.upload_session.next_expected_ranges}")
193175
return await self.request_adapter.send_async(info, parsable_factory, error_map)
194176

195177
def get_file(self) -> BytesIO:

0 commit comments

Comments
 (0)