Commit 054e4b2

Merge pull request #36 from RachelTucker/updating_samples
Updating bulk get sample for multi-blob download and improved clarity
2 parents c2f8807 + 17c8e38

samples/gettingData.py

Lines changed: 64 additions & 26 deletions
@@ -15,59 +15,97 @@
 
 from ds3 import ds3
 
-client = ds3.createClientFromEnv()
+# This example retrieves all objects in the specified bucket and lands them in the specified destination.
+# By default it looks for objects in the bucket 'books' and lands them in the temporary directory.
+# At the end of the run, those files are removed from the local system for testing purposes.
+#
+# This example assumes that a bucket named "books" containing some objects exists on the server.
 
-bucketName = "books"
-# this example assumes that a bucket named "books" containing some objects exist on the server
+bucketName = "books"  # modify this value to match the BP bucket you wish to retrieve objects from
 
+destination = tempfile.gettempdir()  # modify this value to match where the objects should be landed on your system
+
+client = ds3.createClientFromEnv()
+
+# Retrieve a list of all objects in the bucket.
 bucketContents = client.get_bucket(ds3.GetBucketRequest(bucketName))
 
+# Convert that bucket listing into a list of objects to retrieve.
+# If you want to retrieve a subset of objects, or already know their names, just make a list of ds3.Ds3GetObject
+# where each item describes one object you wish to retrieve from the BP.
 objectList = list([ds3.Ds3GetObject(obj['Key']) for obj in bucketContents.result['ContentsList']])
+
+# Create a dictionary that maps the BP object name to the destination where you are landing the object.
+# In this example, we land all objects at the path described by the destination variable.
+# If an object name contains paths, this also normalizes it for your OS and lands that object
+# in a sub-folder of the destination.
+objectNameToDestinationPathMap = {}
+for obj in objectList:
+    objectNameToDestinationPathMap[obj.name] = os.path.join(destination, os.path.normpath(obj.name))
+
+# Create a bulk get job on the BP. This tells the BP which objects you are going to retrieve
+# and triggers the BP to start staging those objects in cache.
+# Large objects may have been broken up into several pieces, i.e. blobs.
+# The BP breaks your retrieval job up into "chunks".
+# These chunks represent bundles of data that become ready to be retrieved together.
+# Each chunk will contain one or more pieces of your files (blobs).
+# How the job will be broken up (chunked) is determined when you create the bulk get job.
 bulkGetResult = client.get_bulk_job_spectra_s3(ds3.GetBulkJobSpectraS3Request(bucketName, objectList))
 
-# create a set of the chunk ids which will be used to track
-# what chunks have not been retrieved
+# Create a set of the chunk ids that describe all the units of work that make up the get job.
+# This will be used to track which chunks we still need to process.
 chunkIds = set([x['ChunkId'] for x in bulkGetResult.result['ObjectsList']])
 
-# create a dictionary to map our retrieved objects to temporary files
-# if you want to keep the retreived files on disk, this is not necessary
-tempFiles = {}
-
-# while we still have chunks to retrieve
+# Attempt to retrieve data from the BP while there are still chunks that need to be processed.
 while len(chunkIds) > 0:
-    # get a list of the available chunks that we can get
+    # Get a list of chunks for this job that are ready to be retrieved.
     availableChunks = client.get_job_chunks_ready_for_client_processing_spectra_s3(
         ds3.GetJobChunksReadyForClientProcessingSpectraS3Request(bulkGetResult.result['JobId']))
 
     chunks = availableChunks.result['ObjectsList']
 
-    # check to make sure we got some chunks, if we did not
-    # sleep and retry. This could mean that the cache is full
+    # Check to make sure we got some chunks; if we did not, sleep and retry.
+    # Having no chunks ready may indicate that the BP cache is currently full.
     if len(chunks) == 0:
         time.sleep(availableChunks.retryAfter)
         continue
 
-    # for each chunk that is available, check to make sure
-    # we have not gotten it, and if not, get that object
+    # For each chunk that is available, check to make sure we haven't processed it already.
+    # If we have not processed this chunk yet, then retrieve all its objects.
    for chunk in chunks:
         if not chunk['ChunkId'] in chunkIds:
+            # This chunk has already been processed.
             continue
-        chunkIds.remove(chunk['ChunkId'])
+
+        # For each blob within this chunk, retrieve the data and land it at the destination.
         for obj in chunk['ObjectList']:
-            # if we haven't create a temporary file for this object yet, create one
-            if obj['Name'] not in list(tempFiles.keys()):
-                tempFiles[obj['Name']] = tempfile.mkstemp()
+            # Open the destination file and seek to the offset corresponding with this blob.
+            objectStream = open(objectNameToDestinationPathMap[obj['Name']], "wb")
+            objectStream.seek(int(obj['Offset']))
 
-            # get the object
-            objectStream = open(tempFiles[obj['Name']][1], "wb")
+            # Get the blob for the current object and write it to the destination.
             client.get_object(ds3.GetObjectRequest(bucketName,
                                                    obj['Name'],
                                                    objectStream,
                                                    offset=int(obj['Offset']),
                                                    job=bulkGetResult.result['JobId']))
 
-# iterate over the temporary files, printing out their names, then closing and and removing them
-for objName in list(tempFiles.keys()):
-    print(objName)
-    os.close(tempFiles[objName][0])
-    os.remove(tempFiles[objName][1])
+            # Close the file handle.
+            objectStream.close()
+
+        # We've finished processing this chunk. Remove it from our list of chunks that still need processing.
+        chunkIds.remove(chunk['ChunkId'])
+
+# Go through all items that were landed and check that they were created.
+# This is not needed in production code.
+for objName in objectNameToDestinationPathMap.keys():
+    destinationPath = objectNameToDestinationPathMap[objName]
+    if os.path.isfile(destinationPath):
+        fileSize = os.path.getsize(destinationPath)
+        print(f'Retrieved object={objName}, landed at destination={destinationPath}, has size={fileSize}')
+
+        # This removes the retrieved file from the destination.
+        # This is done to clean up the script for people who use it to test connectivity only.
+        os.remove(destinationPath)  # Remove this line in production code.
+    else:
+        print(f'Failed to retrieve object={objName}')
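
A note on the multi-blob case this commit targets: Python's open(path, "wb") truncates an existing file, so if one object's blobs arrive in different chunks, reopening the destination with "wb" for a later blob discards what an earlier blob wrote. Below is a minimal sketch of a safer write pattern; write_blob is a hypothetical helper, not part of this commit or the ds3 SDK, and the usage lines reuse the sample's own variables (objectNameToDestinationPathMap, obj, bucketName, bulkGetResult, client).

import os

def write_blob(path, offset, write_fn):
    # Create parent folders (for nested object names) and the file itself on first use,
    # then open for update ("r+b") so writing a later blob does not truncate an earlier one.
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    if not os.path.isfile(path):
        open(path, "wb").close()
    with open(path, "r+b") as stream:
        stream.seek(offset)
        write_fn(stream)

# Hypothetical usage inside the sample's blob loop, replacing the open/seek/get_object/close calls:
write_blob(objectNameToDestinationPathMap[obj['Name']], int(obj['Offset']),
           lambda stream: client.get_object(ds3.GetObjectRequest(bucketName,
                                                                 obj['Name'],
                                                                 stream,
                                                                 offset=int(obj['Offset']),
                                                                 job=bulkGetResult.result['JobId'])))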

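As the comment in the updated sample points out, listing the bucket is only needed when you want every object it contains; if you already know the names, you can build the request list directly. A quick sketch with hypothetical object names:

# Retrieve only a known subset of objects (names here are made up for illustration).
objectList = [ds3.Ds3GetObject("war_and_peace.txt"), ds3.Ds3GetObject("moby_dick.txt")]
bulkGetResult = client.get_bulk_job_spectra_s3(ds3.GetBulkJobSpectraS3Request(bucketName, objectList))

The rest of the flow (building the destination map, waiting for chunks, writing blobs) stays the same.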