52 changes: 49 additions & 3 deletions dvuploader/dvuploader.py
@@ -239,9 +239,22 @@ def _check_duplicates(
to_skip.append(file.file_id)

if replace_existing:
table.add_row(
file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
)
ds_file = self._get_dsfile_by_id(file.file_id, ds_files)
if not self._check_size(file, ds_file):
file._unchanged_data = False
else:
# sizes match; compute the checksum to check whether the data itself changed
file.update_checksum_chunked()
file.apply_checksum()
file._unchanged_data = self._check_hashes(file, ds_file)
if file._unchanged_data:
table.add_row(
file.file_name, "[bright_cyan]Exists", "[bright_black]Replace Meta"
)
else:
table.add_row(
file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
)
else:
table.add_row(
file.file_name, "[bright_cyan]Exists", "[bright_black]Skipping"
@@ -294,6 +307,25 @@ def _get_file_id(
if dspath == fpath:
return ds_file["dataFile"]["id"]

@staticmethod
def _get_dsfile_by_id(
file_id: int,
ds_files: List[Dict],
) -> Optional[Dict]:
"""
Retrieves a dataset file dictionary by its ID.

Args:
file_id (int): The ID of the file to retrieve.
ds_files (List[Dict]): List of dictionary objects representing dataset files.

Returns:
Optional[Dict]: The dataset file dictionary if found, None otherwise.
"""
for ds_file in ds_files:
if ds_file["dataFile"]["id"] == file_id:
return ds_file

@staticmethod
def _check_hashes(file: File, dsFile: Dict):
"""
@@ -321,6 +353,20 @@ def _check_hashes(file: File, dsFile: Dict):
and path == os.path.join(file.directory_label, file.file_name) # type: ignore
)

@staticmethod
def _check_size(file: File, dsFile: Dict) -> bool:
"""
Checks if the file size matches the size of the file in the dataset.

Args:
file (File): The file to check.
dsFile (Dict): The file in the dataset to compare against.

Returns:
bool: True if the sizes match, False otherwise.
"""
return dsFile["dataFile"]["filesize"] == file._size

@staticmethod
def _has_direct_upload(
dataverse_url: str,
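The new branch in _check_duplicates boils down to: compare sizes first, and only hash the local file when the sizes match, so an identical file is downgraded to a metadata-only replacement. Outside the diff, a minimal sketch of that decision flow, using a hypothetical classify_replacement helper instead of the File class; the dataFile field names (filesize, checksum.value) are assumed to follow the native-API listing used above:

import hashlib
import os


def classify_replacement(local_path: str, ds_file: dict) -> str:
    """Return "Replace" when the bytes differ, "Replace Meta" when only metadata changes.

    ds_file is assumed to look like a native-API file listing entry, e.g.
    {"dataFile": {"filesize": 123, "checksum": {"value": "..."}}} (field names assumed).
    """
    data_file = ds_file["dataFile"]

    # Cheap check first: a size mismatch already proves the data changed.
    if os.path.getsize(local_path) != data_file["filesize"]:
        return "Replace"

    # Sizes match, so hash the local file in 1 MiB chunks and compare checksums.
    hasher = hashlib.md5()
    with open(local_path, "rb") as handle:
        for chunk in iter(lambda: handle.read(2**20), b""):
            hasher.update(chunk)

    if hasher.hexdigest() == data_file["checksum"]["value"]:
        return "Replace Meta"  # identical content: only the metadata needs updating
    return "Replace"

Checking the size before hashing keeps the common case cheap, since hashing large files is the expensive step.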
33 changes: 33 additions & 0 deletions dvuploader/file.py
@@ -29,6 +29,7 @@ class File(BaseModel):

Private Attributes:
_size (int): Size of the file in bytes.
_unchanged_data (bool): Whether the file data is unchanged since the last upload.

Methods:
extract_file_name(): Extracts filename from filepath and initializes file handler.
@@ -57,6 +58,7 @@ class File(BaseModel):
tab_ingest: bool = Field(default=True, alias="tabIngest")

_size: int = PrivateAttr(default=0)
_unchanged_data: bool = PrivateAttr(default=False)

def extract_file_name(self):
"""
@@ -110,6 +112,7 @@ def apply_checksum(self):
"""
Calculates and applies the checksum for the file.
Must be called after extract_file_name() has initialized the checksum.
All data must already have been fed into the checksum hash function.

Raises:
AssertionError: If checksum is not initialized or hash function is not set.
@@ -119,6 +122,36 @@

self.checksum.apply_checksum()


def update_checksum_chunked(self, blocksize=2**20):
"""Updates the checksum with data read from a file-like object in chunks.

Args:
blocksize (int, optional): Size of chunks to read. Defaults to 1 MiB (2**20).

Raises:
AssertionError: If the hash function has not been initialized

Note:
This method resets the file position to the start after reading.
"""
assert self.handler is not None, "File handler is not initialized."
assert self.checksum is not None, "Checksum is not initialized."
assert self.checksum._hash_fun is not None, "Checksum hash function is not set."

while True:
buf = self.handler.read(blocksize)

if not isinstance(buf, bytes):
buf = buf.encode()

if not buf:
break
self.checksum._hash_fun.update(buf)

self.handler.seek(0)


def __del__(self):
if self.handler is not None:
self.handler.close()
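update_checksum_chunked reads the handler in fixed-size blocks, feeds each block into the hash function, and rewinds the handler so the later upload starts from byte zero. A small self-contained sketch of the same pattern, using a hypothetical update_hash_chunked helper and plain hashlib instead of the Checksum wrapper:

import hashlib
import io


def update_hash_chunked(handler, hash_fun, blocksize: int = 2**20) -> None:
    """Feed the handler's contents into hash_fun block by block, then rewind."""
    while True:
        buf = handler.read(blocksize)
        if not isinstance(buf, bytes):
            buf = buf.encode()  # text-mode handlers return str
        if not buf:
            break
        hash_fun.update(buf)
    handler.seek(0)  # leave the handler ready to be read again for the upload


md5 = hashlib.md5()
update_hash_chunked(io.BytesIO(b"some file content"), md5)
print(md5.hexdigest())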
34 changes: 26 additions & 8 deletions dvuploader/nativeupload.py
@@ -91,15 +91,30 @@ async def native_upload(
"proxy": proxy,
}

files_new = [file for file in files if not file.to_replace]
files_new_metadata = [file for file in files if file.to_replace and file._unchanged_data]
files_replace = [file for file in files if file.to_replace and not file._unchanged_data]

async with httpx.AsyncClient(**session_params) as session:
with tempfile.TemporaryDirectory() as tmp_dir:
packages = distribute_files(files)
packages = distribute_files(files_new)
packaged_files = _zip_packages(
packages=packages,
tmp_dir=tmp_dir,
progress=progress,
)

replacable_files = [
(
progress.add_task(
file.file_name, # type: ignore
total=file._size,
),
file
)
for file in files_replace
]

tasks = [
_single_native_upload(
session=session,
@@ -108,15 +123,15 @@
pbar=pbar, # type: ignore
progress=progress,
)
for pbar, file in packaged_files
for pbar, file in (packaged_files + replacable_files)
]

responses = await asyncio.gather(*tasks)
_validate_upload_responses(responses, files)

await _update_metadata(
session=session,
files=files,
files=files_new + files_new_metadata,
persistent_id=persistent_id,
dataverse_url=dataverse_url,
api_token=api_token,
@@ -167,6 +182,10 @@ def _zip_packages(
for index, package in packages:
if len(package) == 1:
file = package[0]
pbar = progress.add_task(
file.file_name, # type: ignore
total=file._size,
)
else:
path = zip_files(
files=package,
@@ -178,10 +197,10 @@
file.extract_file_name()
file.mimeType = "application/zip"

pbar = progress.add_task(
file.file_name, # type: ignore
total=file._size,
)
pbar = progress.add_task(
f"Zip package of {len(package)} files", # type: ignore
total=file._size,
)

files.append((pbar, file))

@@ -396,7 +415,6 @@ async def _update_single_metadata(
json_data = _get_json_data(file)

del json_data["forceReplace"]
del json_data["restrict"]

# Send metadata as a readable byte stream
# This is a workaround since "data" and "json"
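In native_upload the files are now split three ways before any work is scheduled: new files are packaged and uploaded, changed replacements get their own progress task and upload, and unchanged replacements skip the upload entirely and only flow into the metadata update. A minimal sketch of that partition, with FileStub and partition as illustrative stand-ins rather than the library's classes:

from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class FileStub:
    file_name: str
    to_replace: bool = False
    unchanged_data: bool = False  # stands in for the private _unchanged_data flag


def partition(files: List[FileStub]) -> Tuple[List[FileStub], List[FileStub], List[FileStub]]:
    new = [f for f in files if not f.to_replace]
    meta_only = [f for f in files if f.to_replace and f.unchanged_data]
    replace = [f for f in files if f.to_replace and not f.unchanged_data]
    return new, meta_only, replace


files = [
    FileStub("a.csv"),                                        # brand-new upload
    FileStub("b.csv", to_replace=True, unchanged_data=True),  # metadata-only update
    FileStub("c.csv", to_replace=True),                       # full replacement
]
new, meta_only, replace = partition(files)
print([f.file_name for f in new], [f.file_name for f in meta_only], [f.file_name for f in replace])

Only the first two groups then feed the metadata update call, matching files_new + files_new_metadata in the diff above.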