Skip to content

Commit 2c86955

Browse files
author
DenverM80
committed
WIP; ds3_connection_pool queue
1 parent 46b661b commit 2c86955

File tree

3 files changed

+61
-23
lines changed

3 files changed

+61
-23
lines changed

src/ds3_connection.c

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,33 @@ ds3_connection_pool* ds3_connection_pool_init(void) {
2727
return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE);
2828
}
2929

30+
void _ds3_queue_init(ds3_connection_pool* pool) {
31+
int index;
32+
pool->queue = g_new0(int, pool->size);
33+
for (index = 0; index < pool->size; index++) {
34+
pool->queue[index] = index; // init to default
35+
}
36+
}
37+
38+
void _ds3_queue_free(ds3_connection_pool* pool) {
39+
g_free(pool->queue);
40+
}
41+
3042
ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) {
3143
printf("ds3_connection_pool_init_with_size(%u)\n", pool_size);
3244
ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1);
3345
pool->connections = g_new0(ds3_connection*, pool_size);
34-
pool->num_connections = pool_size;
46+
pool->size = pool_size;
47+
48+
_ds3_queue_init(pool);
49+
3550
g_mutex_init(&pool->mutex);
3651
g_cond_init(&pool->available_connections);
3752
pool->ref_count = 1;
3853
return pool;
3954
}
4055

4156
void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked) {
42-
printf("ds3_connection_pool_clear(%s)\n", (already_locked ? "locked" : "not locked"));
4357
int index;
4458

4559
if (pool == NULL) {
@@ -49,63 +63,73 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke
4963
if (already_locked == False) {
5064
g_mutex_lock(&pool->mutex);
5165
}
66+
printf("ds3_connection_pool_clear(%s)\n", (already_locked ? "locked" : "not locked"));
5267

53-
for (index = 0; index < pool->num_connections; index++) {
68+
for (index = 0; index < pool->size; index++) {
5469
if (pool->connections[index] != NULL) {
5570
curl_easy_cleanup(pool->connections[index]);
5671
}
5772
}
5873

74+
_ds3_queue_free(pool);
75+
5976
g_free(pool->connections);
6077
g_mutex_unlock(&pool->mutex);
6178
g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined
6279
g_cond_clear(&pool->available_connections);
6380
}
6481

65-
static int _pool_inc(int index, uint16_t num_connections) {
66-
printf("_pool_inc(%d, %u) :[%d]\n", index, num_connections, (index+1) % num_connections);
67-
return (index+1) % num_connections;
82+
static int _queue_inc(int index, uint16_t size) {
83+
printf("_pool_inc(%d, %u) :[%d]\n", index, size, (index+1) % size);
84+
return (index+1) % size;
6885
}
6986

70-
static int _pool_full(ds3_connection_pool* pool) {
71-
printf("_pool_full(): head[%d] tail[%d] : [%d]\n", pool->head, pool->tail, (_pool_inc(pool->head, pool->num_connections) == pool->tail) );
72-
return (_pool_inc(pool->head, pool->num_connections) == pool->tail);
87+
static int _queue_full(ds3_connection_pool* pool) {
88+
return (_pool_inc(pool->head, pool->size) == pool->tail);
7389
}
7490

7591
ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) {
76-
printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail);
7792
ds3_connection* connection = NULL;
93+
int next_connection_index;
7894

7995
g_mutex_lock(&pool->mutex);
96+
printf("ds3_connection_acquire() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail);
8097
while (_pool_full(pool)) {
8198
g_cond_wait(&pool->available_connections, &pool->mutex);
8299
}
83100

84-
if (pool->connections[pool->head] == NULL) {
101+
next_connection_index = pool->queue[pool->head];
102+
if (pool->connections[next_connection_index] == NULL) {
103+
connection = g_new0(ds3_connection, 1);
85104
connection = curl_easy_init();
86105

87-
pool->connections[pool->head] = connection;
106+
pool->connections[next_connection_index] = connection;
88107
} else {
89-
connection = pool->connections[pool->head];
108+
connection = pool->connections[next_connection_index];
90109
}
91-
pool->head = _pool_inc(pool->head, pool->num_connections);
110+
pool->head = _pool_inc(pool->head, pool->size);
92111

112+
printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->head, pool->tail);
93113
g_mutex_unlock(&pool->mutex);
94114

95-
printf("ds3_connection_acquire() END: head[%d] tail[%d]\n", pool->head, pool->tail);
96115
return connection;
97116
}
98117

99118
void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) {
100-
printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail);
119+
int tail_connection_index;
120+
101121
g_mutex_lock(&pool->mutex);
122+
printf("ds3_connection_release() BEGIN: head[%d] tail[%d]\n", pool->head, pool->tail);
102123

103124
curl_easy_reset(connection);
104-
pool->tail = _pool_inc(pool->tail, pool->num_connections);
125+
tail_connection_index = pool->queue[pool->tail];
126+
127+
pool->connections[tail_connection_index] = connection;
128+
pool->tail = _pool_inc(pool->tail, pool->size);
105129

130+
printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->head, pool->tail);
106131
g_mutex_unlock(&pool->mutex);
107132
g_cond_signal(&pool->available_connections);
108-
printf("ds3_connection_release() END: head[%d] tail[%d]\n", pool->head, pool->tail);
109133
}
110134

111135
void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) {

src/ds3_connection.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ typedef CURL ds3_connection;
3737
//-- Opaque struct
3838
struct _ds3_connection_pool{
3939
ds3_connection** connections;
40-
uint16_t num_connections;
40+
int* queue;
41+
uint16_t size;
4142
int head;
4243
int tail;
4344
ds3_mutex mutex;

test/test.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,31 @@ BOOST_GLOBAL_FIXTURE( BoostTestFixture );
4747

4848
void log_timestamp(char* string_buff, long buff_size)
4949
{
50-
51-
g_snprintf(string_buff, buff_size, "%s", );
50+
time_t ltime;
51+
struct tm result;
52+
struct timeval tv;
53+
char usec_buff[8];
54+
55+
gettimeofday(&tv, NULL);
56+
millisec = lrint(tv.tv_usec/1000.0); // Round to nearest millisec
57+
58+
ltime = time(NULL);
59+
localtime_r(&ltime, &result);
60+
61+
strftime(string_buff, buff_size, "%Y:%m:%dT%H:%M:%S", tm);
62+
strcat(string_buff, ".");
63+
sprintf(usec_buff,"%d", (int)tmnow.tv_usec);
64+
strcat(string_buff, usec_buff);
5265
}
5366

5467
void test_log(const char* message, void* user_data) {
5568
char timebuffer[32];
5669
log_timestamp(timebuffer, 32);
5770
if (user_data) {
5871
int client_num = *((int*)user_data);
59-
fprintf(stderr, "%s Client[%d] %s\n", client_num, message);
72+
fprintf(stderr, "%s Client[%d] %s\n", timebuffer, client_num, message);
6073
} else {
61-
fprintf(stderr, "%s %s\n", message);
74+
fprintf(stderr, "%s %s\n", timebuffer, message);
6275
}
6376
}
6477

0 commit comments

Comments
 (0)