"""Example of asynchronous chunked file upload to SystemLink.

This example demonstrates uploading a large file in fixed-size chunks,
with multiple chunks uploaded concurrently for better performance.
"""

import asyncio
import tempfile
import time
from functools import partial
from io import BytesIO

from nisystemlink.clients.core import HttpConfiguration
from nisystemlink.clients.file import FileClient

# Configure connection to SystemLink server
server_configuration = HttpConfiguration(
    server_uri="https://yourserver.yourcompany.com",
    api_key="YourAPIKeyGeneratedFromSystemLink",
)

client = FileClient(configuration=server_configuration)

# Generate example file content (50 MB for demonstration)
CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB chunks
FILE_SIZE = 50 * 1024 * 1024  # 50 MB file
# Generate test file content by repeating a simple message
test_data = b"This is test data for chunked file upload example.\n"
file_content = test_data * (FILE_SIZE // len(test_data))
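# Note: integer division means the generated content can be slightly smaller
# than FILE_SIZE when len(test_data) does not divide it evenly.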


async def upload_chunk_async(
    session_id: str, chunk_index: int, chunk_data: bytes, is_last: bool
) -> int:
    """Upload a single chunk asynchronously."""
    # Run the synchronous upload in a thread pool to avoid blocking
    await asyncio.to_thread(
        client.append_to_upload_session,
        session_id=session_id,
        chunk_index=chunk_index,
        chunk=BytesIO(chunk_data),
        close=is_last,
    )
    return chunk_index
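
# Note: asyncio.to_thread requires Python 3.9 or later. On earlier versions,
# loop.run_in_executor can be used to run the blocking call in a thread pool.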


# Create a temporary file to mimic file-on-disk behavior
with tempfile.NamedTemporaryFile(delete=True, suffix=".bin") as temp_file:
    temp_file.write(file_content)
    temp_file_path = temp_file.name
    temp_file.flush()  # Ensure data is written to disk

    async def main():
        """Main async function to upload file chunks concurrently."""
        print(f"Created temporary file: {temp_file_path}")
        print(f"File size: {FILE_SIZE / (1024 * 1024):.1f} MB")
        print(f"Chunk size: {CHUNK_SIZE / (1024 * 1024):.1f} MB\n")

        # Start upload session
        session_response = client.start_upload_session(workspace=None)
        session_id = session_response.session_id
        print(f"Started upload session: {session_id}\n")

        file_id = None
        start_time = time.time()

        try:
            # Read the file in CHUNK_SIZE pieces; iter() with a b"" sentinel
            # stops once read() returns no more data
            chunks = []
            temp_file.seek(0)

            read_chunk = partial(temp_file.read, CHUNK_SIZE)
            chunk_iterator = iter(read_chunk, b"")

            for chunk_index, chunk_data in enumerate(chunk_iterator, start=1):
                chunks.append((chunk_index, chunk_data))

            total_chunks = len(chunks)
            print(f"Uploading {total_chunks} chunks concurrently...\n")

            # Upload chunks concurrently; the final chunk is sent with
            # close=True so the session closes when it completes
            tasks = [
                upload_chunk_async(session_id, idx, data, idx == total_chunks)
                for idx, data in chunks
            ]

            # Run all upload tasks concurrently
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Check for errors and report per-chunk progress
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    print(f"Chunk {i + 1} failed: {result}")
                    raise result
                else:
                    print(f"Chunk {result}/{total_chunks} uploaded")

            # Finish the upload session
            file_id = client.finish_upload_session(
                session_id=session_id,
                name="async_chunked_upload_example.bin",
                properties={"Type": "Asynchronous Chunked", "FileSize": str(FILE_SIZE)},
            )

            total_time = time.time() - start_time
            print("\nUpload completed successfully!")
            print(f"File ID: {file_id}")
            print(f"Total time: {total_time:.2f}s")

            return file_id

        except Exception as e:
            print(f"Error during upload: {e}")
            if not file_id:
                try:
                    file_id = client.finish_upload_session(
                        session_id=session_id,
                        name="incomplete_async_upload.bin",
                        properties={"Status": "Failed"},
                    )
                except Exception as finish_error:
                    print(f"Failed to finish session: {finish_error}")
            return file_id

    # Run the async main function
    file_id = None  # Ensure file_id exists for cleanup even if main() fails early
    try:
        file_id = asyncio.run(main())

    finally:
        # Clean up: delete the uploaded file from the server
        print("\nCleaning up...")
        if file_id:
            try:
                client.delete_file(id=file_id)
                print(f"Deleted uploaded file: {file_id}")
            except Exception as e:
                print(f"Failed to delete file {file_id}: {e}")

        print(f"Temporary file will be automatically deleted: {temp_file_path}")
        print("Cleanup complete")