Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 133 additions & 42 deletions dttools/src/itable.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ See the file COPYING for details.
*/

#include "itable.h"
#include "debug.h"

#include <stdlib.h>
#include <string.h>

#define DEFAULT_SIZE 127
#define DEFAULT_LOAD 0.75
#define DEFAULT_MAX_LOAD 0.75
#define DEFAULT_MIN_LOAD 0.125

struct entry {
UINT64_T key;
Expand All @@ -25,6 +27,14 @@ struct itable {
struct entry **buckets;
int ibucket;
struct entry *ientry;

/* for memory safety, itable_nextkey cannot be called in the same
* iteration if itable_insert or itable_remove has been called.
* In such case, the executable will be terminated with a fatal message.
* If the table should be modified during iterations, consider
* using the array keys from itable_keys_array. (If so, remember
* to free it afterwards with itable_free_keys_array.) */
int cant_iterate_yet;
};

struct itable *itable_create(int bucket_count)
Expand All @@ -46,6 +56,7 @@ struct itable *itable_create(int bucket_count)
}

h->size = 0;
h->cant_iterate_yet = 0;

return h;
}
Expand All @@ -69,6 +80,9 @@ void itable_clear(struct itable *h, void (*delete_func)(void *))
for (i = 0; i < h->bucket_count; i++) {
h->buckets[i] = 0;
}

/* buckets went away, thus a nextkey would be invalid */
h->cant_iterate_yet = 1;
}

void itable_delete(struct itable *h)
Expand All @@ -78,11 +92,53 @@ void itable_delete(struct itable *h)
free(h);
}

UINT64_T *itable_keys_array(struct itable *h)
{
UINT64_T *keys = (UINT64_T *)malloc(sizeof(int) * h->size);
int ikey = 0;

struct entry *e, *f;
int i;

for (i = 0; i < h->bucket_count; i++) {
e = h->buckets[i];
while (e) {
keys[ikey] = e->key;
ikey++;
f = e->next;
e = f;
}
}

return keys;
}

void itable_free_keys_array(UINT64_T *keys)
{
free(keys);
}

int itable_size(struct itable *h)
{
return h->size;
}

double itable_load(struct itable *h)
{
return (double)h->size / h->bucket_count;
}

static int insert_to_buckets_aux(struct entry **buckets, int bucket_count, struct entry *new_entry)
{
unsigned index;
index = new_entry->key % bucket_count;

// Possible memory leak! Silently replacing value if it existed.
new_entry->next = buckets[index];
buckets[index] = new_entry;
return 1;
}

void *itable_lookup(struct itable *h, UINT64_T key)
{
struct entry *e;
Expand All @@ -103,73 +159,96 @@ void *itable_lookup(struct itable *h, UINT64_T key)

static int itable_double_buckets(struct itable *h)
{
struct itable *hn = itable_create(2 * h->bucket_count);

if (!hn)
int new_count = (2 * (h->bucket_count + 1)) - 1;
struct entry **new_buckets = (struct entry **)calloc(new_count, sizeof(struct entry *));
if (!new_buckets) {
return 0;
}

/* Move pairs to new hash */
uint64_t key;
void *value;
itable_firstkey(h);
while (itable_nextkey(h, &key, &value))
if (!itable_insert(hn, key, value)) {
itable_delete(hn);
return 0;
}

/* Delete all old pairs */
struct entry *e, *f;
int i;
for (i = 0; i < h->bucket_count; i++) {
for (int i = 0; i < h->bucket_count; i++) {
e = h->buckets[i];
while (e) {
f = e->next;
free(e);
e->next = NULL;
insert_to_buckets_aux(new_buckets, new_count, e);
e = f;
}
}

/* Make the old point to the new */
free(h->buckets);
h->buckets = hn->buckets;
h->bucket_count = hn->bucket_count;
h->size = hn->size;
h->buckets = new_buckets;
h->bucket_count = new_count;

/* Delete reference to new, so old is safe */
free(hn);
/* structure of itable changed completely, thus a nextkey would be incorrect. */
h->cant_iterate_yet = 1;

return 1;
}

int itable_insert(struct itable *h, UINT64_T key, const void *value)
{
struct entry *e;
UINT64_T index;

if (((float)h->size / h->bucket_count) > DEFAULT_LOAD)
if (((float)h->size / h->bucket_count) > DEFAULT_MAX_LOAD)
itable_double_buckets(h);

index = key % h->bucket_count;
e = h->buckets[index];
struct entry *new_entry = (struct entry *)malloc(sizeof(struct entry));
if (!new_entry)
return 0;

while (e) {
if (key == e->key) {
e->value = (void *)value;
return 1;
}
e = e->next;
new_entry->key = key;
new_entry->value = (void *)value;

int inserted = insert_to_buckets_aux(h->buckets, h->bucket_count, new_entry);
if (inserted) {
h->size++;
/* inserting cause different behaviours with nextkey (e.g., sometimes the new
* key would be included or skipped in the iteration */
h->cant_iterate_yet = 1;

return 1;
}

e = (struct entry *)malloc(sizeof(struct entry));
if (!e)
return 0;
}

static int itable_reduce_buckets(struct itable *h)
{
int new_count = ((h->bucket_count + 1) / 2) - 1;

/* DEFAULT_SIZE is the minimum size */
if (new_count < DEFAULT_SIZE) {
return 1;
}

/* Table cannot be reduced above DEFAULT_MAX_LOAD */
if (((float)h->size / new_count) > DEFAULT_MAX_LOAD) {
return 1;
}

struct entry **new_buckets = (struct entry **)calloc(new_count, sizeof(struct entry *));
if (!new_buckets) {
return 0;
}

e->key = key;
e->value = (void *)value;
e->next = h->buckets[index];
h->buckets[index] = e;
h->size++;
struct entry *e, *f;
for (int i = 0; i < h->bucket_count; i++) {
e = h->buckets[i];
while (e) {
f = e->next;
e->next = NULL;
insert_to_buckets_aux(new_buckets, new_count, e);
e = f;
}
}

/* Make the old point to the new */
free(h->buckets);
h->buckets = new_buckets;
h->bucket_count = new_count;

/* structure of itable changed completely, thus a nextkey would be incorrect. */
h->cant_iterate_yet = 1;

return 1;
}
Expand All @@ -194,6 +273,12 @@ void *itable_remove(struct itable *h, UINT64_T key)
value = e->value;
free(e);
h->size--;
h->cant_iterate_yet = 1;

if (((float)h->size / h->bucket_count) < DEFAULT_MIN_LOAD) {
itable_reduce_buckets(h);
}

return value;
}
f = e;
Expand All @@ -218,6 +303,8 @@ void *itable_pop(struct itable *t)

void itable_firstkey(struct itable *h)
{
h->cant_iterate_yet = 0;

h->ientry = 0;
for (h->ibucket = 0; h->ibucket < h->bucket_count; h->ibucket++) {
h->ientry = h->buckets[h->ibucket];
Expand All @@ -228,6 +315,10 @@ void itable_firstkey(struct itable *h)

int itable_nextkey(struct itable *h, UINT64_T *key, void **value)
{
if (h->cant_iterate_yet) {
fatal("cctools bug: the itable iteration has not been reset since last modification");
}

if (h->ientry) {
*key = h->ientry->key;
if (value)
Expand Down
29 changes: 27 additions & 2 deletions dttools/src/itable.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,41 @@ Note that this function will not delete all of the objects contained within the

void itable_delete(struct itable *h);


/** Return an array with all the current keys.
It is the responsibility of the caller to free this array with
itable_free_keys_array.
@param h A pointer to a hash table.
@return An array of all the current keys.
*/

UINT64_T *itable_keys_array(struct itable *h);


/** Free an array generated from itable_free_keys_array.
@param keys An array of all the keys.
*/

void itable_free_keys_array(UINT64_T *keys);


/** Count the entries in an integer table.
@return The number of entries in the table.
@param h A pointer to an integer table.
*/

int itable_size(struct itable *h);

/** Get the proportion of elements vs buckets in the table.
@return The load of the table.
@param h A pointer to an integer table.
*/

double itable_load(struct itable *h);


/** Insert a key and value.
This call will fail if the table already contains the same key.
You must call @ref itable_remove to remove it.
This call will replace the value if it already contains the same key.
Also note that you cannot insert a null value into the table.
@param h A pointer to an integer table.
@param key An integer key
Expand Down
23 changes: 14 additions & 9 deletions taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -747,17 +747,21 @@ static void handle_failed_library_process(struct vine_process *p, struct link *m
/* Forsake the tasks that are running on this library */
/* It no available libraries on this worker, tasks waiting for this library will be forsaken */

struct vine_process *p_running;
uint64_t task_id;
uint64_t *task_ids = itable_keys_array(procs_running);
int total_procs = itable_size(procs_running);

for (int i = 0; i < total_procs; i++) {
uint64_t task_id = task_ids[i];
struct vine_process *p_running = itable_lookup(procs_running, task_id);

ITABLE_ITERATE(procs_running, task_id, p_running)
{
if (p_running->library_process == p) {
debug(D_VINE, "killing function task %d running on library task %d", (int)task_id, p->task->task_id);
finish_running_task(p_running, VINE_RESULT_FORSAKEN);
reap_process(p_running, manager);
}
}

itable_free_keys_array(task_ids);
}

/*
Expand Down Expand Up @@ -1104,14 +1108,15 @@ then we need to abort to clean things up.

static void kill_all_tasks()
{
struct vine_process *p;
uint64_t task_id;
uint64_t *task_ids = itable_keys_array(procs_table);
int total_tasks = itable_size(procs_table);

ITABLE_ITERATE(procs_table, task_id, p)
{
do_kill(task_id);
for (int i = 0; i < total_tasks; i++) {
do_kill(task_ids[i]);
}

itable_free_keys_array(task_ids);

assert(itable_size(procs_table) == 0);
assert(itable_size(procs_running) == 0);
assert(itable_size(procs_complete) == 0);
Expand Down
Loading