Skip to content

Commit 65b5ed4

Browse files
authored
Merge pull request #197 from DenverM80/concurrent_xfers_test
Add test case to verify CURL can PUT objects multithreaded concurrently
2 parents 8688b56 + e80f8f8 commit 65b5ed4

15 files changed

+889
-196
lines changed

src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,6 @@ else(WIN32)
8282
DESTINATION
8383
"/usr/local/include")
8484
install(TARGETS ds3 DESTINATION lib)
85+
set(CMAKE_BUILD_TYPE Release)
86+
#set(CMAKE_BUILD_TYPE Debug)
8587
endif(WIN32)

src/ds3.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,30 @@ ds3_error* ds3_create_client_from_env(ds3_client** client) {
10611061
return NULL;
10621062
}
10631063

1064+
// Allow multiple ds3_clients to share a ds3_connection_pool
1065+
ds3_client* ds3_copy_client(const ds3_client const* client) {
1066+
if (client == NULL) {
1067+
return NULL;
1068+
}
1069+
1070+
ds3_client* copied_client = g_new0(ds3_client, 1);
1071+
copied_client->endpoint = ds3_str_dup(client->endpoint);
1072+
if (client->proxy) {
1073+
copied_client->proxy = ds3_str_dup(client->proxy);
1074+
}
1075+
copied_client->num_redirects = client->num_redirects;
1076+
copied_client->creds = ds3_create_creds(client->creds->access_id->value, client->creds->secret_key->value);
1077+
1078+
ds3_client_register_net( copied_client, net_process_request );
1079+
ds3_client_register_logging(copied_client, client->log->log_lvl, client->log->log_callback, NULL);
1080+
1081+
copied_client->connection_pool = client->connection_pool;
1082+
ds3_connection_pool_inc_ref(copied_client->connection_pool);
1083+
return copied_client;
1084+
}
1085+
10641086
void ds3_client_proxy(ds3_client* client, const char* proxy) {
1087+
ds3_str_free(client->proxy);
10651088
client->proxy = ds3_str_init(proxy);
10661089
}
10671090

@@ -1080,11 +1103,12 @@ void ds3_client_free(ds3_client* client) {
10801103
return;
10811104
}
10821105

1106+
// free client->connection_pool only if there are no remaining references
1107+
ds3_connection_pool_dec_ref(client->connection_pool);
1108+
10831109
ds3_str_free(client->endpoint);
10841110
ds3_str_free(client->proxy);
10851111
g_free(client->log);
1086-
ds3_connection_pool_clear(client->connection_pool);
1087-
g_free(client->connection_pool);
10881112
g_free(client);
10891113
}
10901114

src/ds3.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2329,6 +2329,7 @@ LIBRARY_API void ds3_client_free(ds3_client* client);
23292329
LIBRARY_API ds3_creds* ds3_create_creds(const char *const access_id, const char *const secret_key);
23302330
LIBRARY_API ds3_client* ds3_create_client(const char *const endpoint, ds3_creds* creds);
23312331
LIBRARY_API ds3_error* ds3_create_client_from_env(ds3_client** client);
2332+
LIBRARY_API ds3_client* ds3_copy_client(const ds3_client* client);
23322333
LIBRARY_API void ds3_client_register_logging(ds3_client* client, ds3_log_lvl log_lvl, void (* log_callback)(const char* log_message, void* user_data), void* user_data);
23332334
LIBRARY_API void ds3_client_register_net(ds3_client* client, ds3_error* (* net_callback)(const ds3_client* client,
23342335
const ds3_request* _request,

src/ds3_connection.c

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,41 +20,52 @@
2020
#include <string.h>
2121
#include <curl/curl.h>
2222
#include <glib.h>
23+
#include <inttypes.h>
2324
#include "ds3_net.h"
2425

2526
ds3_connection_pool* ds3_connection_pool_init(void) {
27+
return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE);
28+
}
29+
30+
ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) {
2631
ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1);
32+
pool->connections = g_new0(ds3_connection*, pool_size);
33+
pool->num_connections = pool_size;
2734
g_mutex_init(&pool->mutex);
2835
g_cond_init(&pool->available_connections);
36+
pool->ref_count = 1;
2937
return pool;
3038
}
3139

32-
void ds3_connection_pool_clear(ds3_connection_pool* pool) {
40+
void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked) {
3341
int index;
3442

3543
if (pool == NULL) {
3644
return;
3745
}
3846

39-
g_mutex_lock(&pool->mutex);
47+
if (already_locked == False) {
48+
g_mutex_lock(&pool->mutex);
49+
}
4050

41-
for (index = 0; index < CONNECTION_POOL_SIZE; index++) {
51+
for (index = 0; index < pool->num_connections; index++) {
4252
if (pool->connections[index] != NULL) {
4353
curl_easy_cleanup(pool->connections[index]);
4454
}
4555
}
4656

57+
g_free(pool->connections);
4758
g_mutex_unlock(&pool->mutex);
48-
g_mutex_clear(&pool->mutex);
59+
g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined
4960
g_cond_clear(&pool->available_connections);
5061
}
5162

52-
static int _pool_inc(ds3_connection_pool* pool, int index) {
53-
return (index+1) % CONNECTION_POOL_SIZE;
63+
static int _pool_inc(int index, uint16_t num_connections) {
64+
return (index+1) % num_connections;
5465
}
5566

5667
static int _pool_full(ds3_connection_pool* pool) {
57-
return (_pool_inc(pool, pool->tail) == pool->head);
68+
return (_pool_inc(pool->head, pool->num_connections) == pool->tail);
5869
}
5970

6071

@@ -73,7 +84,7 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) {
7384
} else {
7485
connection = pool->connections[pool->head];
7586
}
76-
pool->head = _pool_inc(pool, pool->head);
87+
pool->head = _pool_inc(pool->head, pool->num_connections);
7788

7889
g_mutex_unlock(&pool->mutex);
7990

@@ -82,11 +93,27 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) {
8293

8394
void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) {
8495
g_mutex_lock(&pool->mutex);
85-
8696
curl_easy_reset(connection);
87-
pool->tail = _pool_inc(pool, pool->tail);
97+
pool->tail = _pool_inc(pool->tail, pool->num_connections);
8898

8999
g_mutex_unlock(&pool->mutex);
90100
g_cond_signal(&pool->available_connections);
91101
}
92102

103+
void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) {
104+
g_mutex_lock(&pool->mutex);
105+
pool->ref_count++;
106+
g_mutex_unlock(&pool->mutex);
107+
}
108+
109+
void ds3_connection_pool_dec_ref(ds3_connection_pool* pool) {
110+
g_mutex_lock(&pool->mutex);
111+
pool->ref_count--;
112+
113+
if (pool->ref_count == 0) {
114+
ds3_connection_pool_clear(pool, True);
115+
g_free(pool);
116+
} else {
117+
g_mutex_unlock(&pool->mutex);
118+
}
119+
}

src/ds3_connection.h

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ extern "C" {
2727
#include <curl/curl.h>
2828
#include <glib.h>
2929

30-
#define CONNECTION_POOL_SIZE 100
30+
#define CONNECTION_POOL_SIZE 10
3131

3232
typedef GMutex ds3_mutex;
3333
typedef GCond ds3_condition;
@@ -36,19 +36,25 @@ typedef CURL ds3_connection;
3636

3737
//-- Opaque struct
3838
struct _ds3_connection_pool{
39-
ds3_connection* connections[CONNECTION_POOL_SIZE];
40-
int head;
41-
int tail;
42-
ds3_mutex mutex;
43-
ds3_condition available_connections;
39+
ds3_connection** connections;
40+
uint16_t num_connections;
41+
int head;
42+
int tail;
43+
ds3_mutex mutex;
44+
ds3_condition available_connections;
45+
uint16_t ref_count;
4446
};
4547

4648
ds3_connection_pool* ds3_connection_pool_init(void);
47-
void ds3_connection_pool_clear(ds3_connection_pool* pool);
49+
ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size);
50+
void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked);
4851

4952
ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool);
5053
void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* handle);
5154

55+
void ds3_connection_pool_inc_ref(ds3_connection_pool* pool);
56+
void ds3_connection_pool_dec_ref(ds3_connection_pool* pool);
57+
5258
#ifdef __cplusplus
5359
}
5460
#endif

src/ds3_net.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,9 +398,12 @@ ds3_error* net_process_request(const ds3_client* client,
398398
url = g_strconcat(client->endpoint->value, request->path->value,"?",query_params, NULL);
399399
g_free(query_params);
400400
}
401+
ds3_log_message(client->log, DS3_DEBUG, "URL[%s]", url);
401402

402403
while (retry_count < client->num_redirects) {
403-
handle = (CURL*)ds3_connection_acquire(client->connection_pool);
404+
ds3_log_message(client->log, DS3_DEBUG, "Acquiring connection...");
405+
handle = ds3_connection_acquire(client->connection_pool);
406+
ds3_log_message(client->log, DS3_DEBUG, "Connection acquired.");
404407

405408
if (handle) {
406409
char* amz_headers;
@@ -509,14 +512,18 @@ ds3_error* net_process_request(const ds3_client* client,
509512

510513
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
511514

515+
ds3_log_message(client->log, DS3_DEBUG, "Attempt curl_easy_perform...");
512516
res = curl_easy_perform(handle);
517+
ds3_log_message(client->log, DS3_DEBUG, "curl_easy_perform done.");
513518

514519
g_free(date);
515520
g_free(date_header);
516521
g_free(signature);
517522
g_free(auth_header);
518523
curl_slist_free_all(headers);
524+
ds3_log_message(client->log, DS3_DEBUG, "Releasing connection...");
519525
ds3_connection_release(client->connection_pool, handle);
526+
ds3_log_message(client->log, DS3_DEBUG, "Connection released.");
520527

521528
//process the response
522529
if (res != CURLE_OK) {

src/ds3_string.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ ds3_str* ds3_str_init_with_size(const char* string, size_t size) {
3030
}
3131

3232
ds3_str* ds3_str_dup(const ds3_str* string) {
33+
if (string == NULL) {
34+
return NULL;
35+
}
3336
ds3_str* str = g_new0(ds3_str, 1);
3437
str->value = g_strndup(string->value, string->size);
3538
str->size = string->size;

test/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ else(WIN32) # POSIX
5959
link_directories("${PROJECT_SOURCE_DIR}/../install/lib")
6060
include_directories("${PROJECT_SOURCE_DIR}/../install/include")
6161

62+
set(CMAKE_BUILD_TYPE Release)
63+
#set(CMAKE_BUILD_TYPE Debug)
6264
endif(WIN32)
6365

6466
add_executable(ds3_c_tests
@@ -74,6 +76,7 @@ add_executable(ds3_c_tests
7476
negative_tests.cpp
7577
search_tests.cpp
7678
service_tests.cpp
79+
connection_tests.cpp
7780
test.cpp)
7881

7982
add_test(regression_tests ds3_c_tests)

0 commit comments

Comments
 (0)