2525to test Datastore.
2626"""
2727
28- import concurrent .futures
2928import logging
30- from multiprocessing import pool as pool_lib
31- import time
3229
3330from absl import flags
34- from google .cloud import datastore
35- from google .oauth2 import service_account
3631from perfkitbenchmarker import background_tasks
3732from perfkitbenchmarker import configs
3833from perfkitbenchmarker import errors
3934from perfkitbenchmarker import virtual_machine
40- from perfkitbenchmarker import vm_util
4135from perfkitbenchmarker .linux_packages import ycsb
4236from perfkitbenchmarker .providers .gcp import util
4337
6458# https://github.com/brianfrankcooper/YCSB/tree/master/googledatastore
# The single kind used by the YCSB googledatastore binding.
_YCSB_COLLECTIONS = ['usertable']

# constants used for cleaning up database
# Worker threads used both for per-kind scans and for delete tasks.
_CLEANUP_THREAD_POOL_WORKERS = 30
# Keys fetched per cursor page while scanning a kind for deletion.
_CLEANUP_KIND_READ_BATCH_SIZE = 12000
# Accumulated-key threshold that triggers handing work to delete tasks.
_CLEANUP_KIND_DELETE_BATCH_SIZE = 6000
# Keys given to each worker-thread deletion task.
_CLEANUP_KIND_DELETE_PER_THREAD_BATCH_SIZE = 3000
# Keys per delete_multi call (datastore single-operation restriction).
_CLEANUP_KIND_DELETE_OP_BATCH_SIZE = 500
73-
FLAGS = flags.FLAGS
# Service-account JSON key (local path or gs:// URI) used to authenticate
# the YCSB clients and the database cleanup code.
_KEYFILE = flags.DEFINE_string(
    'google_datastore_keyfile',
    None,
    'The path to Google API JSON private key file',
)
# Optional override for the benchmark project; when unset the code falls
# back to FLAGS.project or the gcloud default project.
_PROJECT_ID = flags.DEFINE_string(
    'google_datastore_projectId',
    None,
    'The project ID that has Cloud Datastore service',
)
8562_DATASET_ID = flags .DEFINE_string (
8663 'google_datastore_datasetId' ,
8764 None ,
88- 'The database ID that has Cloud Datastore service ' ,
65+ 'The ID of the database to use for benchmarking ' ,
8966)
# Forwarded verbatim to YCSB as the googledatastore.debug property.
_DEBUG = flags.DEFINE_string(
    'google_datastore_debug', 'false', 'The logging level when running YCSB'
)
# When set, Prepare empties the database before loading fresh data.
_REPOPULATE = flags.DEFINE_boolean(
    'google_datastore_repopulate',
    False,
    'If True, empty database & repopulate with new data.'
    'By default, tests are run with pre-populated data.',
)
9970_TARGET_LOAD_QPS = flags .DEFINE_integer (
10071 'google_datastore_target_load_qps' ,
10172 500 ,
10475 ' for more info.' ,
10576)
10677
# Where the service-account keyfile is staged on disk (client VM or runner).
_KEYFILE_LOCAL_PATH = '/tmp/key.json'
# NOTE(review): usage not visible in this chunk — presumably the YCSB
# insertion retry cap; confirm against the load-phase code.
_INSERTION_RETRY_LIMIT = 100
# Post-load settle time (30 min) to allow compaction before the run phase.
_SLEEP_AFTER_LOADING_SECS = 30 * 60
11079
11180
11281def GetConfig (user_config ):
@@ -128,20 +97,10 @@ def _Install(vm):
12897 """Installs YCSB benchmark & copies datastore keyfile to client vm."""
12998 vm .Install ('ycsb' )
13099
131- # Copy private key file to VM
132- if _KEYFILE .value :
133- if _KEYFILE .value .startswith ('gs://' ):
134- vm .Install ('google_cloud_sdk' )
135- vm .RemoteCommand (f'gsutil cp { _KEYFILE .value } { _KEYFILE_LOCAL_PATH } ' )
136- else :
137- vm .RemoteCopy (_KEYFILE .value , _KEYFILE_LOCAL_PATH )
138-
139100
140101def _GetCommonYcsbArgs ():
141102 """Returns common YCSB args."""
142- project = _PROJECT_ID .value
143- if project is None :
144- project = FLAGS .project or util .GetDefaultProject ()
103+ project = FLAGS .project or util .GetDefaultProject ()
145104 args = {
146105 'googledatastore.projectId' : project ,
147106 'googledatastore.debug' : _DEBUG .value ,
@@ -153,10 +112,7 @@ def _GetCommonYcsbArgs():
153112
154113
def _GetYcsbExecutor():
  """Returns a YCSB executor for the googledatastore binding."""
  return ycsb.YCSBExecutor('googledatastore')
160116
161117
162118def RampUpLoad (
@@ -205,9 +161,6 @@ def Prepare(benchmark_spec):
205161 benchmark_spec: The benchmark specification. Contains all data that is
206162 required to run the benchmark.
207163 """
208- if _REPOPULATE .value :
209- EmptyDatabase ()
210-
211164 vms = benchmark_spec .vms
212165
213166 # Install required packages and copy credential files
@@ -232,10 +185,6 @@ def Run(benchmark_spec):
232185 Returns:
233186 A list of sample.Sample instances.
234187 """
235- if _REPOPULATE .value and not FLAGS .ycsb_skip_run_stage :
236- logging .info ('Sleeping 30 minutes to allow for compaction.' )
237- time .sleep (_SLEEP_AFTER_LOADING_SECS )
238-
239188 executor = _GetYcsbExecutor ()
240189 vms = benchmark_spec .vms
241190 run_kwargs = _GetCommonYcsbArgs ()
@@ -250,247 +199,3 @@ def Run(benchmark_spec):
250199
def Cleanup(_):
  """No-op cleanup hook; the managed service needs no teardown here."""
  pass
253-
254-
def EmptyDatabase():
  """Deletes all entries in a datastore database.

  Only runs when --google_datastore_keyfile is provided; otherwise the
  entries must be removed manually via the GCP portal.
  """
  if not _KEYFILE.value:
    logging.warning('Manually delete all the entries via GCP portal.')
    return

  dataset_id = _DATASET_ID.value
  logging.info('Attempting to delete all data in %s', dataset_id)
  credentials = GetDatastoreDeleteCredentials()

  # One task per YCSB kind; the context manager guarantees the executor is
  # shut down and its threads reclaimed (the old code leaked the executor).
  with concurrent.futures.ThreadPoolExecutor(
      max_workers=_CLEANUP_THREAD_POOL_WORKERS
  ) as executor:
    # Bug fix: pass the callable and its arguments to submit(). The previous
    # code invoked _ReadAndDeleteAllEntities(...) inline, which ran every
    # deletion synchronously and then submitted its None return value to the
    # executor.
    futures = [
        executor.submit(
            _ReadAndDeleteAllEntities, dataset_id, credentials, kind
        )
        for kind in _YCSB_COLLECTIONS
    ]
    concurrent.futures.wait(
        futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
    )
  logging.info('Deleted all data for %s', dataset_id)
283-
def GetDatastoreDeleteCredentials():
  """Builds the service-account credentials used by the cleanup client.

  Returns:
    Datastore-scoped service_account.Credentials.

  Raises:
    errors.Benchmarks.RunError: If no keyfile path was supplied.
  """
  credentials_path = _KEYFILE.value
  if credentials_path is not None and credentials_path.startswith('gs://'):
    # Stage the private keyfile on local disk before loading it.
    vm_util.IssueCommand([
        'gsutil',
        'cp',
        _KEYFILE.value,
        _KEYFILE_LOCAL_PATH,
    ])
    credentials_path = _KEYFILE_LOCAL_PATH

  if credentials_path is None:
    raise errors.Benchmarks.RunError('Credentials path is None')

  return service_account.Credentials.from_service_account_file(
      credentials_path,
      scopes=datastore.client.Client.SCOPE,
  )
308-
309-
def _ReadAndDeleteAllEntities(dataset_id, credentials, kind):
  """Reads and deletes all kind entries in a datastore database.

  Args:
    dataset_id: Cloud Datastore client dataset id.
    credentials: Cloud Datastore client credentials.
    kind: Kind for which entities will be deleted.

  Raises:
    ValueError: In case of delete failures.
  """
  task_id = 1
  start_cursor = None
  pool = pool_lib.ThreadPool(processes=_CLEANUP_THREAD_POOL_WORKERS)

  # We use a cursor to fetch entities in larger read batches and submit delete
  # tasks to delete them in smaller delete batches (500 at a time) due to
  # datastore single operation restriction.
  entity_read_count = 0
  total_entity_count = 0
  delete_keys = []
  while True:
    # Keys-only query keeps the scan payload minimal; a fresh client is
    # created per read batch.
    query = _CreateClient(dataset_id, credentials).query(kind=kind)
    query.keys_only()
    query_iter = query.fetch(
        start_cursor=start_cursor, limit=_CLEANUP_KIND_READ_BATCH_SIZE
    )

    for current_entities in query_iter.pages:
      delete_keys.extend([entity.key for entity in current_entities])
      entity_read_count = len(delete_keys)
      if entity_read_count >= _CLEANUP_KIND_DELETE_BATCH_SIZE:
        total_entity_count += entity_read_count
        logging.info('Creating tasks...Read %d in total', total_entity_count)
        # Fan the accumulated keys out to worker threads in per-thread
        # sized chunks.
        while delete_keys:
          delete_chunk = delete_keys[
              :_CLEANUP_KIND_DELETE_PER_THREAD_BATCH_SIZE
          ]
          delete_keys = delete_keys[_CLEANUP_KIND_DELETE_PER_THREAD_BATCH_SIZE:]
          deletion_task = _DeletionTask(kind, task_id)
          pool.apply_async(
              deletion_task.DeleteEntities,
              (
                  dataset_id,
                  credentials,
                  delete_chunk,
              ),
          )
          task_id += 1

        # Reset delete batch,
        entity_read_count = 0
        delete_keys = []

    # Read this after the pages are retrieved otherwise it will be set to None.
    start_cursor = query_iter.next_page_token
    if start_cursor is None:
      logging.info('Read all existing records for %s', kind)
      # Flush any remaining keys below the batch threshold before exiting.
      if delete_keys:
        logging.info(
            'Entities batch is not empty %d, submitting new tasks',
            len(delete_keys),
        )
        while delete_keys:
          delete_chunk = delete_keys[
              :_CLEANUP_KIND_DELETE_PER_THREAD_BATCH_SIZE
          ]
          delete_keys = delete_keys[_CLEANUP_KIND_DELETE_PER_THREAD_BATCH_SIZE:]
          logging.debug(
              (
                  'Creating new Task %d - Read %d entities for %s kind , Read'
                  ' %d in total.'
              ),
              task_id,
              entity_read_count,
              kind,
              total_entity_count,
          )
          deletion_task = _DeletionTask(kind, task_id)
          pool.apply_async(
              deletion_task.DeleteEntities,
              (
                  dataset_id,
                  credentials,
                  delete_chunk,
              ),
          )
          task_id += 1
      break

  logging.info('Waiting for all tasks - %d to complete...', task_id)
  time.sleep(60)
  pool.close()
  pool.join()

  # Rerun the query and delete any leftovers to make sure that all records
  # are deleted as intended.
  client = _CreateClient(dataset_id, credentials)
  query = client.query(kind=kind)
  query.keys_only()
  entities = list(query.fetch(limit=20000))
  if entities:
    logging.info('Deleting leftover %d entities for %s', len(entities), kind)
    total_entity_count += len(entities)
    deletion_task = _DeletionTask(kind, task_id)
    delete_keys = []
    delete_keys.extend([entity.key for entity in entities])
    deletion_task.DeleteEntities(dataset_id, credentials, delete_keys)

  logging.info(
      'Deleted all data for %s - %s - %d', dataset_id, kind, total_entity_count
  )
429-
def _CreateClient(dataset_id, credentials):
  """Creates a datastore client for the dataset using the credentials.

  Args:
    dataset_id: Cloud Datastore client dataset id.
    credentials: Cloud Datastore client credentials.

  Returns:
    Datastore client.
  """
  client = datastore.Client(project=dataset_id, credentials=credentials)
  return client
441-
442-
class _DeletionTask:
  """Represents a cleanup deletion task.

  Attributes:
    kind: Datastore kind to be deleted.
    task_id: Task id
    entity_deletion_count: No of entities deleted.
    deletion_error: Set to true if deletion fails with an error.
  """

  def __init__(self, kind, task_id):
    self.kind = kind
    self.task_id = task_id
    self.entity_deletion_count = 0
    self.deletion_error = False

  def DeleteEntities(self, dataset_id, credentials, delete_entities):
    """Deletes entities in a datastore database in batches.

    Args:
      dataset_id: Cloud Datastore client dataset id.
      credentials: Cloud Datastore client credentials.
      delete_entities: Entities to delete.

    Returns:
      number of records deleted.
    Raises:
      ValueError: In case of delete failures.
    """
    try:
      client = datastore.Client(project=dataset_id, credentials=credentials)
      logging.info('Task %d - Started deletion for %s', self.task_id, self.kind)
      # delete_multi is size-limited, so walk the key list in fixed-size
      # strides instead of deleting everything in one call.
      batch = _CLEANUP_KIND_DELETE_OP_BATCH_SIZE
      for offset in range(0, len(delete_entities), batch):
        chunk = delete_entities[offset:offset + batch]
        client.delete_multi(chunk)
        self.entity_deletion_count += len(chunk)

      logging.info(
          'Task %d - Completed deletion for %s - %d',
          self.task_id,
          self.kind,
          self.entity_deletion_count,
      )
      return self.entity_deletion_count
    except ValueError as error:
      logging.exception(
          'Task %d - Delete entities for %s failed due to %s',
          self.task_id,
          self.kind,
          error,
      )
      self.deletion_error = True
      raise error
0 commit comments