diff --git a/dvuploader/dvuploader.py b/dvuploader/dvuploader.py
index 511a918..20bd022 100644
--- a/dvuploader/dvuploader.py
+++ b/dvuploader/dvuploader.py
@@ -239,9 +239,22 @@ def _check_duplicates(
 
                 to_skip.append(file.file_id)
                 if replace_existing:
-                    table.add_row(
-                        file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
-                    )
+                    ds_file = self._get_dsfile_by_id(file.file_id, ds_files)
+                    if not self._check_size(file, ds_file):
+                        file._unchanged_data = False
+                    else:
+                        # Sizes match, so compare checksums to detect content changes.
+                        file.update_checksum_chunked()
+                        file.apply_checksum()
+                        file._unchanged_data = self._check_hashes(file, ds_file)
+                    if file._unchanged_data:
+                        table.add_row(
+                            file.file_name, "[bright_cyan]Exists", "[bright_black]Replace Meta"
+                        )
+                    else:
+                        table.add_row(
+                            file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
+                        )
                 else:
                     table.add_row(
                         file.file_name, "[bright_cyan]Exists", "[bright_black]Skipping"
@@ -294,6 +307,25 @@ def _get_file_id(
             if dspath == fpath:
                 return ds_file["dataFile"]["id"]
 
+    @staticmethod
+    def _get_dsfile_by_id(
+        file_id: int,
+        ds_files: List[Dict],
+    ) -> Optional[Dict]:
+        """
+        Retrieves a dataset file dictionary by its ID.
+
+        Args:
+            file_id (int): The ID of the file to retrieve.
+            ds_files (List[Dict]): List of dictionary objects representing dataset files.
+
+        Returns:
+            Optional[Dict]: The dataset file dictionary if found, None otherwise.
+        """
+        for ds_file in ds_files:
+            if ds_file["dataFile"]["id"] == file_id:
+                return ds_file
+
     @staticmethod
     def _check_hashes(file: File, dsFile: Dict):
         """
@@ -321,6 +353,20 @@ def _check_hashes(file: File, dsFile: Dict):
             and path == os.path.join(file.directory_label, file.file_name)  # type: ignore
         )
 
+    @staticmethod
+    def _check_size(file: File, dsFile: Dict) -> bool:
+        """
+        Checks if the file size matches the size of the file in the dataset.
+
+        Args:
+            file (File): The file to check.
+            dsFile (Dict): The file in the dataset to compare against.
+
+        Returns:
+            bool: True if the sizes match, False otherwise.
+        """
+        return dsFile["dataFile"]["filesize"] == file._size
+
     @staticmethod
     def _has_direct_upload(
         dataverse_url: str,
diff --git a/dvuploader/file.py b/dvuploader/file.py
index a43edb0..6107ced 100644
--- a/dvuploader/file.py
+++ b/dvuploader/file.py
@@ -29,6 +29,7 @@ class File(BaseModel):
 
     Private Attributes:
         _size (int): Size of the file in bytes.
+        _unchanged_data (bool): Whether the file's data is unchanged compared to the copy already in the dataset.
 
     Methods:
         extract_file_name(): Extracts filename from filepath and initializes file handler.
@@ -57,6 +58,7 @@ class File(BaseModel):
     tab_ingest: bool = Field(default=True, alias="tabIngest")
 
     _size: int = PrivateAttr(default=0)
+    _unchanged_data: bool = PrivateAttr(default=False)
 
     def extract_file_name(self):
         """
@@ -110,6 +112,7 @@ def apply_checksum(self):
         """
         Calculates and applies the checksum for the file.
         Must be called after extract_file_name() has initialized the checksum.
+        All data must already have been fed into the checksum hash function.
 
         Raises:
             AssertionError: If checksum is not initialized or hash function is not set.
@@ -119,6 +122,36 @@ def apply_checksum(self):
 
         self.checksum.apply_checksum()
 
+
+    def update_checksum_chunked(self, blocksize=2**20):
+        """Updates the checksum with data read from the file handler in chunks.
+
+        Args:
+            blocksize (int, optional): Size of chunks to read. Defaults to 1 MB (2**20).
+
+        Raises:
+            AssertionError: If the file handler, checksum, or hash function has not been initialized.
+
+        Note:
+            This method resets the file position to the start after reading.
+        """
+        assert self.handler is not None, "File handler is not initialized."
+        assert self.checksum is not None, "Checksum is not initialized."
+        assert self.checksum._hash_fun is not None, "Checksum hash function is not set."
+
+        while True:
+            buf = self.handler.read(blocksize)
+
+            if not isinstance(buf, bytes):
+                buf = buf.encode()
+
+            if not buf:
+                break
+            self.checksum._hash_fun.update(buf)
+
+        self.handler.seek(0)
+
+
     def __del__(self):
         if self.handler is not None:
             self.handler.close()
diff --git a/dvuploader/nativeupload.py b/dvuploader/nativeupload.py
index 7296c86..b1c5a94 100644
--- a/dvuploader/nativeupload.py
+++ b/dvuploader/nativeupload.py
@@ -91,15 +91,30 @@ async def native_upload(
         "proxy": proxy,
     }
 
+    files_new = [file for file in files if not file.to_replace]
+    files_new_metadata = [file for file in files if file.to_replace and file._unchanged_data]
+    files_replace = [file for file in files if file.to_replace and not file._unchanged_data]
+
     async with httpx.AsyncClient(**session_params) as session:
         with tempfile.TemporaryDirectory() as tmp_dir:
-            packages = distribute_files(files)
+            packages = distribute_files(files_new)
             packaged_files = _zip_packages(
                 packages=packages,
                 tmp_dir=tmp_dir,
                 progress=progress,
             )
 
+            replaceable_files = [
+                (
+                    progress.add_task(
+                        file.file_name,  # type: ignore
+                        total=file._size,
+                    ),
+                    file
+                )
+                for file in files_replace
+            ]
+
             tasks = [
                 _single_native_upload(
                     session=session,
@@ -108,7 +123,7 @@ async def native_upload(
                     pbar=pbar,  # type: ignore
                     progress=progress,
                 )
-                for pbar, file in packaged_files
+                for pbar, file in (packaged_files + replaceable_files)
             ]
 
             responses = await asyncio.gather(*tasks)
@@ -116,7 +131,7 @@
 
             await _update_metadata(
                 session=session,
-                files=files,
+                files=files_new + files_new_metadata,
                 persistent_id=persistent_id,
                 dataverse_url=dataverse_url,
                 api_token=api_token,
@@ -167,6 +182,10 @@ def _zip_packages(
     for index, package in packages:
         if len(package) == 1:
             file = package[0]
+            pbar = progress.add_task(
+                file.file_name,  # type: ignore
+                total=file._size,
+            )
         else:
             path = zip_files(
                 files=package,
@@ -178,10 +197,10 @@
             file.extract_file_name()
             file.mimeType = "application/zip"
 
-        pbar = progress.add_task(
-            file.file_name,  # type: ignore
-            total=file._size,
-        )
+            pbar = progress.add_task(
+                f"Zip package of {len(package)} files",  # type: ignore
+                total=file._size,
+            )
 
         files.append((pbar, file))
 
@@ -396,7 +415,6 @@ async def _update_single_metadata(
     json_data = _get_json_data(file)
 
     del json_data["forceReplace"]
-    del json_data["restrict"]
 
     # Send metadata as a readable byte stream
     # This is a workaround since "data" and "json"