Skip to content

Commit e80f8f8

Browse files
author
DenverM80
committed
Add multithreaded memory write test to verify true interleaving of transfers
1 parent 30da67f commit e80f8f8

File tree

3 files changed

+158
-107
lines changed

3 files changed

+158
-107
lines changed

test/connection_tests.cpp

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ BOOST_AUTO_TEST_CASE( bulk_put_10k_very_small_files ) {
131131
put_chunks_args_single_thread->chunks_list = ensure_available_chunks(client, bulk_response->job_id);
132132
put_chunks_args_single_thread->verbose = False;
133133

134-
put_chunks(put_chunks_args_single_thread);
134+
put_chunks_from_file(put_chunks_args_single_thread);
135135

136136
ds3_master_object_list_response_free(put_chunks_args_single_thread->chunks_list);
137137
ds3_master_object_list_response_free(bulk_response);
@@ -164,8 +164,8 @@ BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) {
164164

165165
GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, bucket_name, bulk_response, chunk_response, num_threads, False);
166166

167-
GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks, g_ptr_array_index(put_objs_args_array, 0));
168-
GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks, g_ptr_array_index(put_objs_args_array, 1));
167+
GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 0));
168+
GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 1));
169169

170170
// Block and cleanup GThreads
171171
g_thread_join(chunks_thread_0);
@@ -209,7 +209,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) {
209209
// capture sequential test start time
210210
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
211211

212-
GThread* xfer_sequential_thread = g_thread_new("sequential_objs_xfer", (GThreadFunc)put_chunks, g_ptr_array_index(put_sequential_objs_threads_array, 0));
212+
GThread* xfer_sequential_thread = g_thread_new("sequential_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_sequential_objs_threads_array, 0));
213213

214214
// Block and cleanup GThreads
215215
g_thread_join(xfer_sequential_thread);
@@ -243,10 +243,10 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) {
243243
// capture sequential test start time
244244
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
245245

246-
GThread* xfer_parallel_thread_0 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks, g_ptr_array_index(put_parallel_objs_threads_array, 0));
247-
GThread* xfer_parallel_thread_1 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks, g_ptr_array_index(put_parallel_objs_threads_array, 1));
248-
GThread* xfer_parallel_thread_2 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks, g_ptr_array_index(put_parallel_objs_threads_array, 2));
249-
GThread* xfer_parallel_thread_3 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks, g_ptr_array_index(put_parallel_objs_threads_array, 3));
246+
GThread* xfer_parallel_thread_0 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 0));
247+
GThread* xfer_parallel_thread_1 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 1));
248+
GThread* xfer_parallel_thread_2 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 2));
249+
GThread* xfer_parallel_thread_3 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 3));
250250

251251
// Block and cleanup GThreads
252252
g_thread_join(xfer_parallel_thread_0);
@@ -321,8 +321,8 @@ BOOST_AUTO_TEST_CASE( multiple_client_xfer ) {
321321
// capture sequential test start time
322322
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
323323

324-
GThread* client1_xfer_thread = g_thread_new("client1_objs_xfer", (GThreadFunc)put_chunks, g_ptr_array_index(client1_put_objs_args, 0));
325-
GThread* client2_xfer_thread = g_thread_new("client2_objs_xfer", (GThreadFunc)put_chunks, g_ptr_array_index(client2_put_objs_args, 0));
324+
GThread* client1_xfer_thread = g_thread_new("client1_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(client1_put_objs_args, 0));
325+
GThread* client2_xfer_thread = g_thread_new("client2_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(client2_put_objs_args, 0));
326326

327327
// Block and cleanup GThreads
328328
g_thread_join(client1_xfer_thread);
@@ -355,37 +355,71 @@ BOOST_AUTO_TEST_CASE( multiple_client_xfer ) {
355355
BOOST_AUTO_TEST_CASE( performance_bulk_put ) {
356356
printf("-----Testing BULK_PUT performance-------\n");
357357

358-
const char* bucket_name = "bulk_put_performance_bucket";
358+
const char* bucket_name1 = "bulk_put_performance_bucket1";
359+
const char* bucket_name2 = "bulk_put_performance_bucket2";
360+
const char* bucket_name3 = "bulk_put_performance_bucket3";
359361
ds3_request* request = NULL;
360-
ds3_master_object_list_response* bulk_response = NULL;
361-
ds3_client* client = get_client();
362-
ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value);
362+
ds3_master_object_list_response* bulk_response1 = NULL;
363+
ds3_master_object_list_response* bulk_response2 = NULL;
364+
ds3_master_object_list_response* bulk_response3 = NULL;
365+
366+
ds3_client* client1 = get_client();
367+
ds3_client* client2 = ds3_copy_client(client1); // share the connection pool
368+
ds3_client* client3 = ds3_copy_client(client1); // share the connection pool
369+
// Log per thread
370+
int client1_thread=1, client2_thread=2, client3_thread=3;
371+
ds3_client_register_logging(client1, DS3_INFO, test_log, (void*)&client1_thread);
372+
ds3_client_register_logging(client2, DS3_INFO, test_log, (void*)&client2_thread);
373+
ds3_client_register_logging(client3, DS3_INFO, test_log, (void*)&client3_thread);
374+
375+
376+
ds3_error* error = create_bucket_with_data_policy(client1, bucket_name1, ids.data_policy_id->value);
377+
handle_error(error);
378+
error = create_bucket_with_data_policy(client1, bucket_name2, ids.data_policy_id->value);
379+
handle_error(error);
380+
error = create_bucket_with_data_policy(client1, bucket_name3, ids.data_policy_id->value);
363381
handle_error(error);
364382

365383
// Create the list of fake files to transfer
366384
size_t obj_size = 512 * 1024 * 1024; // 512MB
367385
const char* obj_prefix = "perf_obj";
368-
size_t num_files = 20;
386+
size_t num_files = 10;
369387
ds3_bulk_object_list_response* obj_list = create_bulk_object_list_from_prefix_with_size(obj_prefix, num_files, obj_size);
370388

371-
// Create the BULK_PUT job
372-
request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, obj_list);
373-
error = ds3_put_bulk_job_spectra_s3_request(client, request, &bulk_response);
389+
// Create the BULK_PUT jobs
390+
request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name1, obj_list);
391+
error = ds3_put_bulk_job_spectra_s3_request(client1, request, &bulk_response1);
392+
handle_error(error);
393+
ds3_request_free(request);
394+
395+
request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name2, obj_list);
396+
error = ds3_put_bulk_job_spectra_s3_request(client1, request, &bulk_response2);
374397
handle_error(error);
375398
ds3_request_free(request);
376399

377-
ds3_master_object_list_response* chunks_response = ensure_available_chunks(client, bulk_response->job_id);
400+
request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name3, obj_list);
401+
error = ds3_put_bulk_job_spectra_s3_request(client1, request, &bulk_response3);
402+
handle_error(error);
403+
ds3_request_free(request);
378404

379-
GPtrArray* put_perf_objs_threads_array = new_put_chunks_threads_args_performance(client, obj_prefix, bucket_name, bulk_response, chunks_response, 3, True, True);
405+
// Ensure cache space for the jobs
406+
ds3_master_object_list_response* chunks_response1 = ensure_available_chunks(client1, bulk_response1->job_id);
407+
ds3_master_object_list_response* chunks_response2 = ensure_available_chunks(client2, bulk_response2->job_id);
408+
ds3_master_object_list_response* chunks_response3 = ensure_available_chunks(client3, bulk_response3->job_id);
409+
410+
GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, bucket_name1, bulk_response1, chunks_response1, 1, True);
411+
GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, bucket_name2, bulk_response2, chunks_response2, 1, True);
412+
GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, bucket_name3, bulk_response3, chunks_response3, 1, True);
380413

381414
// capture sequential test start time
382415
struct timespec start_time_t, end_time_t;
383416
double elapsed_t;
384417
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
385418

386-
GThread* xfer_thread_1 = g_thread_new("performance_objs_xfer_1", (GThreadFunc)put_chunks, g_ptr_array_index(put_perf_objs_threads_array, 0));
387-
GThread* xfer_thread_2 = g_thread_new("performance_objs_xfer_2", (GThreadFunc)put_chunks, g_ptr_array_index(put_perf_objs_threads_array, 1));
388-
GThread* xfer_thread_3 = g_thread_new("performance_objs_xfer_3", (GThreadFunc)put_chunks, g_ptr_array_index(put_perf_objs_threads_array, 2));
419+
// Spawn threads
420+
GThread* xfer_thread_1 = g_thread_new("performance_objs_xfer_1", (GThreadFunc)put_chunks_from_mem, g_ptr_array_index(put_perf_objs_threads_array1, 0));
421+
GThread* xfer_thread_2 = g_thread_new("performance_objs_xfer_2", (GThreadFunc)put_chunks_from_mem, g_ptr_array_index(put_perf_objs_threads_array2, 0));
422+
GThread* xfer_thread_3 = g_thread_new("performance_objs_xfer_3", (GThreadFunc)put_chunks_from_mem, g_ptr_array_index(put_perf_objs_threads_array3, 0));
389423

390424
// Block and cleanup GThreads
391425
g_thread_join(xfer_thread_1);
@@ -397,12 +431,24 @@ BOOST_AUTO_TEST_CASE( performance_bulk_put ) {
397431
elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t);
398432
printf(" Elapsed time[%f]\n", elapsed_t);
399433

400-
ds3_master_object_list_response_free(bulk_response);
401-
ds3_master_object_list_response_free(chunks_response);
402-
ds3_bulk_object_list_response_free(obj_list);
403-
put_chunks_threads_args_free(put_perf_objs_threads_array);
434+
ds3_master_object_list_response_free(bulk_response1);
435+
ds3_master_object_list_response_free(chunks_response1);
436+
put_chunks_threads_args_free(put_perf_objs_threads_array1);
437+
clear_bucket(client1, bucket_name1);
438+
free_client(client1);
404439

405-
clear_bucket(client, bucket_name);
406-
free_client(client);
440+
ds3_master_object_list_response_free(bulk_response2);
441+
ds3_master_object_list_response_free(chunks_response2);
442+
put_chunks_threads_args_free(put_perf_objs_threads_array2);
443+
clear_bucket(client2, bucket_name2);
444+
free_client(client2);
445+
446+
ds3_master_object_list_response_free(bulk_response3);
447+
ds3_master_object_list_response_free(chunks_response3);
448+
put_chunks_threads_args_free(put_perf_objs_threads_array3);
449+
clear_bucket(client3, bucket_name3);
450+
free_client(client3);
451+
452+
ds3_bulk_object_list_response_free(obj_list);
407453
}
408454

0 commit comments

Comments
 (0)