From f5c247b027c99bb9c3d0a6e61641732dd6ce5b75 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 10:29:54 -0500 Subject: [PATCH 1/9] priority queue to array of priorities --- dttools/src/priority_queue.c | 168 +++++++++++++++++++++++++++++------ dttools/src/priority_queue.h | 25 ++++-- 2 files changed, 159 insertions(+), 34 deletions(-) diff --git a/dttools/src/priority_queue.c b/dttools/src/priority_queue.c index bf77142d20..078119b4ec 100644 --- a/dttools/src/priority_queue.c +++ b/dttools/src/priority_queue.c @@ -5,18 +5,25 @@ See the file COPYING for details. */ #include "priority_queue.h" +#include "debug.h" #include #include #include #include #include +#include #define DEFAULT_CAPACITY 127 struct element { void *data; - double priority; // In this implementation, elements with bigger priorities are considered to be privileged. + + // Priorities ar keps in this array, compared in order. + // For two elements e1 and e2, e1 is considered to have higher priority than e2 if + // e1->priority[0] > e2->priority[0] or e1->priority[0] == e2->priority[0] and e1->priority[1] > e2->priority[1] or ... + // Larger numbers have higher priority. + double *priority; }; struct priority_queue { @@ -24,6 +31,9 @@ struct priority_queue { int capacity; struct element **elements; + int priority_count; // Number of priorities per element. Priorities are compared + // in the order priority[] -> priority[1] -> ... -> priority[priority_count-1] + /* The following three cursors are used to iterate over the elements in the numerical order they are stored in the array, which is different from the order of priorities. Each of them has different concerns when traverse the queue Though the typical priority-based traversal is done by the repeated invocation of priority_queue_peek_top and priority_queue_pop APIs, rather than using any cursors. */ @@ -34,6 +44,57 @@ struct priority_queue { /****** Static Methods ******/ +static struct element *element_create(void *data, int priority_count, const double *priority) +{ + struct element *e = (struct element *)malloc(sizeof(struct element)); + + e->data = data; + + if (priority) { + e->priority = (double *)malloc(priority_count * sizeof(double)); + if (!e->priority) { + free(e); + return NULL; + } + memcpy(e->priority, priority, priority_count * sizeof(double)); + } else { + e->priority = NULL; + } + + return e; +} + +static void element_delete(struct element *e) +{ + free(e->priority); + free(e); +} + +static int cmp_left_right(struct priority_queue *pq, int left_idx, int right_idx) +{ + double *left = pq->elements[left_idx]->priority; + double *right = pq->elements[right_idx]->priority; + + for (int i = 0; i < pq->priority_count; i++) { + if (left[i] < right[i]) { + return -1; + } + + if (left[i] > right[i]) { + return 1; + } + } + + // If all priorities are equal, return 0 + return 0; +} + +#define LE(p, l, r) (cmp_left_right(p, l, r) <= 0) +#define LT(p, l, r) (cmp_left_right(p, l, r) < 0) +#define GT(p, l, r) (cmp_left_right(p, l, r) > 0) +#define GE(p, l, r) (cmp_left_right(p, l, r) >= 0) +#define EQ(p, l, r) (cmp_left_right(p, l, r) == 0) + static void swap_elements(struct priority_queue *pq, int i, int j) { struct element *temp = pq->elements[i]; @@ -47,7 +108,7 @@ static int swim(struct priority_queue *pq, int k) return 1; } - while (k > 0 && pq->elements[(k - 1) / 2]->priority <= pq->elements[k]->priority) { + while (k > 0 && LE(pq, (k - 1) / 2, k)) { swap_elements(pq, k, (k - 1) / 2); k = (k - 1) / 2; } @@ -63,12 +124,14 @@ static int sink(struct priority_queue *pq, int k) while (2 * k + 1 < pq->size) { int j = 2 * k + 1; - if (j + 1 < pq->size && pq->elements[j]->priority <= pq->elements[j + 1]->priority) { + if (j + 1 < pq->size && LE(pq, j, j + 1)) { j++; } - if (pq->elements[k]->priority >= pq->elements[j]->priority) { + + if (GE(pq, k, j)) { break; } + swap_elements(pq, k, j); k = j; } @@ -99,7 +162,7 @@ static int priority_queue_double_capacity(struct priority_queue *pq) /****** External Methods ******/ -struct priority_queue *priority_queue_create(int init_capacity) +struct priority_queue *priority_queue_create(int init_capacity, int priority_count) { struct priority_queue *pq = (struct priority_queue *)malloc(sizeof(struct priority_queue)); if (!pq) { @@ -110,17 +173,23 @@ struct priority_queue *priority_queue_create(int init_capacity) init_capacity = DEFAULT_CAPACITY; } + if (priority_count < 1) { + fatal("Priority count must be at least 1.\n"); + return NULL; + } + + pq->elements = (struct element **)calloc(init_capacity, sizeof(struct element *)); if (!pq->elements) { - free(pq); - fprintf(stderr, "Fatal error: Memory allocation failed.\n"); - exit(EXIT_FAILURE); + fatal("Priority queue memory allocation failed.\n"); return NULL; } pq->capacity = init_capacity; pq->size = 0; + pq->priority_count = priority_count; + pq->static_cursor = 0; pq->base_cursor = 0; pq->rotate_cursor = 0; @@ -137,7 +206,8 @@ int priority_queue_size(struct priority_queue *pq) return pq->size; } -int priority_queue_push(struct priority_queue *pq, void *data, double priority) +/** Push to the queue without duplicating priotiry array. */ +int push_internal(struct priority_queue *pq, void *data, double *priority) { if (!pq) { return -1; @@ -148,11 +218,11 @@ int priority_queue_push(struct priority_queue *pq, void *data, double priority) return -1; } } - struct element *e = (struct element *)malloc(sizeof(struct element)); + + struct element *e = element_create(data, pq->priority_count, NULL); if (!e) { return -1; } - e->data = data; e->priority = priority; pq->elements[pq->size++] = e; @@ -167,6 +237,52 @@ int priority_queue_push(struct priority_queue *pq, void *data, double priority) return new_idx; } +int priority_queue_push(struct priority_queue *pq, void *data, const double *priority) +{ + if (!pq) { + return -1; + } + + double *priority_copy = (double *)malloc(pq->priority_count * sizeof(double)); + if (!priority_copy) { + return -1; + } + + memcpy(priority_copy, priority, pq->priority_count * sizeof(double)); + return push_internal(pq, data, priority_copy); +} + + + +int priority_queue_push_varargs(struct priority_queue *pq, void *data, ...) +{ + if (!pq) { + return -1; + } + + // Allocate array for priorities + double *priority = (double *)malloc(pq->priority_count * sizeof(double)); + if (!priority) { + return -1; + } + + // Collect variable arguments into the priority array + va_list args; + va_start(args, data); + for (int i = 0; i < pq->priority_count; i++) { + priority[i] = va_arg(args, double); + } + va_end(args); + + // Call the standard push function + int result = push_internal(pq, data, priority); + + // Clean up + free(priority); + + return result; +} + void *priority_queue_pop(struct priority_queue *pq) { if (!pq || pq->size == 0) { @@ -192,13 +308,13 @@ void *priority_queue_peek_top(struct priority_queue *pq) return pq->elements[0]->data; } -double priority_queue_get_priority_at(struct priority_queue *pq, int idx) +double priority_queue_get_priority_at(struct priority_queue *pq, int priority_idx, int element_index) { - if (!pq || pq->size < 1 || idx < 0 || idx > pq->size - 1) { + if (!pq || pq->size < 1 || element_index < 0 || element_index > pq->size - 1 || priority_idx < 0 || priority_idx >= pq->priority_count) { return 0; } - return pq->elements[idx]->priority; + return pq->elements[element_index]->priority[priority_idx]; } double priority_queue_get_top_priority(struct priority_queue *pq) @@ -207,7 +323,7 @@ double priority_queue_get_top_priority(struct priority_queue *pq) return 0; } - return pq->elements[0]->priority; + return pq->elements[0]->priority[0]; } void *priority_queue_peek_at(struct priority_queue *pq, int idx) @@ -219,7 +335,7 @@ void *priority_queue_peek_at(struct priority_queue *pq, int idx) return pq->elements[idx]->data; } -int priority_queue_update_priority(struct priority_queue *pq, void *data, double new_priority) +int priority_queue_update_priority(struct priority_queue *pq, void *data, int priority_idx, double new_priority) { if (!pq) { return -1; @@ -233,13 +349,12 @@ int priority_queue_update_priority(struct priority_queue *pq, void *data, double } } - /* If the data isn’t already in the queue, enqueue it. */ if (idx == -1) { - return priority_queue_push(pq, data, new_priority); + return -1; } - double old_priority = pq->elements[idx]->priority; - pq->elements[idx]->priority = new_priority; + double old_priority = pq->elements[idx]->priority[priority_idx]; + pq->elements[idx]->priority[priority_idx] = new_priority; int new_idx = -1; @@ -346,19 +461,14 @@ int priority_queue_remove(struct priority_queue *pq, int idx) struct element *to_delete = pq->elements[idx]; struct element *last_elem = pq->elements[pq->size - 1]; - double old_priority = to_delete->priority; - double new_priority = last_elem->priority; - - free(to_delete); - pq->size--; if (idx != pq->size) { pq->elements[idx] = last_elem; pq->elements[pq->size] = NULL; - if (new_priority > old_priority) { + if (GT(pq, idx, pq->size -1)) { swim(pq, idx); - } else if (new_priority < old_priority) { + } else if (LT(pq, idx, pq->size -1)) { sink(pq, idx); } } else { @@ -380,6 +490,8 @@ int priority_queue_remove(struct priority_queue *pq, int idx) priority_queue_rotate_reset(pq); } + element_delete(to_delete); + return 1; } @@ -391,7 +503,7 @@ void priority_queue_delete(struct priority_queue *pq) for (int i = 0; i < pq->size; i++) { if (pq->elements[i]) { - free(pq->elements[i]); + element_delete(pq->elements[i]); } } free(pq->elements); diff --git a/dttools/src/priority_queue.h b/dttools/src/priority_queue.h index 6d624b3d3e..4843375ca2 100644 --- a/dttools/src/priority_queue.h +++ b/dttools/src/priority_queue.h @@ -92,9 +92,10 @@ PRIORITY_QUEUE_BASE_ITERATE (pq, idx, data, iter_count, iter_depth) { /** Create a new priority queue. Element with a higher priority is at the top of the heap. @param init_capacity The initial number of elements in the queue. If zero, a default value will be used. +@param priority_count The number of priorities per element. @return A pointer to a new priority queue. */ -struct priority_queue *priority_queue_create(int init_capacity); +struct priority_queue *priority_queue_create(int init_capacity, int priority_count); /** Count the elements in a priority queue. @param pq A pointer to a priority queue. @@ -104,12 +105,22 @@ int priority_queue_size(struct priority_queue *pq); /** Push an element into a priority queue. The standard push operation. New elements are placed lower than existing elements of the same priority +The priorities are specified as a double array, with the length of the array being the number of priorities per element. @param pq A pointer to a priority queue. @param data A pointer to store in the queue. -@param priority The specified priority with the given object. +@param priority The specified priorities with the given object. @return The idex of data if the push succeeded, -1 on failure. */ -int priority_queue_push(struct priority_queue *pq, void *data, double priority); +int priority_queue_push(struct priority_queue *pq, void *data, const double *priority); + +/** Push an element into a priority queue. +The standard push operation. New elements are placed lower than existing elements of the same priority. +@param pq A pointer to a priority queue. +@param data A pointer to store in the queue. +@param ... The variable arguments list of priorities, as doubles. +@return The idex of data if the push succeeded, -1 on failure. +*/ +int priority_queue_push_varargs(struct priority_queue *pq, void *data, ...); /** Pop the element with the highest priority from a priority queue. @param pq A pointer to a priority queue. @@ -134,10 +145,11 @@ void *priority_queue_peek_at(struct priority_queue *pq, int index); /** Get the priority of an element at a specified index. @param pq A pointer to a priority queue. -@param index The index of the element. +@param priority_idx The index of the priority. +@param element_index The index of the element. @return The priority of the element if any, NAN on failure. */ -double priority_queue_get_priority_at(struct priority_queue *pq, int index); +double priority_queue_get_priority_at(struct priority_queue *pq, int priority_idx, int element_index); /** Get the priority of the top element in a priority queue. @param pq A pointer to a priority queue. @@ -148,10 +160,11 @@ double priority_queue_get_top_priority(struct priority_queue *pq); /** Update the priority of an element in a priority queue. @param pq A pointer to a priority queue. @param data The pointer to the element to update. +@param priority_idx The index of the priority to update. @param new_priority The new priority of the element. @return The new index if the update succeeded, -1 on failure. */ -int priority_queue_update_priority(struct priority_queue *pq, void *data, double new_priority); +int priority_queue_update_priority(struct priority_queue *pq, void *data, int priority_idx, double new_priority); /** Find the index of an element in a priority queue. @param pq A pointer to a priority queue. From fe8a8a7f173c7aa97937fd90460c08dae05b8c9d Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 10:48:10 -0500 Subject: [PATCH 2/9] fix memory issues --- dttools/src/priority_queue.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/dttools/src/priority_queue.c b/dttools/src/priority_queue.c index 078119b4ec..fd2332d1fe 100644 --- a/dttools/src/priority_queue.c +++ b/dttools/src/priority_queue.c @@ -275,12 +275,8 @@ int priority_queue_push_varargs(struct priority_queue *pq, void *data, ...) va_end(args); // Call the standard push function - int result = push_internal(pq, data, priority); - - // Clean up - free(priority); - - return result; + // Note: push_internal takes ownership of the priority array, so we don't free it here + return push_internal(pq, data, priority); } void *priority_queue_pop(struct priority_queue *pq) @@ -294,7 +290,7 @@ void *priority_queue_pop(struct priority_queue *pq) pq->elements[0] = pq->elements[--pq->size]; pq->elements[pq->size] = NULL; sink(pq, 0); - free(e); + element_delete(e); return data; } From 679b79eb1faeb156ecbaf7ab605dcd43047f2fd7 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 10:48:22 -0500 Subject: [PATCH 3/9] fix example --- dttools/src/priority_queue_test.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/dttools/src/priority_queue_test.c b/dttools/src/priority_queue_test.c index dea803a7db..a0f1e22c52 100644 --- a/dttools/src/priority_queue_test.c +++ b/dttools/src/priority_queue_test.c @@ -10,7 +10,7 @@ See the file COPYING for details. int main() { - struct priority_queue *pq = priority_queue_create(2); + struct priority_queue *pq = priority_queue_create(2, 1); if (!pq) { fprintf(stderr, "Failed to create priority queue.\n"); return EXIT_FAILURE; @@ -22,7 +22,7 @@ int main() // Insert elements printf("Inserting elements:\n"); for (int i = 0; i < 6; i++) { - int idx = priority_queue_push(pq, data[i], priorities[i]); + int idx = priority_queue_push_varargs(pq, data[i], priorities[i]); if (idx >= 0) { printf("Inserted '%s' with priority %.1f at index %d\n", data[i], priorities[i], idx); } else { @@ -42,7 +42,7 @@ int main() printf("\nIterating over the priority queue using PRIORITY_QUEUE_BASE_ITERATE:\n"); PRIORITY_QUEUE_BASE_ITERATE(pq, idx, item, iter_count, iter_depth) { - double prio = priority_queue_get_priority_at(pq, idx); + double prio = priority_queue_get_priority_at(pq, 0, idx); printf("Index: %d, Element: %s, Priority: %.1f\n", idx, item, prio); } @@ -74,7 +74,7 @@ int main() } // Update the priority of an element - int update_idx = priority_queue_update_priority(pq, "Task A", 9.0); + int update_idx = priority_queue_update_priority(pq, "Task A", 0, 9.0); printf("\nUpdating the priority of 'Task A' to 9.0:\n"); if (update_idx >= 0) { printf("Task A new index after priority update: %d\n", update_idx); @@ -83,7 +83,7 @@ int main() } // Insert an element - int ins_idx = priority_queue_push(pq, "Task G", 11.0); + int ins_idx = priority_queue_push_varargs(pq, "Task G", 11.0); printf("\nInserting Task G with priority 11.0:\n"); if (ins_idx >= 0) { printf("Inserted Task G at index %d\n", ins_idx); @@ -98,7 +98,7 @@ int main() iter_depth = priority_queue_size(pq); PRIORITY_QUEUE_BASE_ITERATE(pq, idx, item, iter_count, iter_depth) { - double prio = priority_queue_get_priority_at(pq, idx); + double prio = priority_queue_get_priority_at(pq, 0, idx); printf("Index: %d, Element: %s, Priority: %.1f\n", idx, item, prio); } @@ -110,7 +110,7 @@ int main() printf("\nIterating over the priority queue using PRIORITY_QUEUE_ROTATE_ITERATE with a depth %d:\n", iter_depth); PRIORITY_QUEUE_ROTATE_ITERATE(pq, idx, item, iter_count, iter_depth) { - double prio = priority_queue_get_priority_at(pq, idx); + double prio = priority_queue_get_priority_at(pq, 0, idx); printf("Index: %d, Element: %s, Priority: %.1f\n", idx, item, prio); // The break check must go after the task is considered, as the rotate cursor is advanced in the macro and must be considered } @@ -121,7 +121,7 @@ int main() printf("\nReset the rotate cursor and Iterate from beginning with a depth %d:\n", iter_depth); PRIORITY_QUEUE_ROTATE_ITERATE(pq, idx, item, iter_count, iter_depth) { - double prio = priority_queue_get_priority_at(pq, idx); + double prio = priority_queue_get_priority_at(pq, 0, idx); printf("Index: %d, Element: %s, Priority: %.1f\n", idx, item, prio); } @@ -133,7 +133,7 @@ int main() printf("\nIterating over the priority queue using PRIORITY_QUEUE_STATIC_ITERATE with a depth %d:\n", iter_depth); PRIORITY_QUEUE_STATIC_ITERATE(pq, idx, item, iter_count, iter_depth) { - double prio = priority_queue_get_priority_at(pq, idx); + double prio = priority_queue_get_priority_at(pq, 0, idx); printf("Index: %d, Element: %s, Priority: %.1f\n", idx, item, prio); } iter_count = 0; @@ -141,7 +141,7 @@ int main() printf("Continue iterating from the last position with a depth %d\n", iter_depth); PRIORITY_QUEUE_STATIC_ITERATE(pq, idx, item, iter_count, iter_depth) { - double prio = priority_queue_get_priority_at(pq, idx); + double prio = priority_queue_get_priority_at(pq, 0, idx); printf("Index: %d, Element: %s, Priority: %.1f\n", idx, item, prio); } @@ -158,14 +158,14 @@ int main() printf("\nIterating over the priority queue using PRIORITY_QUEUE_BASE_ITERATE:\n"); PRIORITY_QUEUE_BASE_ITERATE(pq, idx, item, iter_count, iter_depth) { - double prio = priority_queue_get_priority_at(pq, idx); + double prio = priority_queue_get_priority_at(pq, 0, idx); printf("Index: %d, Element: %s, Priority: %.1f\n", idx, item, prio); } // Pop elements from the priority queue using priority_queue_pop printf("\nPopping elements from the priority queue:\n"); while ((item = (char *)priority_queue_peek_top(pq)) != NULL) { - printf("Popped element: %s Priority: %d\n", item, (int)priority_queue_get_priority_at(pq, 0)); + printf("Popped element: %s Priority: %d\n", item, (int)priority_queue_get_priority_at(pq, 0, 0)); priority_queue_pop(pq); } From 8b285fca47ba6c060c1b282dca76babf0fb06b8e Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 12:25:04 -0500 Subject: [PATCH 4/9] update taskvine --- taskvine/src/manager/vine_manager.c | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index e4ce14d328..41eae4fddf 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -121,6 +121,13 @@ See the file COPYING for details. /* Default timeout for slow workers to come back to the pool, can be set prior to creating a manager. */ double vine_option_blocklist_slow_workers_timeout = 900; +typedef enum { + VINE_PRIORITY_DEFAULT = 0, + VINE_PRIORITY_EXHAUSTION = 2 << 4, + VINE_PRIORITY_RECOVERY = 2 << 8, + VINE_PRIORITY_HIGHEST = 2 << 30, +} vine_priority_type_t; + /* Forward prototypes for functions that are called out of order. */ /* Many of these should be removed if forward declaration is not needed. */ @@ -3684,11 +3691,11 @@ static int send_one_task(struct vine_manager *q, int *tasks_ready_left_to_consid case VINE_APP_FAILURE: case VINE_WORKER_FAILURE: /* failed to dispatch, commit put the task back in the right place. */ - break; - case VINE_MGR_FAILURE: - /* special case, commit had a chained failure. */ - priority_queue_push(q->ready_tasks, t, t->priority); - break; + break; + case VINE_MGR_FAILURE: + /* special case, commit had a chained failure. */ + priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_DEFAULT,t->priority); + break; case VINE_END_OF_LIST: /* shouldn't happen, keep going */ break; @@ -4125,7 +4132,10 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->next_task_id = 1; q->fixed_location_in_queue = 0; - q->ready_tasks = priority_queue_create(0); + /* Each tasks has two priority. First is the priority that taskvine itself assigns and + it is always VINE_PRIORITY_DEFAULT under normal operation. The second is the one the + user assigns. */ + q->ready_tasks = priority_queue_create(0, 2); q->running_table = itable_create(0); q->waiting_retrieval_list = list_create(); q->retrieved_list = list_create(); @@ -4672,9 +4682,9 @@ static void push_task_to_ready_tasks(struct vine_manager *q, struct vine_task *t * as possible among those with the same priority. This avoids * the issue in which all 'big' tasks fail because the first * allocation is too small. */ - priority_queue_push(q->ready_tasks, t, t->priority + 1); + priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_EXHAUSTION, t->priority); } else { - priority_queue_push(q->ready_tasks, t, t->priority); + priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_DEFAULT, t->priority); } /* If the task has been used before, clear out accumulated state. */ From b7fe955e0a02e32e8b5e249b2d0d5a157610455c Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 12:26:44 -0500 Subject: [PATCH 5/9] lint --- dttools/src/priority_queue.c | 11 ++++------- taskvine/src/manager/vine_manager.c | 12 ++++++------ 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/dttools/src/priority_queue.c b/dttools/src/priority_queue.c index fd2332d1fe..db66835c75 100644 --- a/dttools/src/priority_queue.c +++ b/dttools/src/priority_queue.c @@ -31,8 +31,8 @@ struct priority_queue { int capacity; struct element **elements; - int priority_count; // Number of priorities per element. Priorities are compared - // in the order priority[] -> priority[1] -> ... -> priority[priority_count-1] + int priority_count; // Number of priorities per element. Priorities are compared + // in the order priority[] -> priority[1] -> ... -> priority[priority_count-1] /* The following three cursors are used to iterate over the elements in the numerical order they are stored in the array, which is different from the order of priorities. Each of them has different concerns when traverse the queue Though the typical priority-based @@ -178,7 +178,6 @@ struct priority_queue *priority_queue_create(int init_capacity, int priority_cou return NULL; } - pq->elements = (struct element **)calloc(init_capacity, sizeof(struct element *)); if (!pq->elements) { fatal("Priority queue memory allocation failed.\n"); @@ -252,8 +251,6 @@ int priority_queue_push(struct priority_queue *pq, void *data, const double *pri return push_internal(pq, data, priority_copy); } - - int priority_queue_push_varargs(struct priority_queue *pq, void *data, ...) { if (!pq) { @@ -462,9 +459,9 @@ int priority_queue_remove(struct priority_queue *pq, int idx) pq->elements[idx] = last_elem; pq->elements[pq->size] = NULL; - if (GT(pq, idx, pq->size -1)) { + if (GT(pq, idx, pq->size - 1)) { swim(pq, idx); - } else if (LT(pq, idx, pq->size -1)) { + } else if (LT(pq, idx, pq->size - 1)) { sink(pq, idx); } } else { diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 41eae4fddf..e03d51ca6e 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -3691,11 +3691,11 @@ static int send_one_task(struct vine_manager *q, int *tasks_ready_left_to_consid case VINE_APP_FAILURE: case VINE_WORKER_FAILURE: /* failed to dispatch, commit put the task back in the right place. */ - break; - case VINE_MGR_FAILURE: - /* special case, commit had a chained failure. */ - priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_DEFAULT,t->priority); - break; + break; + case VINE_MGR_FAILURE: + /* special case, commit had a chained failure. */ + priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_DEFAULT, t->priority); + break; case VINE_END_OF_LIST: /* shouldn't happen, keep going */ break; @@ -4133,7 +4133,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->fixed_location_in_queue = 0; /* Each tasks has two priority. First is the priority that taskvine itself assigns and - it is always VINE_PRIORITY_DEFAULT under normal operation. The second is the one the + it is always VINE_PRIORITY_DEFAULT under normal operation. The second is the one the user assigns. */ q->ready_tasks = priority_queue_create(0, 2); q->running_table = itable_create(0); From 14d0e1b6185be1166c3de2286938074bb86dae50 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 12:39:27 -0500 Subject: [PATCH 6/9] update doc comment --- dttools/src/priority_queue.h | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/dttools/src/priority_queue.h b/dttools/src/priority_queue.h index 4843375ca2..f0a443819c 100644 --- a/dttools/src/priority_queue.h +++ b/dttools/src/priority_queue.h @@ -54,15 +54,28 @@ The fifth time - [(5, "e"), (4, "d"), (2, "b"), (1, "a"), (3, "c")] As seen, the iteration order of elements is not the same as the priority order. +Further, the priority queue can be used to store elements with multiple priorities. +Each element can have an array of priorities, allowing for multi-level priority ordering. +Priorities are compared lexicographically: priority[0] is compared first; if equal, priority[1] +is compared; if equal, priority[2] is compared, and so on. This allows for sophisticated +tie-breaking schemes where higher-indexed priorities serve as secondary criteria. + +For example, with 3 priority levels: +- Element A with priorities [5.0, 3.0, 7.0] +- Element B with priorities [5.0, 4.0, 1.0] +The comparison checks priority[0]: both are 5.0 (equal), so it moves to priority[1]. +Since 3.0 < 4.0, element B has higher overall priority. Note that priority[2] is never examined. + An example to create a priority queue and manipulate its elements:
 struct priority_queue *pq;
-pq = priority_queue_create(10);
+pq = priority_queue_create(10, 2); // intial space for 10 elements and 2 priorities per element
 
-int priority = 5;
+int priority0 = 5;
+int priority1 = 3;
 void *data = someDataPointer;
 
-priority_queue_push(pq, data, priority);
+priority_queue_push_varargs(pq, data, priority0, priority1);
 data = priority_queue_pop(pq);
 void *headData = priority_queue_peek_top(pq);
 
From be3595d24030eb5648faa02a9fa77ec75e11982b Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 12:45:29 -0500 Subject: [PATCH 7/9] fix recovery priority --- taskvine/src/manager/vine_manager.c | 30 ++++++++++------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index e03d51ca6e..e9302348a4 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -4676,17 +4676,18 @@ char *vine_monitor_wrap(struct vine_manager *q, struct vine_worker_info *w, stru static void push_task_to_ready_tasks(struct vine_manager *q, struct vine_task *t) { - if (t->result == VINE_RESULT_RESOURCE_EXHAUSTION) { - /* when a task is resubmitted given resource exhaustion, we - * increment its priority by 1, so it gets to run as soon - * as possible among those with the same priority. This avoids - * the issue in which all 'big' tasks fail because the first - * allocation is too small. */ - priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_EXHAUSTION, t->priority); - } else { - priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_DEFAULT, t->priority); + /* when a task is resubmitted given resource exhaustion, or as a recovery task, + its priority is increased by one level to allow them to run as soon as possible. */ + + double priority = VINE_PRIORITY_DEFAULT; + if (t->type == VINE_TASK_TYPE_RECOVERY) { + priority = VINE_PRIORITY_RECOVERY; + } else if (t->result == VINE_RESULT_RESOURCE_EXHAUSTION) { + priority = VINE_PRIORITY_EXHAUSTION; } + priority_queue_push_varargs(q->ready_tasks, t, priority, t->priority); + /* If the task has been used before, clear out accumulated state. */ vine_task_clean(t); } @@ -4870,17 +4871,6 @@ int vine_submit(struct vine_manager *q, struct vine_task *t) /* Issue warnings if the files are set up strangely. */ vine_task_check_consistency(t); - /* The goal of a recovery task is to reproduce a lost temp file, which serves as input to another task. - * Recovery tasks should therefore be prioritized ahead of all other tasks. - * If a recovery task is not runnable due to its own missing inputs, we submit additional recovery tasks to restore those files. - * Each of these later-submitted recovery tasks is assigned a higher priority than all currently existing tasks, - * so they are considered first. This ensures that the most recent recovery tasks have the best chance to run and succeed. - * Additionally, we incorporate the priority of the original task, so not all recovery tasks receive the same priority, - * this distinction is important when many files are lost and the workflow is effectively rerun from scratch. */ - if (t->type == VINE_TASK_TYPE_RECOVERY) { - vine_task_set_priority(t, t->priority + priority_queue_get_top_priority(q->ready_tasks) + 1); - } - if (t->has_fixed_locations) { q->fixed_location_in_queue++; vine_task_set_scheduler(t, VINE_SCHEDULE_FILES); From cb33da4ca2f9057b22865de672ab99f51bbbb903 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Wed, 5 Nov 2025 16:25:59 -0500 Subject: [PATCH 8/9] do not use malloc --- dttools/src/priority_queue.c | 139 ++++++++++----------------- dttools/src/priority_queue.h | 17 +--- dttools/src/priority_queue_test.c | 6 +- taskvine/src/manager/vine_manager.c | 6 +- taskvine/src/manager/vine_schedule.c | 2 +- 5 files changed, 61 insertions(+), 109 deletions(-) diff --git a/dttools/src/priority_queue.c b/dttools/src/priority_queue.c index db66835c75..0db95bd4aa 100644 --- a/dttools/src/priority_queue.c +++ b/dttools/src/priority_queue.c @@ -13,17 +13,25 @@ See the file COPYING for details. #include #include #include +#include #define DEFAULT_CAPACITY 127 +/* This is the maximum number of priorities defined in struct element. */ +#define MAX_PRIORITY_COUNT 3 + struct element { void *data; - // Priorities ar keps in this array, compared in order. + // Priorities are stored as three separate variables, compared in order. // For two elements e1 and e2, e1 is considered to have higher priority than e2 if - // e1->priority[0] > e2->priority[0] or e1->priority[0] == e2->priority[0] and e1->priority[1] > e2->priority[1] or ... + // e1->priority_0 > e2->priority_0 or e1->priority_0 == e2->priority_0 and e1->priority_1 > e2->priority_1 or ... // Larger numbers have higher priority. - double *priority; + + // WARNING: This attributes should be the last defined in this struct. + double priority_0; + double priority_1; + double priority_2; }; struct priority_queue { @@ -32,7 +40,7 @@ struct priority_queue { struct element **elements; int priority_count; // Number of priorities per element. Priorities are compared - // in the order priority[] -> priority[1] -> ... -> priority[priority_count-1] + // in the order priority0 -> priority1 -> ... /* The following three cursors are used to iterate over the elements in the numerical order they are stored in the array, which is different from the order of priorities. Each of them has different concerns when traverse the queue Though the typical priority-based @@ -43,44 +51,20 @@ struct priority_queue { }; /****** Static Methods ******/ - -static struct element *element_create(void *data, int priority_count, const double *priority) -{ - struct element *e = (struct element *)malloc(sizeof(struct element)); - - e->data = data; - - if (priority) { - e->priority = (double *)malloc(priority_count * sizeof(double)); - if (!e->priority) { - free(e); - return NULL; - } - memcpy(e->priority, priority, priority_count * sizeof(double)); - } else { - e->priority = NULL; - } - - return e; -} - -static void element_delete(struct element *e) -{ - free(e->priority); - free(e); -} - static int cmp_left_right(struct priority_queue *pq, int left_idx, int right_idx) { - double *left = pq->elements[left_idx]->priority; - double *right = pq->elements[right_idx]->priority; + struct element *left = pq->elements[left_idx]; + struct element *right = pq->elements[right_idx]; + // Compare priorities in order for (int i = 0; i < pq->priority_count; i++) { - if (left[i] < right[i]) { + double left_priority = *((double *)((char *)left + offsetof(struct element, priority_0) + i * sizeof(double))); + double right_priority = *((double *)((char *)right + offsetof(struct element, priority_0) + i * sizeof(double))); + + if (left_priority < right_priority) { return -1; } - - if (left[i] > right[i]) { + if (left_priority > right_priority) { return 1; } } @@ -173,8 +157,8 @@ struct priority_queue *priority_queue_create(int init_capacity, int priority_cou init_capacity = DEFAULT_CAPACITY; } - if (priority_count < 1) { - fatal("Priority count must be at least 1.\n"); + if (priority_count < 1 || priority_count > MAX_PRIORITY_COUNT) { + fatal("Priority count must be at least 1 and at most %d.\n", MAX_PRIORITY_COUNT); return NULL; } @@ -205,8 +189,7 @@ int priority_queue_size(struct priority_queue *pq) return pq->size; } -/** Push to the queue without duplicating priotiry array. */ -int push_internal(struct priority_queue *pq, void *data, double *priority) +int priority_queue_push(struct priority_queue *pq, void *data, ...) { if (!pq) { return -1; @@ -218,11 +201,20 @@ int push_internal(struct priority_queue *pq, void *data, double *priority) } } - struct element *e = element_create(data, pq->priority_count, NULL); + struct element *e = (struct element *)calloc(1, sizeof(struct element)); if (!e) { return -1; } - e->priority = priority; + + e->data = data; + + // Collect variable arguments for the three priorities + va_list args; + va_start(args, data); + for (int i = 0; i < pq->priority_count; i++) { + *((double *)((char *)e + offsetof(struct element, priority_0) + i * sizeof(double))) = va_arg(args, double); + } + va_end(args); pq->elements[pq->size++] = e; @@ -236,46 +228,6 @@ int push_internal(struct priority_queue *pq, void *data, double *priority) return new_idx; } -int priority_queue_push(struct priority_queue *pq, void *data, const double *priority) -{ - if (!pq) { - return -1; - } - - double *priority_copy = (double *)malloc(pq->priority_count * sizeof(double)); - if (!priority_copy) { - return -1; - } - - memcpy(priority_copy, priority, pq->priority_count * sizeof(double)); - return push_internal(pq, data, priority_copy); -} - -int priority_queue_push_varargs(struct priority_queue *pq, void *data, ...) -{ - if (!pq) { - return -1; - } - - // Allocate array for priorities - double *priority = (double *)malloc(pq->priority_count * sizeof(double)); - if (!priority) { - return -1; - } - - // Collect variable arguments into the priority array - va_list args; - va_start(args, data); - for (int i = 0; i < pq->priority_count; i++) { - priority[i] = va_arg(args, double); - } - va_end(args); - - // Call the standard push function - // Note: push_internal takes ownership of the priority array, so we don't free it here - return push_internal(pq, data, priority); -} - void *priority_queue_pop(struct priority_queue *pq) { if (!pq || pq->size == 0) { @@ -287,7 +239,7 @@ void *priority_queue_pop(struct priority_queue *pq) pq->elements[0] = pq->elements[--pq->size]; pq->elements[pq->size] = NULL; sink(pq, 0); - element_delete(e); + free(e); return data; } @@ -307,7 +259,8 @@ double priority_queue_get_priority_at(struct priority_queue *pq, int priority_id return 0; } - return pq->elements[element_index]->priority[priority_idx]; + struct element *e = pq->elements[element_index]; + return *((double *)((char *)e + offsetof(struct element, priority_0) + priority_idx * sizeof(double))); } double priority_queue_get_top_priority(struct priority_queue *pq) @@ -316,7 +269,8 @@ double priority_queue_get_top_priority(struct priority_queue *pq) return 0; } - return pq->elements[0]->priority[0]; + struct element *e = pq->elements[0]; + return *((double *)((char *)e + offsetof(struct element, priority_0) + 0 * sizeof(double))); } void *priority_queue_peek_at(struct priority_queue *pq, int idx) @@ -346,8 +300,15 @@ int priority_queue_update_priority(struct priority_queue *pq, void *data, int pr return -1; } - double old_priority = pq->elements[idx]->priority[priority_idx]; - pq->elements[idx]->priority[priority_idx] = new_priority; + struct element *e = pq->elements[idx]; + + // Get old priority and set new priority based on priority_idx + if (priority_idx < 0 || priority_idx >= pq->priority_count) { + return -1; + } + + double old_priority = *((double *)((char *)e + offsetof(struct element, priority_0) + priority_idx * sizeof(double))); + *((double *)((char *)e + offsetof(struct element, priority_0) + priority_idx * sizeof(double))) = new_priority; int new_idx = -1; @@ -483,7 +444,7 @@ int priority_queue_remove(struct priority_queue *pq, int idx) priority_queue_rotate_reset(pq); } - element_delete(to_delete); + free(to_delete); return 1; } @@ -496,7 +457,7 @@ void priority_queue_delete(struct priority_queue *pq) for (int i = 0; i < pq->size; i++) { if (pq->elements[i]) { - element_delete(pq->elements[i]); + free(pq->elements[i]); } } free(pq->elements); diff --git a/dttools/src/priority_queue.h b/dttools/src/priority_queue.h index f0a443819c..837a5b5e53 100644 --- a/dttools/src/priority_queue.h +++ b/dttools/src/priority_queue.h @@ -75,7 +75,7 @@ int priority0 = 5; int priority1 = 3; void *data = someDataPointer; -priority_queue_push_varargs(pq, data, priority0, priority1); +priority_queue_push(pq, data, priority0, priority1); data = priority_queue_pop(pq); void *headData = priority_queue_peek_top(pq); @@ -116,24 +116,15 @@ struct priority_queue *priority_queue_create(int init_capacity, int priority_cou */ int priority_queue_size(struct priority_queue *pq); -/** Push an element into a priority queue. -The standard push operation. New elements are placed lower than existing elements of the same priority -The priorities are specified as a double array, with the length of the array being the number of priorities per element. -@param pq A pointer to a priority queue. -@param data A pointer to store in the queue. -@param priority The specified priorities with the given object. -@return The idex of data if the push succeeded, -1 on failure. -*/ -int priority_queue_push(struct priority_queue *pq, void *data, const double *priority); - /** Push an element into a priority queue. The standard push operation. New elements are placed lower than existing elements of the same priority. +Takes three priority values as variable arguments. @param pq A pointer to a priority queue. @param data A pointer to store in the queue. -@param ... The variable arguments list of priorities, as doubles. +@param ... The priority values (priority_0, priority_1, ...) as doubles. @return The idex of data if the push succeeded, -1 on failure. */ -int priority_queue_push_varargs(struct priority_queue *pq, void *data, ...); +int priority_queue_push(struct priority_queue *pq, void *data, ...); /** Pop the element with the highest priority from a priority queue. @param pq A pointer to a priority queue. diff --git a/dttools/src/priority_queue_test.c b/dttools/src/priority_queue_test.c index a0f1e22c52..a55f2912cb 100644 --- a/dttools/src/priority_queue_test.c +++ b/dttools/src/priority_queue_test.c @@ -10,7 +10,7 @@ See the file COPYING for details. int main() { - struct priority_queue *pq = priority_queue_create(2, 1); + struct priority_queue *pq = priority_queue_create(2, 3); if (!pq) { fprintf(stderr, "Failed to create priority queue.\n"); return EXIT_FAILURE; @@ -22,7 +22,7 @@ int main() // Insert elements printf("Inserting elements:\n"); for (int i = 0; i < 6; i++) { - int idx = priority_queue_push_varargs(pq, data[i], priorities[i]); + int idx = priority_queue_push(pq, data[i], priorities[i], 0.0, 0.0); if (idx >= 0) { printf("Inserted '%s' with priority %.1f at index %d\n", data[i], priorities[i], idx); } else { @@ -83,7 +83,7 @@ int main() } // Insert an element - int ins_idx = priority_queue_push_varargs(pq, "Task G", 11.0); + int ins_idx = priority_queue_push(pq, "Task G", 11.0, 0.0, 0.0); printf("\nInserting Task G with priority 11.0:\n"); if (ins_idx >= 0) { printf("Inserted Task G at index %d\n", ins_idx); diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index e9302348a4..212cd95d34 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -3694,7 +3694,7 @@ static int send_one_task(struct vine_manager *q, int *tasks_ready_left_to_consid break; case VINE_MGR_FAILURE: /* special case, commit had a chained failure. */ - priority_queue_push_varargs(q->ready_tasks, t, VINE_PRIORITY_DEFAULT, t->priority); + priority_queue_push(q->ready_tasks, t, VINE_PRIORITY_DEFAULT, t->priority); break; case VINE_END_OF_LIST: /* shouldn't happen, keep going */ @@ -4134,7 +4134,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert /* Each tasks has two priority. First is the priority that taskvine itself assigns and it is always VINE_PRIORITY_DEFAULT under normal operation. The second is the one the - user assigns. */ + user assigns. The third priority is unused and set to 0.0. */ q->ready_tasks = priority_queue_create(0, 2); q->running_table = itable_create(0); q->waiting_retrieval_list = list_create(); @@ -4686,7 +4686,7 @@ static void push_task_to_ready_tasks(struct vine_manager *q, struct vine_task *t priority = VINE_PRIORITY_EXHAUSTION; } - priority_queue_push_varargs(q->ready_tasks, t, priority, t->priority); + priority_queue_push(q->ready_tasks, t, priority, t->priority); /* If the task has been used before, clear out accumulated state. */ vine_task_clean(t); diff --git a/taskvine/src/manager/vine_schedule.c b/taskvine/src/manager/vine_schedule.c index 0168c3a979..5261e7b1ee 100644 --- a/taskvine/src/manager/vine_schedule.c +++ b/taskvine/src/manager/vine_schedule.c @@ -361,7 +361,7 @@ struct vine_worker_info *vine_schedule_task_to_worker(struct vine_manager *q, st } /* first sort by the strategy-specific criterion, then run @check_worker_against_task on the sorted list */ - struct priority_queue *workers = priority_queue_create(0); + struct priority_queue *workers = priority_queue_create(0, 1); if (!workers) { return NULL; } From 94e84a8943356148bb7c4d80e566a3154262c8bf Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 6 Nov 2025 08:41:43 -0500 Subject: [PATCH 9/9] cleanup example --- dttools/src/priority_queue_test.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dttools/src/priority_queue_test.c b/dttools/src/priority_queue_test.c index a55f2912cb..0ef0b0fd8a 100644 --- a/dttools/src/priority_queue_test.c +++ b/dttools/src/priority_queue_test.c @@ -10,7 +10,7 @@ See the file COPYING for details. int main() { - struct priority_queue *pq = priority_queue_create(2, 3); + struct priority_queue *pq = priority_queue_create(0, 1); if (!pq) { fprintf(stderr, "Failed to create priority queue.\n"); return EXIT_FAILURE; @@ -22,7 +22,7 @@ int main() // Insert elements printf("Inserting elements:\n"); for (int i = 0; i < 6; i++) { - int idx = priority_queue_push(pq, data[i], priorities[i], 0.0, 0.0); + int idx = priority_queue_push(pq, data[i], priorities[i]); if (idx >= 0) { printf("Inserted '%s' with priority %.1f at index %d\n", data[i], priorities[i], idx); } else { @@ -83,7 +83,7 @@ int main() } // Insert an element - int ins_idx = priority_queue_push(pq, "Task G", 11.0, 0.0, 0.0); + int ins_idx = priority_queue_push(pq, "Task G", 11.0); printf("\nInserting Task G with priority 11.0:\n"); if (ins_idx >= 0) { printf("Inserted Task G at index %d\n", ins_idx);