1- from typing import Callable , Optional , List , Tuple , Any
1+ from typing import Callable , Optional , List , Tuple , Any , Dict
22from io import BytesIO
33from datetime import datetime
44from asyncio import Future
5+ from datetime import datetime
6+ from dateutil .relativedelta import relativedelta
57
68from kiota_abstractions .serialization .parsable import Parsable
79from kiota_abstractions .method import Method
8- from kiota_abstractions .request_adapter import RequestAdapter
10+ from kiota_abstractions .headers_collection import HeadersCollection
11+ from kiota_http .httpx_request_adapter import HttpxRequestAdapter as RequestAdapter
912from kiota_abstractions .request_information import RequestInformation
1013from kiota_abstractions .serialization .additional_data_holder import AdditionalDataHolder
1114
12- from msgraph_core .models import LargeFileUploadCreateSession , LargeFileUploadSession # check imports
15+ from msgraph_core .models import LargeFileUploadCreateSessionBody , LargeFileUploadSession # check imports
1316
1417
1518class LargeFileUploadTask :
@@ -19,7 +22,7 @@ def __init__(
1922 upload_session : Parsable ,
2023 request_adapter : RequestAdapter ,
2124 stream : BytesIO , # counter check this
22- max_chunk_size : int = 4 * 1024 * 1024
25+ max_chunk_size : int = 1024 # 4 * 1024 * 1024 - use smaller chnuks for testing
2326 ):
2427 self .upload_session = upload_session
2528 self .request_adapter = request_adapter
@@ -36,100 +39,110 @@ def __init__(
3639 def get_upload_session (self ) -> Parsable :
3740 return self .upload_session
3841
39- def create_upload_session (
40- self ,
41- request_body : LargeFileUploadSession ,
42- model : LargeFileUploadCreateSession ,
43- ) -> Future :
44- request_info = RequestInformation ()
45- request_info .url = request_info . url # check the URL
46- request_info .http_method = Method .POST
47- request_info .set_content_from_parsable (
48- self . request_adapter , 'application/json' , request_body
42+ @ staticmethod
43+ async def create_upload_session ( request_adapter : RequestAdapter , request_body , url : str ):
44+ request_information = RequestInformation ()
45+ base_url = request_adapter . base_url . rstrip ( '/' )
46+ path = url . lstrip ( '/' )
47+ new_url = f" { base_url } / { path } "
48+ request_information .url = new_url
49+ request_information .http_method = Method .POST
50+ request_information .set_content_from_parsable (
51+ request_adapter , 'application/json' , request_body
4952 )
50- request_info . set_stream_content ( model )
53+ error_map : Dict [ str , int ] = {}
5154
52- return self .request_adapter .send_async (request_info , LargeFileUploadSession , {})
55+ return await request_adapter .send_async (
56+ request_information , LargeFileUploadSession .create_from_discriminator_value , error_map
57+ )
5358
5459 def get_adapter (self ) -> RequestAdapter :
5560 return self .request_adapter
5661
5762 def get_chunks (self ) -> int :
5863 return self .chunks
5964
60- def upload_session_expired (self , upload_session : Optional [ Parsable ] = None ) -> bool :
65+ def upload_session_expired (self , upload_session = None ):
6166 now = datetime .now ()
6267
63- validated_value = self .check_value_exists (
64- upload_session or self .upload_session , 'expiration_date_time' ,
65- ['ExpirationDateTime' , 'expirationDateTime' ]
66- )
67- if not validated_value [0 ]:
68+ if upload_session is None :
69+ upload_session = self .upload_session
70+
71+ if not hasattr (upload_session , 'expiration_date_time' ):
6872 raise Exception ('The upload session does not contain an expiry datetime.' )
6973
70- expiry = validated_value [1 ]
74+ expiry = upload_session .expiration_date_time
75+ print (f"Expiry { expiry } " )
7176
7277 if expiry is None :
7378 raise ValueError ('The upload session does not contain a valid expiry date.' )
79+
7480 if isinstance (expiry , str ):
7581 then = datetime .strptime (expiry , "%Y-%m-%dT%H:%M:%S" )
7682 else :
7783 then = expiry
78- interval = (now - then ).total_seconds ()
79- if interval < 0 :
84+ interval = relativedelta (now , then )
85+
86+ if interval .days < 0 or (interval .days == 0 and interval .seconds < 0 ):
8087 return True
8188 return False
8289
83- async def upload (self , after_chunk_upload : Optional [ Callable ] = None ) -> Future :
84- # Rewind to take care of failures.
90+ async def upload (self , after_chunk_upload = None ):
91+ # Rewind at this point to take care of failures.
8592 self .stream .seek (0 )
8693 if self .upload_session_expired (self .upload_session ):
8794 raise RuntimeError ('The upload session is expired.' )
8895
89- self .on_chunk_upload_complete = after_chunk_upload if after_chunk_upload is not None else self .on_chunk_upload_complete
90- session = self .next_chunk (
96+ self .on_chunk_upload_complete = after_chunk_upload if self . on_chunk_upload_complete is None else self .on_chunk_upload_complete
97+ session = await self .next_chunk (
9198 self .stream , 0 , max (0 , min (self .max_chunk_size - 1 , self .file_size - 1 ))
9299 )
93100 process_next = session
94- # includes when resuming existing upload sessions.
101+ # determine the range to be uploaded
102+ # even when resuming existing upload sessions.
95103 range_parts = self .next_range [0 ].split ("-" ) if self .next_range else ['0' , '0' ]
96104 end = min (int (range_parts [0 ]) + self .max_chunk_size - 1 , self .file_size )
97105 uploaded_range = [range_parts [0 ], end ]
98106 while self .chunks > 0 :
99107 session = process_next
100- process_next = session .then (
101- lambda upload_session : self .process_chunk (upload_session , uploaded_range ),
102- lambda error : self .handle_error (error )
103- )
104- if process_next is not None :
105- await process_next
106- self .chunks -= 1
107- return session
108+ future = Future ()
108109
109- def process_chunk (self , upload_session , uploaded_range ):
110- if upload_session is None :
111- return None
110+ def success_callback (large_file_uploasd_session ):
111+ nonlocal process_next , uploaded_range
112+
113+ if large_file_uploasd_session is None :
114+ return large_file_uploasd_session
115+
116+ next_range = large_file_uploasd_session .next_expected_ranges
117+ old_url = self .get_validated_upload_url (self .upload_session )
118+ large_file_uploasd_session .upload_url = old_url
119+
120+ if self .on_chunk_upload_complete is not None :
121+ self .on_chunk_upload_complete (uploaded_range )
112122
113- next_range = upload_session .get_next_expected_ranges ()
114- if not next_range :
115- return upload_session
123+ if not next_range :
124+ return large_file_uploasd_session
116125
117- old_url = self .get_validated_upload_url (self .upload_session )
118- upload_session .set_upload_url (old_url )
126+ range_parts = next_range [0 ].split ("-" )
127+ end = min (int (range_parts [0 ]) + self .max_chunk_size , self .file_size )
128+ uploaded_range = [range_parts [0 ], end ]
129+ self .set_next_range (next_range [0 ] + "-" )
130+ process_next = self .next_chunk (self .stream )
119131
120- if self .on_chunk_upload_complete is not None :
121- self .on_chunk_upload_complete (uploaded_range )
132+ return large_file_uploasd_session
122133
123- range_parts = next_range [0 ].split ("-" )
124- end = min (int (range_parts [0 ]) + self .max_chunk_size , self .file_size )
125- self .set_next_range (f"{ range_parts [0 ]} -{ end } " )
134+ def failure_callback (error ):
135+ raise error
126136
127- self .next_chunk (self .stream )
137+ future .add_done_callback (success_callback )
138+ future .set_exception_callback (failure_callback )
128139
129- return upload_session
140+ if future is not None :
141+ future .result () # This will block until the future is resolved
130142
131- def handle_error (self , error ):
132- raise error
143+ self .chunks -= 1
144+
145+ return session
133146
134147 def set_next_range (self , next_range : Optional [str ]) -> None :
135148 self .next_range = next_range
@@ -160,17 +173,24 @@ async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int =
160173 file .seek (start )
161174 end = min (end , self .max_chunk_size + start )
162175 chunk_data = file .read (end - start + 1 )
176+ print (f"Chunk data { chunk_data } " )
177+ info .headers = HeadersCollection ()
163178
164- info .headers = {
165- ** info .request_headers (), 'Content-Range' : f'bytes { start } -{ end } /{ self .file_size } '
166- }
179+ info .headers .try_add ('Content-Range' , f'bytes { start } -{ end } /{ self .file_size } ' )
180+ # info.headers.try_add(**info.request_headers) what do we do if headers need to be passed
181+ info .headers .try_add ('Content-Length' , str (len (chunk_data )))
182+ info .set_stream_content (BytesIO (chunk_data ))
183+ error_map : Dict [str , int ] = {}
167184
168- info . headers = { ** info . request_headers (), 'Content-Length' : str ( len ( chunk_data ))}
185+ parsable_factory : LargeFileUploadSession [ Any ] = self . upload_session
169186
170- info .set_stream_content (BytesIO (chunk_data ))
171- return await self .request_adapter .send_async (
172- info , LargeFileUploadSession .create_from_discriminator_value
173- )
187+ print (f"Body { LargeFileUploadSession .create_from_discriminator_value } " )
188+ print (f"Upload session url { self .upload_session .upload_url } " )
189+ print (f"Expiraton date { self .upload_session .expiration_date_time } " )
190+ print (f"Additional data { self .upload_session .additional_data } " )
191+ print (f"Session is canceled { self .upload_session .is_cancelled } " )
192+ print (f"Next expected ranges { self .upload_session .next_expected_ranges } " )
193+ return await self .request_adapter .send_async (info , parsable_factory , error_map )
174194
175195 def get_file (self ) -> BytesIO :
176196 return self .stream
@@ -187,7 +207,7 @@ async def cancel(self) -> Optional[Future]:
187207 'additional_data' ) and hasattr (self .upload_session , 'additional_data' ):
188208 current = self .upload_session .additional_data
189209 new = {** current , 'is_cancelled' : True }
190- self .upload_session .set_additional_data ( new )
210+ self .upload_session .additional_data = new
191211
192212 return self .upload_session
193213
@@ -241,7 +261,7 @@ async def resume(self) -> Future:
241261 def get_validated_upload_url (self , upload_session : Parsable ) -> str :
242262 if not hasattr (upload_session , 'upload_url' ):
243263 raise RuntimeError ('The upload session does not contain a valid upload url' )
244- result = upload_session .get_upload_url ()
264+ result = upload_session .upload_url
245265
246266 if result is None or result .strip () == '' :
247267 raise RuntimeError ('The upload URL cannot be empty.' )
0 commit comments