|
1 | | -"""Example to upload a large file to SystemLink using chunked upload (upload sessions). |
| 1 | +"""Example comparing synchronous and asynchronous chunked file upload to SystemLink. |
2 | 2 |
|
3 | | -This example demonstrates how to upload a file in chunks using upload sessions. |
4 | | -This is useful for large files that need to be uploaded in multiple parts. |
| 3 | +This example demonstrates: |
| 4 | +1. Synchronous chunk upload (one chunk at a time) |
| 5 | +2. Asynchronous chunk upload (multiple chunks concurrently) |
| 6 | +3. Performance comparison between both approaches |
5 | 7 | """ |
6 | 8 |
|
| 9 | +import asyncio |
7 | 10 | import io |
| 11 | +import os |
| 12 | +import tempfile |
| 13 | +import time |
8 | 14 |
|
9 | 15 | from nisystemlink.clients.core import HttpConfiguration |
10 | 16 | from nisystemlink.clients.file import FileClient |
|
17 | 23 |
|
18 | 24 | client = FileClient(configuration=server_configuration) |
19 | 25 |
|
# Generate example file content (50 MB for demonstration)
CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB chunks
FILE_SIZE = 50 * 1024 * 1024  # 50 MB file

# Build the content by repeating a short message, then pad with a partial
# repetition so the content is exactly FILE_SIZE bytes. Both uploads report
# FILE_SIZE in their file properties, so the actual size must match it
# (FILE_SIZE is not a multiple of len(test_data), so repetition alone
# falls 8 bytes short).
test_data = b"This is test data for chunked file upload example.\n"
file_content = test_data * (FILE_SIZE // len(test_data))
file_content += test_data[: FILE_SIZE % len(test_data)]

# Write the content to a temporary file to mimic uploading a file on disk.
# delete=False keeps the file alive past the `with` block; it is removed
# in the cleanup step at the bottom of this script.
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as temp_file:
    temp_file.write(file_content)
    temp_file_path = temp_file.name
def upload_chunk(
    session_id: str, chunk_index: int, chunk_data: bytes, is_last: bool
) -> int:
    """Append one chunk of bytes to an open upload session.

    Wraps the raw bytes in an in-memory buffer, appends it to the session,
    and closes the session when ``is_last`` is True. Returns the chunk
    index so callers can report which chunk finished.
    """
    buffer = io.BytesIO(chunk_data)
    client.append_to_upload_session(
        session_id=session_id,
        chunk_index=chunk_index,
        file=buffer,
        close=is_last,
    )
    return chunk_index
23 | 48 |
|
24 | | -# Step 1: Start an upload session |
25 | | -session_response = client.start_upload_session(workspace=None) |
26 | | -session_id = session_response.session_id |
27 | | -print(f"Started upload session with ID: {session_id}") |
28 | 49 |
|
29 | | -# Step 2: Upload chunks |
30 | | -# Split the file content into chunks and upload them |
31 | | -file_id = None |
32 | | -try: |
33 | | - num_chunks = (len(file_content) + CHUNK_SIZE - 1) // CHUNK_SIZE |
async def upload_chunk_async(
    session_id: str, chunk_index: int, chunk_data: bytes, is_last: bool
) -> int:
    """Upload a single chunk without blocking the event loop.

    Delegates to the synchronous ``upload_chunk`` on a worker thread so
    that several chunks can be in flight at once.
    """
    uploaded_index = await asyncio.to_thread(
        upload_chunk, session_id, chunk_index, chunk_data, is_last
    )
    return uploaded_index
| 58 | + |
34 | 59 |
|
35 | | - for i in range(num_chunks): |
36 | | - start = i * CHUNK_SIZE |
37 | | - end = min(start + CHUNK_SIZE, len(file_content)) |
38 | | - chunk_data = file_content[start:end] |
def upload_synchronous():
    """Upload the temp file's chunks synchronously, one at a time.

    Returns:
        (file_id, elapsed_seconds); elapsed_seconds is None when the
        upload failed.
    """
    print("\nSynchronous Upload:")

    # Start upload session
    session_response = client.start_upload_session(workspace=None)
    session_id = session_response.session_id
    print(f"Started upload session: {session_id}\n")

    file_id = None
    start_time = time.time()

    try:
        # Compute the chunk count up front (from the file size) so the
        # final chunk can close the session, then STREAM the file chunk
        # by chunk instead of materializing all 50 MB in memory — the
        # whole point of chunked upload is to avoid that.
        total_size = os.path.getsize(temp_file_path)
        num_chunks = (total_size + CHUNK_SIZE - 1) // CHUNK_SIZE

        with open(temp_file_path, "rb") as f:
            chunk_reader = iter(lambda: f.read(CHUNK_SIZE), b"")
            for i, chunk_data in enumerate(chunk_reader, start=1):
                is_last_chunk = i == num_chunks

                chunk_start = time.time()
                upload_chunk(session_id, i, chunk_data, is_last_chunk)
                chunk_time = time.time() - chunk_start

                print(f" Chunk {i}/{num_chunks} uploaded in {chunk_time:.2f}s")

        # Finish the upload session
        file_id = client.finish_upload_session(
            session_id=session_id,
            name="sync_upload_example.bin",
            properties={"Type": "Synchronous", "FileSize": str(FILE_SIZE)},
        )

        total_time = time.time() - start_time
        print(f"\nUpload completed in {total_time:.2f}s")
        print(f"File ID: {file_id}\n")

        return file_id, total_time

    except Exception as e:
        print(f"✗ Error: {e}")
        # Best-effort cleanup: close the session so the server does not
        # keep the partial upload open. Guarded so a cleanup failure does
        # not mask the original error reported above.
        if not file_id:
            try:
                file_id = client.finish_upload_session(
                    session_id=session_id,
                    name="incomplete_sync.bin",
                    properties={"Status": "Failed"},
                )
            except Exception as cleanup_error:
                print(f"✗ Failed to clean up session: {cleanup_error}")
        return file_id, None
61 | 109 |
|
62 | | - file_id = client.finish_upload_session( |
63 | | - session_id=session_id, name=file_name, properties=properties |
64 | | - ) |
65 | 110 |
|
66 | | - print(f"\nSuccessfully uploaded file '{file_name}' with FileID: {file_id}") |
async def upload_asynchronous():
    """Upload the temp file's chunks concurrently.

    Every chunk except the last is uploaded in parallel; the final chunk
    is sent afterwards with close=True. The closing chunk must not race
    the others: close=True ends the upload session, and under
    ``asyncio.gather`` it could complete before earlier chunks have been
    appended, closing the session prematurely.

    Returns:
        (file_id, elapsed_seconds); elapsed_seconds is None when the
        upload failed.
    """
    print("\nAsynchronous Upload (Concurrent):")

    # Start upload session
    session_response = client.start_upload_session(workspace=None)
    session_id = session_response.session_id
    print(f"Started upload session: {session_id}\n")

    file_id = None
    start_time = time.time()

    try:
        # Read all chunks using iter() with sentinel for cleaner iteration
        with open(temp_file_path, "rb") as f:
            chunks = list(enumerate(iter(lambda: f.read(CHUNK_SIZE), b""), start=1))
        num_chunks = len(chunks)

        print(f"Uploading {num_chunks} chunks concurrently\n")

        # Upload all chunks except the last concurrently, none of them
        # closing the session.
        tasks = [
            upload_chunk_async(session_id, idx, data, False)
            for idx, data in chunks[:-1]
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Surface the first failure, if any.
        for idx, result in enumerate(results, start=1):
            if isinstance(result, Exception):
                print(f" Chunk {idx} failed: {result}")
                raise result
            print(f" Chunk {result}/{num_chunks} uploaded")

        # Now that every other chunk has landed, send the final chunk on
        # its own with close=True to end the session. (Guard against an
        # empty chunk list so chunks[-1] cannot raise IndexError.)
        if chunks:
            last_index, last_data = chunks[-1]
            await upload_chunk_async(session_id, last_index, last_data, True)
            print(f" Chunk {last_index}/{num_chunks} uploaded")

        # Finish the upload session
        file_id = client.finish_upload_session(
            session_id=session_id,
            name="async_upload_example.bin",
            properties={"Type": "Asynchronous", "FileSize": str(FILE_SIZE)},
        )

        total_time = time.time() - start_time
        print(f"\nUpload completed in {total_time:.2f}s")
        print(f" File ID: {file_id}\n")

        return file_id, total_time

    except Exception as e:
        print(f"✗ Error: {e}")
        # Guarded cleanup so a second failure does not mask the first.
        if not file_id:
            try:
                file_id = client.finish_upload_session(
                    session_id=session_id,
                    name="incomplete_async.bin",
                    properties={"Status": "Failed"},
                )
            except Exception as cleanup_error:
                print(f"✗ Failed to clean up session: {cleanup_error}")
        return file_id, None
| 171 | + |
| 172 | + |
# Run both upload methods and compare
async def main():
    """Run the synchronous upload, then the asynchronous one, and compare."""
    # Synchronous upload
    sync_id, sync_elapsed = upload_synchronous()

    # Asynchronous upload
    async_id, async_elapsed = await upload_asynchronous()

    # Only compare timings when both uploads actually completed.
    if sync_elapsed and async_elapsed:
        print("\nPerformance Comparison:")
        print(f"Synchronous: {sync_elapsed:.2f}s")
        print(f"Asynchronous: {async_elapsed:.2f}s")

    return sync_id, async_id
| 189 | + |
| 190 | + |
# Initialize both IDs before the try block: if asyncio.run() raises before
# the assignment, the finally block below would otherwise hit a NameError
# that masks the real error.
sync_file_id = None
async_file_id = None

try:
    # Run the async main function
    sync_file_id, async_file_id = asyncio.run(main())

finally:
    # Clean up: Delete uploaded files
    print("Cleaning up...")
    for file_id in [sync_file_id, async_file_id]:
        if file_id:
            try:
                client.delete_file(id=file_id)
                print(f" Deleted file: {file_id}")
            except Exception as e:
                print(f" Failed to delete {file_id}: {e}")

    # Remove the on-disk temp file created at the top of the script
    # (NamedTemporaryFile was opened with delete=False).
    os.unlink(temp_file_path)
    print(f" Deleted temp file: {temp_file_path}")

    print("\nCleanup complete")
0 commit comments