Skip to content

Commit c735594

Browse files
committed
Enhance S3 RobustCopy Script
1 parent b26c1f1 commit c735594

File tree

1 file changed

+83
-134
lines changed

1 file changed

+83
-134
lines changed

scripts/s3RobustCopy/main.py

Lines changed: 83 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
#!/usr/bin/env -S uv run --script
2+
# /// script
3+
# requires-python = ">=3.11"
4+
# dependencies = [
5+
# "boto3==1.24.38",
6+
# "typer==0.6.1",
7+
# "hurry.filesize==0.9",
8+
# "pandas==1.4.3",
9+
# "numpy==1.26.4",
10+
# "retry==0.9.2",
11+
# "pycryptodome==3.19.1",
12+
# "pip-tools",
13+
# "debugpy==1.8.16"
14+
# ]
15+
# ///
16+
#
117
# pylint: disable=invalid-name,consider-using-in,consider-using-with,expression-not-assigned
218
import atexit
319
import os
@@ -18,12 +34,21 @@
1834
# 2.)
1935
# Imports
2036
import boto3
21-
import pandas as pd
2237
import typer
23-
from Crypto.Cipher import AES
2438
from hurry.filesize import size
2539
from retry import retry
2640

41+
if os.getenv("DEBUG"):
42+
import debugpy
43+
44+
debugpy.listen(("0.0.0.0", int(os.getenv("DEBUG_PORT", "5678"))))
45+
print(
46+
f"Waiting for debugger to attach on port {os.getenv('DEBUG_PORT', '5678')}..."
47+
)
48+
debugpy.wait_for_client()
49+
print("Debugger attached.")
50+
51+
2752
# Ignore certificate warnings in output
2853
warnings.filterwarnings(
2954
"ignore",
@@ -93,9 +118,6 @@ def copyOrDownloadFile(
93118
dest_s3=None,
94119
dest_bucket=None,
95120
no_overwrite=False,
96-
encrypt=False,
97-
decrypt=False,
98-
cypherKey=None,
99121
): # pylint: disable=too-many-arguments,too-many-branches,too-many-statements
100122
if dest_s3 is None:
101123
useDestination_ = False
@@ -143,31 +165,11 @@ def copyOrDownloadFile(
143165
sourcebucketname,
144166
key,
145167
):
146-
# print("Skipping ",str(key))
168+
print("Skipping ", str(key), "as i was identical")
147169
listOfSkipped.add(key)
148170
return 0
149171
src_bucket.download_file(key, downloadFilename)
150-
if cypherKey is not None:
151-
assert not (encrypt and decrypt)
152-
if encrypt:
153-
file_in = open(downloadFilename, "rb").read()
154-
cipher = AES.new(cypherKey, AES.MODE_EAX)
155-
ciphertext, tag = cipher.encrypt_and_digest(file_in)
156-
#
157-
file_out = open(downloadFilename, "wb")
158-
[
159-
file_out.write(x) for x in (cipher.nonce, tag, ciphertext)
160-
] # pylint: disable=expression-not-assigned
161-
file_out.close()
162-
elif decrypt:
163-
file_in = open(downloadFilename, "rb")
164-
nonce, tag, ciphertext = (file_in.read(x) for x in (16, 16, -1))
165-
cipher = AES.new(cypherKey, AES.MODE_EAX, nonce)
166-
binaryDataDecrypted = cipher.decrypt_and_verify(ciphertext, tag)
167-
file_out = open(downloadFilename, "wb")
168-
file_out.write(binaryDataDecrypted)
169-
file_out.close()
170-
# print("Copying: ", str(key))
172+
171173
#
172174
# If we copy to remote bucket
173175
if downloadfolderlocal == "." and useDestination_:
@@ -186,7 +188,9 @@ def copyOrDownloadFile(
186188
print("Failed for ", str(key))
187189
return 1
188190
else:
189-
# print("Skipping ",str(key))
191+
print(
192+
"Skipping ", str(key), " as it was already present on the local filesystem."
193+
)
190194
listOfSkipped_.add(key)
191195
return 0
192196

@@ -209,34 +213,18 @@ def main(
209213
sourcebucketaccess: str,
210214
sourcebucketsecret: str,
211215
sourceendpointurl: str,
212-
downloadfolderlocal: str = "",
213-
destinationbucketregion: str = "us-east-1",
214-
sourcebucketregion: str = "us-east-1",
215-
destinationbucketname: str = "",
216-
destinationbucketaccess: str = "",
217-
destinationbucketsecret: str = "",
218-
destinationendpointurl: str = "",
219-
files: str = "",
220-
filemetadatacsv: str = "",
221-
projectscsv: str = "",
222-
userscsv: str = "",
216+
downloadfolderlocal: str,
217+
destinationbucketregion: str,
218+
sourcebucketregion: str,
219+
destinationbucketname: str,
220+
destinationbucketaccess: str,
221+
destinationbucketsecret: str,
222+
destinationendpointurl: str,
223+
files: str,
223224
nooverwrites: bool = typer.Option(False, "--nooverwrites"),
224-
encryption: bool = typer.Option(False, "--encryption"),
225-
decryption: bool = typer.Option(False, "--decryption"),
226-
password: str = "",
227225
): # pylint: disable=too-many-arguments,too-many-branches,too-many-statements
228226
if nooverwrites:
229227
print("CONFIG: WILL NOT OVERWRITE ANY FILES.")
230-
if (encryption or decryption) and password != "":
231-
cypherKey = generateCypherKeyFromPassword(password, "")
232-
if encryption:
233-
print("ENCRYPTING FILES DURING COPY")
234-
elif decryption:
235-
print("DECRYPTING FILES DURING COPY")
236-
else:
237-
cypherKey = None
238-
239-
#
240228
#
241229
# Check if we download or copy.
242230
# If useDestination is True, we copy, else we download
@@ -258,29 +246,25 @@ def main(
258246
destinationbucketname = sourcebucketname
259247

260248
# Prepare file logging
261-
if files == "":
249+
def saveFiles():
250+
print("Program done. Writing results to file...")
251+
with open("filesCopied.txt", "w") as f:
252+
for item in list(listOfSuccess):
253+
f.write("%s\n" % item)
254+
with open("filesFailed.txt", "w") as f:
255+
for item in list(listOfFailed):
256+
f.write("%s\n" % item)
257+
with open("filesFailedWithException.txt", "w") as f:
258+
for item in list(listOfFailedWithExceptions):
259+
f.write(str(item[0]) + ": " + str(item[1]) + "\n")
260+
with open("filesSkipped.txt", "w") as f:
261+
for item in list(listOfSkipped):
262+
f.write("%s\n" % item)
263+
with open("filesOriginal.txt", "w") as f:
264+
for item in list(listOfFailed) + list(listOfSkipped) + list(listOfSuccess):
265+
f.write("%s\n" % item)
262266

263-
def saveFiles():
264-
print("Program done. Writing results to file...")
265-
with open("filesCopied.txt", "w") as f:
266-
for item in list(listOfSuccess):
267-
f.write("%s\n" % item)
268-
with open("filesFailed.txt", "w") as f:
269-
for item in list(listOfFailed):
270-
f.write("%s\n" % item)
271-
with open("filesFailedWithException.txt", "w") as f:
272-
for item in list(listOfFailedWithExceptions):
273-
f.write(str(item[0]) + ": " + str(item[1]) + "\n")
274-
with open("filesSkipped.txt", "w") as f:
275-
for item in list(listOfSkipped):
276-
f.write("%s\n" % item)
277-
with open("filesOriginal.txt", "w") as f:
278-
for item in (
279-
list(listOfFailed) + list(listOfSkipped) + list(listOfSuccess)
280-
):
281-
f.write("%s\n" % item)
282-
283-
atexit.register(saveFiles)
267+
atexit.register(saveFiles)
284268

285269
# Configure source bucket
286270
# via
@@ -322,8 +306,10 @@ def saveFiles():
322306
print("Destination Local: ", downloadfolderlocal)
323307
if os.path.isdir(downloadfolderlocal):
324308
if os.listdir(downloadfolderlocal):
325-
print("Target Backup Directory is not empty!")
326-
answer = input("Do you want to overwrite? [y/N]")
309+
print("Local Backup Directory is not empty!")
310+
answer = input(
311+
"Do you want to risk overwriting local files during the copying? [y/N]"
312+
)
327313
if answer != "y" and answer != "Y":
328314
sys.exit(0)
329315
else:
@@ -340,7 +326,10 @@ def saveFiles():
340326

341327
print("Starting...")
342328
startTime = time.time()
329+
allObjectsDestinationBucket = None
343330
if files == "": # Consider all files in source bucket
331+
print("Fetching all objects from source bucket.")
332+
print("This might take some time...")
344333
allObjectsSourceBucket = src_bucket.objects.all()
345334
if (
346335
downloadfolderlocal == "" or downloadfolderlocal == "."
@@ -388,9 +377,6 @@ def saveFiles():
388377
dest_s3=dest_s3,
389378
dest_bucket=dest_bucket,
390379
no_overwrite=nooverwrites,
391-
cypherKey=cypherKey,
392-
encrypt=encryption,
393-
decrypt=decryption,
394380
)
395381
else: # We download to local folder
396382
print("Processing filenames...")
@@ -415,13 +401,16 @@ def saveFiles():
415401
src_s3,
416402
destinationbucketname,
417403
no_overwrite=nooverwrites,
418-
cypherKey=cypherKey,
419-
encrypt=encryption,
420-
decrypt=decryption,
421404
)
422405
else: # We only consider certain files as specified in an inputfile, given by cmd option --files. Files already present in the destination are skipped without checks.
423406
with open(files, "r+") as inFiles:
424407
linesAreRead = inFiles.readlines()
408+
print(f"Only copying specified files in file {files}.")
409+
print(f"File {files} has {len(linesAreRead)} lines.")
410+
if nooverwrites and not allObjectsDestinationBucket:
411+
print("Processing source bucket keys...")
412+
print("This might take a while...")
413+
allObjectsDestinationBucket = dest_bucket.objects.all()
425414
with typer.progressbar(linesAreRead) as progress:
426415
for oneFile in progress:
427416
oneFile = oneFile.strip("\n")
@@ -441,12 +430,10 @@ def saveFiles():
441430
sourcebucketname,
442431
src_s3,
443432
destinationbucketname,
433+
allObjectsDestinationBucket=allObjectsDestinationBucket,
444434
dest_s3=dest_s3,
445435
dest_bucket=dest_bucket,
446436
no_overwrite=nooverwrites,
447-
cypherKey=cypherKey,
448-
encrypt=encryption,
449-
decrypt=decryption,
450437
)
451438
else:
452439
copyOrDownloadFile(
@@ -459,58 +446,20 @@ def saveFiles():
459446
sourcebucketname,
460447
src_s3,
461448
no_overwrite=nooverwrites,
462-
cypherKey=cypherKey,
463-
encrypt=encryption,
464-
decrypt=decryption,
465449
)
466-
#
467-
# If provided, read oSparc postgrs export csv files to resolve
468-
# user%project associated with a corrupt file
469-
if filemetadatacsv != "" and projectscsv != "":
470-
print("Trying to match corrupted files with their DB owner and projects.")
471-
filemetadata = pd.read_csv(filemetadatacsv)
472-
projectsTable = pd.read_csv(projectscsv)
473-
if userscsv != "":
474-
userstable = pd.read_csv(userscsv)
475-
elif filemetadatacsv != "" or projectscsv != "":
476-
print(
477-
"Missing one argument for matching corrupted files with their DB owner, either filemetadatacsv or projectscsv."
478-
)
479-
# After copy/DL: Print some info about the failed files
480-
print("####################")
481-
print("Failed files with exception:")
482-
for item in listOfFailedWithExceptions:
483-
print(str(item[0]) + ": " + str(item[1]))
484-
targetObj = src_s3.Object(sourcebucketname, str(item[0]))
485-
targetObj.load()
486-
print("Filesize: ", str(size(targetObj.content_length)))
487-
print("Last modified: ", str(targetObj.last_modified))
488-
# If we have the oSparc Postgres files, we can provide more details
489-
# about the files which we couldnt download
490-
if filemetadatacsv != "" and projectscsv != "":
491-
selection = filemetadata.loc[filemetadata["file_uuid"].isin([str(item[0])])]
492-
currentItem = selection.iloc[0]
493-
userID = currentItem.user_id
494-
userName = currentItem.user_name
495-
if str(userName) == "nan" and userscsv != "":
496-
selectionUser = userstable.loc[userstable["id"].isin([userID])]
497-
if len(selectionUser) == 1:
498-
userName = str(selectionUser.iloc[0].email)
499-
print("User ID: ", str(userID), " User: ", str(userName))
500-
projectID = currentItem.project_id # 1af7c5cc-d827-11eb-9a92-02420a004c15
501-
project_name = currentItem.project_name
502-
print("Project ID: ", str(projectID), " ProjectName: ", str(project_name))
503-
selectionProj = projectsTable.loc[projectsTable["uuid"].isin([projectID])]
504-
if str(project_name) == "nan" and len(selectionProj) == 0:
505-
print("PROJECT NOT FOUND in projects PG table.")
506-
elif str(project_name) == "nan":
507-
project_name = (
508-
str(selectionProj["name"]).split("\n")[0].split(" ")[1]
509-
)
510450

511-
print("--------")
451+
# After copy/DL: Print some info about the failed files
452+
if len(listOfFailedWithExceptions) > 0:
453+
print("####################")
454+
print("Failed files with exception:")
455+
for item in listOfFailedWithExceptions:
456+
print(str(item[0]) + ": " + str(item[1]))
457+
targetObj = src_s3.Object(sourcebucketname, str(item[0]))
458+
targetObj.load()
459+
print("Filesize: ", str(size(targetObj.content_length)))
460+
print("Last modified: ", str(targetObj.last_modified))
461+
print("--------")
512462
print("####################")
513-
print("Failed files")
514463
for i in listOfFailed:
515464
print(i)
516465
if files == "":

0 commit comments

Comments
 (0)