
Commit 3830276

Native Upload: Handle replacements

1 parent 87c694d

3 files changed: 111 additions, 10 deletions

dvuploader/dvuploader.py (51 additions, 3 deletions)
@@ -239,9 +239,24 @@ def _check_duplicates(
             to_skip.append(file.file_id)

             if replace_existing:
-                table.add_row(
-                    file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
-                )
+                ds_file = self._get_dsfile_by_id(file.file_id, ds_files)
+                # calculate size and initialize hash function
+                file.extract_file_name()
+                if not self._check_size(file, ds_file):
+                    file._unchanged_data = False
+                else:
+                    # calculate checksum
+                    file.update_checksum_chunked()
+                    file.apply_checksum()
+                    file._unchanged_data = self._check_hashes(file, ds_file)
+                if file._unchanged_data:
+                    table.add_row(
+                        file.file_name, "[bright_cyan]Exists", "[bright_black]Replace Meta"
+                    )
+                else:
+                    table.add_row(
+                        file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
+                    )
             else:
                 table.add_row(
                     file.file_name, "[bright_cyan]Exists", "[bright_black]Skipping"
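The new branch avoids re-uploading identical data: size is compared first because it is cheap, and the checksum is only computed when the sizes already match. A minimal standalone sketch of the same decision, using hashlib.md5 as a stand-in for whatever hash function the dataset reports (the real plumbing lives in the File and Checksum classes, not in this hunk):

```python
import hashlib
import os

def data_unchanged(local_path: str, remote_size: int, remote_md5: str) -> bool:
    """Return True if a local file matches a remote file's size and MD5 digest."""
    # Size first: a mismatch rules out identical data without reading the file.
    if os.path.getsize(local_path) != remote_size:
        return False

    # Sizes match, so hash the content in 1 MiB chunks (mirroring
    # update_checksum_chunked below) and compare digests.
    md5 = hashlib.md5()
    with open(local_path, "rb") as handle:
        for chunk in iter(lambda: handle.read(2**20), b""):
            md5.update(chunk)
    return md5.hexdigest() == remote_md5
```

Either mismatch forces a full data replace; only a complete match downgrades the replacement to a metadata-only update ("Replace Meta").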
@@ -294,6 +309,25 @@ def _get_file_id(
             if dspath == fpath:
                 return ds_file["dataFile"]["id"]

+    @staticmethod
+    def _get_dsfile_by_id(
+        file_id: int,
+        ds_files: List[Dict],
+    ) -> Optional[Dict]:
+        """
+        Retrieves a dataset file dictionary by its ID.
+
+        Args:
+            file_id (int): The ID of the file to retrieve.
+            ds_files (List[Dict]): List of dictionary objects representing dataset files.
+
+        Returns:
+            Optional[Dict]: The dataset file dictionary if found, None otherwise.
+        """
+        for ds_file in ds_files:
+            if ds_file["dataFile"]["id"] == file_id:
+                return ds_file
+
     @staticmethod
     def _check_hashes(file: File, dsFile: Dict):
         """
@@ -321,6 +355,20 @@ def _check_hashes(file: File, dsFile: Dict):
             and path == os.path.join(file.directory_label, file.file_name)  # type: ignore
         )

+    @staticmethod
+    def _check_size(file: File, dsFile: Dict) -> bool:
+        """
+        Checks if the file size matches the size of the file in the dataset.
+
+        Args:
+            file (File): The file to check.
+            dsFile (Dict): The file in the dataset to compare against.
+
+        Returns:
+            bool: True if the sizes match, False otherwise.
+        """
+        return dsFile["dataFile"]["filesize"] == file._size
+
     @staticmethod
     def _has_direct_upload(
         dataverse_url: str,

dvuploader/file.py (33 additions, 0 deletions)
@@ -29,6 +29,7 @@ class File(BaseModel):

     Private Attributes:
         _size (int): Size of the file in bytes.
+        _unchanged_data (bool): Indicates if the file data has not changed since the last upload.

     Methods:
         extract_file_name(): Extracts filename from filepath and initializes file handler.
@@ -57,6 +58,7 @@ class File(BaseModel):
     tab_ingest: bool = Field(default=True, alias="tabIngest")

     _size: int = PrivateAttr(default=0)
+    _unchanged_data: bool = PrivateAttr(default=False)

     def extract_file_name(self):
         """
@@ -110,6 +112,7 @@ def apply_checksum(self):
         """
         Calculates and applies the checksum for the file.
         Must be called after extract_file_name() has initialized the checksum.
+        All data must already have been fed into the checksum hash function.

         Raises:
             AssertionError: If checksum is not initialized or hash function is not set.
@@ -119,6 +122,36 @@ def apply_checksum(self):

         self.checksum.apply_checksum()

+
+    def update_checksum_chunked(self, blocksize=2**20):
+        """Updates the checksum with data read from a file-like object in chunks.
+
+        Args:
+            blocksize (int, optional): Size of chunks to read. Defaults to 1 MiB (2**20).
+
+        Raises:
+            AssertionError: If the hash function has not been initialized.
+
+        Note:
+            This method resets the file position to the start after reading.
+        """
+        assert self.handler is not None, "File handler is not initialized."
+        assert self.checksum is not None, "Checksum is not initialized."
+        assert self.checksum._hash_fun is not None, "Checksum hash function is not set."
+
+        while True:
+            buf = self.handler.read(blocksize)
+
+            if not isinstance(buf, bytes):
+                buf = buf.encode()
+
+            if not buf:
+                break
+            self.checksum._hash_fun.update(buf)
+
+        self.handler.seek(0)
+
+
     def __del__(self):
         if self.handler is not None:
             self.handler.close()
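Combined with the existing methods, the intended call order for a replacement check is: open the file and initialize the hash, stream the content through it, then finalize. A sketch of that sequence, assuming File is exported from the package and is constructed with a filepath as elsewhere in dvuploader:

```python
from dvuploader import File  # assumed public export

file = File(filepath="data/measurements.csv")  # hypothetical local file
file.extract_file_name()        # opens the handler, sets _size, initializes the checksum
file.update_checksum_chunked()  # streams the content into the hash function in 1 MiB chunks
file.apply_checksum()           # finalizes the digest; safe now that all data has been fed in
```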

dvuploader/nativeupload.py (27 additions, 7 deletions)
@@ -91,15 +91,30 @@ async def native_upload(
         "proxy": proxy,
     }

+    files_new = [file for file in files if not file.to_replace]
+    files_new_metadata = [file for file in files if file.to_replace and file._unchanged_data]
+    files_replace = [file for file in files if file.to_replace and not file._unchanged_data]
+
     async with httpx.AsyncClient(**session_params) as session:
         with tempfile.TemporaryDirectory() as tmp_dir:
-            packages = distribute_files(files)
+            packages = distribute_files(files_new)
             packaged_files = _zip_packages(
                 packages=packages,
                 tmp_dir=tmp_dir,
                 progress=progress,
             )

+            replacable_files = [
+                (
+                    progress.add_task(
+                        file.file_name,  # type: ignore
+                        total=file._size,
+                    ),
+                    file
+                )
+                for file in files_replace
+            ]
+
             tasks = [
                 _single_native_upload(
                     session=session,
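The three-way split of `files` drives the rest of the function: new files get zipped and uploaded, changed replacements get re-uploaded, and unchanged replacements only get their metadata updated. The three comprehensions walk `files` three times; a single-pass equivalent (partition_files is an invented name, not part of the commit) would be:

```python
from typing import Iterable, List, Tuple

def partition_files(files: Iterable) -> Tuple[List, List, List]:
    """Split files into (new, metadata-only replace, data replace) in one pass."""
    files_new: List = []
    files_new_metadata: List = []
    files_replace: List = []
    for file in files:
        if not file.to_replace:
            files_new.append(file)           # brand-new file, upload normally
        elif file._unchanged_data:
            files_new_metadata.append(file)  # same bytes, only metadata changes
        else:
            files_replace.append(file)       # changed bytes, re-upload
    return files_new, files_new_metadata, files_replace
```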
@@ -108,15 +123,15 @@ async def native_upload(
                     pbar=pbar,  # type: ignore
                     progress=progress,
                 )
-                for pbar, file in packaged_files
+                for pbar, file in (packaged_files + replacable_files)
             ]

             responses = await asyncio.gather(*tasks)
             _validate_upload_responses(responses, files)

             await _update_metadata(
                 session=session,
-                files=files,
+                files=files_new + files_new_metadata,
                 persistent_id=persistent_id,
                 dataverse_url=dataverse_url,
                 api_token=api_token,
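The diff does not show _update_metadata's body. Against Dataverse's documented native API, a single file-metadata update is typically a POST of a jsonData form field to /api/files/{id}/metadata; the sketch below is an assumption based on those docs, not code from this commit:

```python
import httpx

async def update_file_metadata(
    session: httpx.AsyncClient,
    dataverse_url: str,
    api_token: str,
    file_id: int,
    json_data: str,  # e.g. '{"description": "...", "directoryLabel": "data/"}'
) -> httpx.Response:
    """Sketch: update one file's metadata via the (assumed) native API endpoint."""
    response = await session.post(
        f"{dataverse_url}/api/files/{file_id}/metadata",
        data={"jsonData": json_data},
        headers={"X-Dataverse-key": api_token},
    )
    response.raise_for_status()
    return response
```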
@@ -167,6 +182,11 @@ def _zip_packages(
     for index, package in packages:
         if len(package) == 1:
             file = package[0]
+            file.extract_file_name()
+            pbar = progress.add_task(
+                file.file_name,  # type: ignore
+                total=file._size,
+            )
         else:
             path = zip_files(
                 files=package,
@@ -178,10 +198,10 @@ def _zip_packages(
             file.extract_file_name()
             file.mimeType = "application/zip"

-        pbar = progress.add_task(
-            file.file_name,  # type: ignore
-            total=file._size,
-        )
+            pbar = progress.add_task(
+                f"Zip package of {len(package)} files",  # type: ignore
+                total=file._size,
+            )

         files.append((pbar, file))

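The pbar values threaded through these tuples are task IDs, and the add_task/total usage matches rich's Progress API. A minimal sketch of that pattern in isolation, assuming progress is a rich.progress.Progress instance:

```python
from rich.progress import Progress

with Progress() as progress:
    task = progress.add_task("example.bin", total=1024 * 1024)  # one bar per upload
    progress.update(task, advance=64 * 1024)  # an uploader advances it per sent chunk
```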