diff --git a/dttools/src/itable.c b/dttools/src/itable.c index 73f697a1a8..3b45194ef7 100644 --- a/dttools/src/itable.c +++ b/dttools/src/itable.c @@ -6,12 +6,14 @@ See the file COPYING for details. */ #include "itable.h" +#include "debug.h" #include #include #define DEFAULT_SIZE 127 -#define DEFAULT_LOAD 0.75 +#define DEFAULT_MAX_LOAD 0.75 +#define DEFAULT_MIN_LOAD 0.125 struct entry { UINT64_T key; @@ -25,6 +27,14 @@ struct itable { struct entry **buckets; int ibucket; struct entry *ientry; + + /* for memory safety, itable_nextkey cannot be called in the same + * iteration if itable_insert or itable_remove has been called. + * In such case, the executable will be terminated with a fatal message. + * If the table should be modified during iterations, consider + * using the array keys from itable_keys_array. (If so, remember + * to free it afterwards with itable_free_keys_array.) */ + int cant_iterate_yet; }; struct itable *itable_create(int bucket_count) @@ -46,6 +56,7 @@ struct itable *itable_create(int bucket_count) } h->size = 0; + h->cant_iterate_yet = 0; return h; } @@ -69,6 +80,9 @@ void itable_clear(struct itable *h, void (*delete_func)(void *)) for (i = 0; i < h->bucket_count; i++) { h->buckets[i] = 0; } + + /* buckets went away, thus a nextkey would be invalid */ + h->cant_iterate_yet = 1; } void itable_delete(struct itable *h) @@ -78,11 +92,53 @@ void itable_delete(struct itable *h) free(h); } +UINT64_T *itable_keys_array(struct itable *h) +{ + UINT64_T *keys = (UINT64_T *)malloc(sizeof(int) * h->size); + int ikey = 0; + + struct entry *e, *f; + int i; + + for (i = 0; i < h->bucket_count; i++) { + e = h->buckets[i]; + while (e) { + keys[ikey] = e->key; + ikey++; + f = e->next; + e = f; + } + } + + return keys; +} + +void itable_free_keys_array(UINT64_T *keys) +{ + free(keys); +} + int itable_size(struct itable *h) { return h->size; } +double itable_load(struct itable *h) +{ + return (double)h->size / h->bucket_count; +} + +static int insert_to_buckets_aux(struct entry **buckets, int bucket_count, struct entry *new_entry) +{ + unsigned index; + index = new_entry->key % bucket_count; + + // Possible memory leak! Silently replacing value if it existed. + new_entry->next = buckets[index]; + buckets[index] = new_entry; + return 1; +} + void *itable_lookup(struct itable *h, UINT64_T key) { struct entry *e; @@ -103,73 +159,96 @@ void *itable_lookup(struct itable *h, UINT64_T key) static int itable_double_buckets(struct itable *h) { - struct itable *hn = itable_create(2 * h->bucket_count); - - if (!hn) + int new_count = (2 * (h->bucket_count + 1)) - 1; + struct entry **new_buckets = (struct entry **)calloc(new_count, sizeof(struct entry *)); + if (!new_buckets) { return 0; + } - /* Move pairs to new hash */ - uint64_t key; - void *value; - itable_firstkey(h); - while (itable_nextkey(h, &key, &value)) - if (!itable_insert(hn, key, value)) { - itable_delete(hn); - return 0; - } - - /* Delete all old pairs */ struct entry *e, *f; - int i; - for (i = 0; i < h->bucket_count; i++) { + for (int i = 0; i < h->bucket_count; i++) { e = h->buckets[i]; while (e) { f = e->next; - free(e); + e->next = NULL; + insert_to_buckets_aux(new_buckets, new_count, e); e = f; } } /* Make the old point to the new */ free(h->buckets); - h->buckets = hn->buckets; - h->bucket_count = hn->bucket_count; - h->size = hn->size; + h->buckets = new_buckets; + h->bucket_count = new_count; - /* Delete reference to new, so old is safe */ - free(hn); + /* structure of itable changed completely, thus a nextkey would be incorrect. */ + h->cant_iterate_yet = 1; return 1; } int itable_insert(struct itable *h, UINT64_T key, const void *value) { - struct entry *e; - UINT64_T index; - - if (((float)h->size / h->bucket_count) > DEFAULT_LOAD) + if (((float)h->size / h->bucket_count) > DEFAULT_MAX_LOAD) itable_double_buckets(h); - index = key % h->bucket_count; - e = h->buckets[index]; + struct entry *new_entry = (struct entry *)malloc(sizeof(struct entry)); + if (!new_entry) + return 0; - while (e) { - if (key == e->key) { - e->value = (void *)value; - return 1; - } - e = e->next; + new_entry->key = key; + new_entry->value = (void *)value; + + int inserted = insert_to_buckets_aux(h->buckets, h->bucket_count, new_entry); + if (inserted) { + h->size++; + /* inserting cause different behaviours with nextkey (e.g., sometimes the new + * key would be included or skipped in the iteration */ + h->cant_iterate_yet = 1; + + return 1; } - e = (struct entry *)malloc(sizeof(struct entry)); - if (!e) + return 0; +} + +static int itable_reduce_buckets(struct itable *h) +{ + int new_count = ((h->bucket_count + 1) / 2) - 1; + + /* DEFAULT_SIZE is the minimum size */ + if (new_count < DEFAULT_SIZE) { + return 1; + } + + /* Table cannot be reduced above DEFAULT_MAX_LOAD */ + if (((float)h->size / new_count) > DEFAULT_MAX_LOAD) { + return 1; + } + + struct entry **new_buckets = (struct entry **)calloc(new_count, sizeof(struct entry *)); + if (!new_buckets) { return 0; + } - e->key = key; - e->value = (void *)value; - e->next = h->buckets[index]; - h->buckets[index] = e; - h->size++; + struct entry *e, *f; + for (int i = 0; i < h->bucket_count; i++) { + e = h->buckets[i]; + while (e) { + f = e->next; + e->next = NULL; + insert_to_buckets_aux(new_buckets, new_count, e); + e = f; + } + } + + /* Make the old point to the new */ + free(h->buckets); + h->buckets = new_buckets; + h->bucket_count = new_count; + + /* structure of itable changed completely, thus a nextkey would be incorrect. */ + h->cant_iterate_yet = 1; return 1; } @@ -194,6 +273,12 @@ void *itable_remove(struct itable *h, UINT64_T key) value = e->value; free(e); h->size--; + h->cant_iterate_yet = 1; + + if (((float)h->size / h->bucket_count) < DEFAULT_MIN_LOAD) { + itable_reduce_buckets(h); + } + return value; } f = e; @@ -218,6 +303,8 @@ void *itable_pop(struct itable *t) void itable_firstkey(struct itable *h) { + h->cant_iterate_yet = 0; + h->ientry = 0; for (h->ibucket = 0; h->ibucket < h->bucket_count; h->ibucket++) { h->ientry = h->buckets[h->ibucket]; @@ -228,6 +315,10 @@ void itable_firstkey(struct itable *h) int itable_nextkey(struct itable *h, UINT64_T *key, void **value) { + if (h->cant_iterate_yet) { + fatal("cctools bug: the itable iteration has not been reset since last modification"); + } + if (h->ientry) { *key = h->ientry->key; if (value) diff --git a/dttools/src/itable.h b/dttools/src/itable.h index 10c8bb259f..5a8f96c4a8 100644 --- a/dttools/src/itable.h +++ b/dttools/src/itable.h @@ -69,6 +69,24 @@ Note that this function will not delete all of the objects contained within the void itable_delete(struct itable *h); + +/** Return an array with all the current keys. +It is the responsibility of the caller to free this array with +itable_free_keys_array. +@param h A pointer to a hash table. +@return An array of all the current keys. +*/ + +UINT64_T *itable_keys_array(struct itable *h); + + +/** Free an array generated from itable_free_keys_array. +@param keys An array of all the keys. +*/ + +void itable_free_keys_array(UINT64_T *keys); + + /** Count the entries in an integer table. @return The number of entries in the table. @param h A pointer to an integer table. @@ -76,9 +94,16 @@ void itable_delete(struct itable *h); int itable_size(struct itable *h); +/** Get the proportion of elements vs buckets in the table. +@return The load of the table. +@param h A pointer to an integer table. +*/ + +double itable_load(struct itable *h); + + /** Insert a key and value. -This call will fail if the table already contains the same key. -You must call @ref itable_remove to remove it. +This call will replace the value if it already contains the same key. Also note that you cannot insert a null value into the table. @param h A pointer to an integer table. @param key An integer key diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index d285d5bb4c..0bd1644bf4 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -747,17 +747,21 @@ static void handle_failed_library_process(struct vine_process *p, struct link *m /* Forsake the tasks that are running on this library */ /* It no available libraries on this worker, tasks waiting for this library will be forsaken */ - struct vine_process *p_running; - uint64_t task_id; + uint64_t *task_ids = itable_keys_array(procs_running); + int total_procs = itable_size(procs_running); + + for (int i = 0; i < total_procs; i++) { + uint64_t task_id = task_ids[i]; + struct vine_process *p_running = itable_lookup(procs_running, task_id); - ITABLE_ITERATE(procs_running, task_id, p_running) - { if (p_running->library_process == p) { debug(D_VINE, "killing function task %d running on library task %d", (int)task_id, p->task->task_id); finish_running_task(p_running, VINE_RESULT_FORSAKEN); reap_process(p_running, manager); } } + + itable_free_keys_array(task_ids); } /* @@ -1104,14 +1108,15 @@ then we need to abort to clean things up. static void kill_all_tasks() { - struct vine_process *p; - uint64_t task_id; + uint64_t *task_ids = itable_keys_array(procs_table); + int total_tasks = itable_size(procs_table); - ITABLE_ITERATE(procs_table, task_id, p) - { - do_kill(task_id); + for (int i = 0; i < total_tasks; i++) { + do_kill(task_ids[i]); } + itable_free_keys_array(task_ids); + assert(itable_size(procs_table) == 0); assert(itable_size(procs_running) == 0); assert(itable_size(procs_complete) == 0);