Skip to content

Commit 75cdb73

Browse files
author
DenverM80
committed
Add test case to verify CURL can PUT objects multithreaded concurrently
1 parent 8688b56 commit 75cdb73

File tree

1 file changed

+146
-51
lines changed

1 file changed

+146
-51
lines changed

test/bulk_put.cpp

Lines changed: 146 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
#include "ds3_net.h"
2525
#include "test.h"
2626

27-
#define BUFF_SIZE 16
27+
#define BUFF_SIZE 64
2828

2929
/**
3030
* Create a ds3_bulk_object_list_response with the same name many times, append a number
@@ -40,7 +40,7 @@ ds3_bulk_object_list_response* create_bulk_object_list_single_file(const char* f
4040

4141
GPtrArray* ds3_bulk_object_response_array = g_ptr_array_new();
4242
for (size_t index = 0; index < num_files; index++) {
43-
g_snprintf(put_filename, BUFF_SIZE, "file_%05lu", index);
43+
g_snprintf(put_filename, BUFF_SIZE, "%s_%05lu", file_name, index);
4444

4545
ds3_bulk_object_response* obj = g_new0(ds3_bulk_object_response, 1);
4646
obj->name = ds3_str_init(put_filename);
@@ -67,36 +67,84 @@ typedef struct {
6767
char* src_object_name;
6868
char* bucket_name;
6969
ds3_master_object_list_response* chunks_list;
70-
} test_put_chunks_args;
70+
ds3_bool verbose;
71+
} put_chunks_args;
7172

73+
/*
74+
* Returned put_chunks_threads_args* must be freed with put_chunks_threads_args_free();
75+
*/
76+
GPtrArray* new_put_chunks_threads_args(ds3_client* client,
77+
const char* src_obj_name,
78+
const char* dest_bucket_name,
79+
const ds3_master_object_list_response* bulk_response,
80+
ds3_master_object_list_response* available_chunks,
81+
const uint8_t num_threads,
82+
const ds3_bool verbose) {
83+
GPtrArray* put_chunks_args_array = g_ptr_array_new();
84+
85+
for (uint8_t thread_index = 0; thread_index < num_threads; thread_index++) {
86+
put_chunks_args* put_objects_args = g_new0(put_chunks_args, 1);
87+
put_objects_args->client = client;
88+
put_objects_args->job_id = bulk_response->job_id->value;
89+
put_objects_args->src_object_name = (char*)src_obj_name;
90+
put_objects_args->bucket_name = (char*)dest_bucket_name;
91+
put_objects_args->chunks_list = available_chunks;
92+
put_objects_args->thread_num = thread_index;
93+
put_objects_args->num_threads = num_threads;
94+
put_objects_args->verbose = verbose;
95+
g_ptr_array_add(put_chunks_args_array, put_objects_args);
96+
}
97+
98+
return put_chunks_args_array;
99+
}
100+
101+
void put_chunks_threads_args_free(GPtrArray* array) {
102+
for (size_t index = 0; index < array->len; index++) {
103+
g_free(g_ptr_array_index(array, index));
104+
}
105+
106+
g_ptr_array_free(array, TRUE);
107+
}
108+
109+
/**
110+
* To be passed as GThreadFunc arg to g_thread_new() along with a put_chunks_args struct
111+
*/
72112
void put_chunks(void* args) {
73-
test_put_chunks_args* put_chunks_args = (test_put_chunks_args*)args;
113+
put_chunks_args* _args = (put_chunks_args*)args;
74114
ds3_objects_response* chunk_object_list = NULL;
75115

76-
for (size_t chunk_index = 0; chunk_index < put_chunks_args->chunks_list->num_objects; chunk_index++) {
77-
chunk_object_list = put_chunks_args->chunks_list->objects[chunk_index];
116+
for (size_t chunk_index = 0; chunk_index < _args->chunks_list->num_objects; chunk_index++) {
117+
chunk_object_list = _args->chunks_list->objects[chunk_index];
78118
for (size_t object_index = 0; object_index < chunk_object_list->num_objects; object_index++) {
79119

80120
// Work distribution
81-
if (object_index % put_chunks_args->num_threads == put_chunks_args->thread_num) {
121+
if (object_index % _args->num_threads == _args->thread_num) {
82122
ds3_bulk_object_response* object = chunk_object_list->objects[object_index];
83123
// Send the same file every time, give it a different destination name
84-
FILE* file = fopen(put_chunks_args->src_object_name, "r");
124+
FILE* file = fopen(_args->src_object_name, "r");
85125
if (file == NULL) {
86-
printf("Unable to open %s for read (FILE NULL), skipping put to bucket %s!\n", put_chunks_args->src_object_name, put_chunks_args->bucket_name);
126+
printf("Unable to open %s for read (FILE NULL), skipping put to bucket %s!\n", _args->src_object_name, _args->bucket_name);
87127
return;
88128
}
89129

90-
ds3_request* request = ds3_init_put_object_request(put_chunks_args->bucket_name, object->name->value, object->length);
91-
ds3_request_set_job(request, put_chunks_args->job_id);
130+
ds3_request* request = ds3_init_put_object_request(_args->bucket_name, object->name->value, object->length);
131+
ds3_request_set_job(request, _args->job_id);
92132
if (object->offset > 0) {
93133
fseek(file, object->offset, SEEK_SET);
94134
}
95-
ds3_error* error = ds3_put_object_request(put_chunks_args->client, request, file, ds3_read_from_file);
96-
ds3_request_free(request);
97135

136+
if (_args->verbose) {
137+
printf(" Thread[%d] BEGIN xfer File[%s] Chunk[%lu]\n", _args->thread_num, object->name->value, _args->chunks_list->num_objects);
138+
}
139+
140+
ds3_error* error = ds3_put_object_request(_args->client, request, file, ds3_read_from_file);
141+
ds3_request_free(request);
98142
fclose(file);
99143
handle_error(error);
144+
145+
if (_args->verbose) {
146+
printf(" Thread[%d] END xfer File[%s] Chunk[%lu]\n", _args->thread_num, object->name->value, _args->chunks_list->num_objects);
147+
}
100148
}
101149
}
102150
}
@@ -119,29 +167,30 @@ BOOST_AUTO_TEST_CASE( bulk_put_10k_very_small_files ) {
119167
ds3_bulk_object_list_response_free(object_list);
120168
handle_error(error);
121169

122-
test_put_chunks_args* put_chunks_args = g_new0(test_put_chunks_args, 1);
123-
put_chunks_args->client = client;
124-
put_chunks_args->num_threads = 1;
125-
put_chunks_args->thread_num = 0;
126-
put_chunks_args->job_id = bulk_response->job_id->value;
127-
put_chunks_args->src_object_name = (char*)object_name;
128-
put_chunks_args->bucket_name = (char*)bucket_name;
129-
put_chunks_args->chunks_list = ensure_available_chunks(client, bulk_response->job_id);
170+
put_chunks_args* put_chunks_args_single_thread = g_new0(put_chunks_args, 1);
171+
put_chunks_args_single_thread->client = client;
172+
put_chunks_args_single_thread->num_threads = 1;
173+
put_chunks_args_single_thread->thread_num = 0;
174+
put_chunks_args_single_thread->job_id = bulk_response->job_id->value;
175+
put_chunks_args_single_thread->src_object_name = (char*)object_name;
176+
put_chunks_args_single_thread->bucket_name = (char*)bucket_name;
177+
put_chunks_args_single_thread->chunks_list = ensure_available_chunks(client, bulk_response->job_id);
178+
put_chunks_args_single_thread->verbose = False;
130179

131-
put_chunks(put_chunks_args);
180+
put_chunks(put_chunks_args_single_thread);
132181

133-
ds3_master_object_list_response_free(put_chunks_args->chunks_list);
182+
ds3_master_object_list_response_free(put_chunks_args_single_thread->chunks_list);
134183
ds3_master_object_list_response_free(bulk_response);
135-
g_free(put_chunks_args);
184+
g_free(put_chunks_args_single_thread);
136185

137186
clear_bucket(client, bucket_name);
138187
free_client(client);
139188
}
140189

141-
142190
BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) {
143191
printf("-----Testing Bulk PUT of 200 very small files multithreaded-------\n");
144192
const char* bucket_name = "test_bulk_put_200_very_small_files_multithreaded";
193+
const uint8_t num_threads = 2;
145194
const char* object_name = "resources/very_small_file.txt";
146195
ds3_request* request = NULL;
147196
ds3_master_object_list_response* bulk_response = NULL;
@@ -159,37 +208,83 @@ BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) {
159208

160209
ds3_master_object_list_response* chunk_response = ensure_available_chunks(client, bulk_response->job_id);
161210

162-
// send to child thread 1
163-
test_put_chunks_args* put_odd_objects_args = g_new0(test_put_chunks_args, 1);
164-
put_odd_objects_args->client = client;
165-
put_odd_objects_args->job_id = bulk_response->job_id->value;
166-
put_odd_objects_args->src_object_name = (char*)object_name;
167-
put_odd_objects_args->bucket_name = (char*)bucket_name;
168-
put_odd_objects_args->chunks_list = chunk_response;
169-
put_odd_objects_args->thread_num = 0;
170-
put_odd_objects_args->num_threads = 2;
171-
172-
// send to child thread 2
173-
test_put_chunks_args* put_even_objects_args = g_new0(test_put_chunks_args, 1);
174-
put_even_objects_args->client = client;
175-
put_even_objects_args->job_id = bulk_response->job_id->value;
176-
put_even_objects_args->src_object_name = (char*)object_name;
177-
put_even_objects_args->bucket_name = (char*)bucket_name;
178-
put_even_objects_args->chunks_list = chunk_response;
179-
put_even_objects_args->thread_num = 1;
180-
put_even_objects_args->num_threads = 2;
181-
182-
GThread* even_chunks_thread = g_thread_new("even_objects", (GThreadFunc)put_chunks, put_even_objects_args);
183-
GThread* odd_chunks_thread = g_thread_new("odd_objects", (GThreadFunc)put_chunks, put_odd_objects_args);
211+
GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, bucket_name, bulk_response, chunk_response, num_threads, False);
212+
213+
GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks, g_ptr_array_index(put_objs_args_array, 0));
214+
GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks, g_ptr_array_index(put_objs_args_array, 1));
184215

185216
// Block and cleanup GThreads
186-
g_thread_join(even_chunks_thread);
187-
g_thread_join(odd_chunks_thread);
217+
g_thread_join(chunks_thread_0);
218+
g_thread_join(chunks_thread_1);
188219

189220
ds3_master_object_list_response_free(chunk_response);
190221
ds3_master_object_list_response_free(bulk_response);
191-
g_free(put_odd_objects_args);
192-
g_free(put_even_objects_args);
222+
put_chunks_threads_args_free(put_objs_args_array);
223+
224+
clear_bucket(client, bucket_name);
225+
free_client(client);
226+
}
227+
228+
/**
229+
* Create two jobs to put two set of files: 200 x 20bytes and 10x46mb
230+
* Divide file PUTs between 2 threads each, 4 total.
231+
* Assert no errors are encountered.
232+
*/
233+
BOOST_AUTO_TEST_CASE( put_large_and_small_objects_concurrently ) {
234+
printf("-----Testing BULK_PUT of large(15mb) & small objects(20bytes) concurrently-------\n");
235+
236+
const char* bucket_name = "test_bulk_put_large_and_small_objects_concurrently";
237+
const char* small_object_name = "resources/very_small_file.txt"; // 20 bytes
238+
const char* large_object_name = "resources/ulysses_46mb.txt"; // 46 mb
239+
ds3_request* request = NULL;
240+
241+
ds3_master_object_list_response* small_objs_bulk_response = NULL;
242+
ds3_master_object_list_response* large_objs_bulk_response = NULL;
243+
ds3_bulk_object_list_response* small_object_list = create_bulk_object_list_single_file(small_object_name, 200);
244+
ds3_bulk_object_list_response* large_object_list = create_bulk_object_list_single_file(large_object_name, 10);
245+
246+
ds3_client* client = get_client();
247+
ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value);
248+
249+
request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, small_object_list);
250+
error = ds3_put_bulk_job_spectra_s3_request(client, request, &small_objs_bulk_response);
251+
ds3_request_free(request);
252+
ds3_bulk_object_list_response_free(small_object_list);
253+
handle_error(error);
254+
255+
ds3_master_object_list_response* small_objs_chunk_response = ensure_available_chunks(client, small_objs_bulk_response->job_id);
256+
257+
request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, large_object_list);
258+
error = ds3_put_bulk_job_spectra_s3_request(client, request, &large_objs_bulk_response);
259+
ds3_request_free(request);
260+
ds3_bulk_object_list_response_free(large_object_list);
261+
handle_error(error);
262+
263+
ds3_master_object_list_response* large_objs_chunk_response = ensure_available_chunks(client, large_objs_bulk_response->job_id);
264+
265+
GPtrArray* put_small_objs_args_array = new_put_chunks_threads_args(client, small_object_name, bucket_name, small_objs_bulk_response, small_objs_chunk_response, 2, True);
266+
GPtrArray* put_large_objs_args_array = new_put_chunks_threads_args(client, large_object_name, bucket_name, large_objs_bulk_response, large_objs_chunk_response, 2, True);
267+
268+
GThread* small_chunks_thread_0 = g_thread_new("small_objects_0", (GThreadFunc)put_chunks, g_ptr_array_index(put_small_objs_args_array, 0));
269+
GThread* small_chunks_thread_1 = g_thread_new("small_objects_1", (GThreadFunc)put_chunks, g_ptr_array_index(put_small_objs_args_array, 1));
270+
271+
GThread* large_chunks_thread_0 = g_thread_new("large_objects_0", (GThreadFunc)put_chunks, g_ptr_array_index(put_large_objs_args_array, 0));
272+
GThread* large_chunks_thread_1 = g_thread_new("large_objects_1", (GThreadFunc)put_chunks, g_ptr_array_index(put_large_objs_args_array, 1));
273+
274+
// Block and cleanup GThreads
275+
g_thread_join(small_chunks_thread_0);
276+
g_thread_join(small_chunks_thread_1);
277+
278+
g_thread_join(large_chunks_thread_0);
279+
g_thread_join(large_chunks_thread_1);
280+
281+
ds3_master_object_list_response_free(small_objs_chunk_response);
282+
ds3_master_object_list_response_free(small_objs_bulk_response);
283+
put_chunks_threads_args_free(put_small_objs_args_array);
284+
285+
ds3_master_object_list_response_free(large_objs_chunk_response);
286+
ds3_master_object_list_response_free(large_objs_bulk_response);
287+
put_chunks_threads_args_free(put_large_objs_args_array);
193288

194289
clear_bucket(client, bucket_name);
195290
free_client(client);

0 commit comments

Comments
 (0)