Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ static int release_worker(struct vine_manager *q, struct vine_worker_info *w);

struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);

static void shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f);
static void clean_redundant_replicas(struct vine_manager *q, struct vine_file *f);

/* Return the number of workers matching a given type: WORKER, STATUS, etc */

static int count_workers(struct vine_manager *q, vine_worker_type_t type)
Expand Down Expand Up @@ -422,6 +425,10 @@ static vine_msg_code_t handle_cache_update(struct vine_manager *q, struct vine_w
if (f->type == VINE_TEMP && *id == 'X' && q->temp_replica_count > 1) {
hash_table_insert(q->temp_files_to_replicate, f->cached_name, NULL);
}

if (q->shift_disk_load) {
shift_disk_load(q, w, f);
}
}
}

Expand Down Expand Up @@ -650,6 +657,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
itable_remove(q->running_table, t->task_id);
vine_task_set_result(t, task_status);

/* A task scheduling may result in a redundant replica of its input due to peer transfers, which can be safely removed when completed.
* However, the general function of taskvine is to replicate files on demand, and to only clean them up when prune is called.
* So, we only clean up redundant replicas for the task-inputs when the manager is configured to clean redundant replicas. */
if (q->clean_redundant_replicas) {
struct vine_mount *input_mount;
LIST_ITERATE(t->input_mounts, input_mount)
{
if (input_mount->file && input_mount->file->type == VINE_TEMP) {
clean_redundant_replicas(q, input_mount->file);
}
}
}

return VINE_SUCCESS;
}

Expand Down Expand Up @@ -1001,6 +1021,128 @@ static int enforce_worker_eviction_interval(struct vine_manager *q)
return 1;
}

/**
Shift a temp file replica away from the worker using the most cache space.
This function looks for an alternative worker that can accept the file immediately
so that the original replica can be cleaned up later by @clean_redundant_replicas().
*/
static void shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f)
{
if (!q || !source_worker || !f) {
return;
}

if (f->type != VINE_TEMP) {
return;
}

struct vine_worker_info *target_worker = NULL;

char *key;
struct vine_worker_info *w = NULL;
HASH_TABLE_ITERATE(q->worker_table, key, w)
{
if (w->type != VINE_WORKER_TYPE_WORKER) {
continue;
}
if (!w->transfer_port_active) {
continue;
}
if (w->draining) {
continue;
}
if (w->resources->tag < 0) {
continue;
}
if (vine_file_replica_table_lookup(w, f->cached_name)) {
continue;
}
if (w->inuse_cache + f->size > (source_worker->inuse_cache - f->size) * 0.8) {
continue;
}
if (!target_worker || w->inuse_cache < target_worker->inuse_cache) {
target_worker = w;
}
}
if (target_worker) {
char *source_addr = string_format("%s/%s", source_worker->transfer_url, f->cached_name);
vine_manager_put_url_now(q, target_worker, source_worker, source_addr, f);
free(source_addr);
}

/* We can clean up the original one safely when the replica arrives at the destination worker. */
clean_redundant_replicas(q, f);
}

/** Clean redundant replicas of a temporary file.
For example, a file may be transferred to another worker because a task that declares it
as input is scheduled there, resulting in an extra replica that consumes storage space.
This function evaluates whether the file has excessive replicas and removes those on
workers that do not execute their dependent tasks. */
static void clean_redundant_replicas(struct vine_manager *q, struct vine_file *f)
{
if (!f || f->type != VINE_TEMP) {
return;
}

struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name);
if (!source_workers) {
/* no surprise - a cache-update message may trigger a file deletion. */
return;
}
int excess_replicas = set_size(source_workers) - q->temp_replica_count;
if (excess_replicas <= 0) {
return;
}
/* Note that this replica may serve as a source for a peer transfer. If it is unlinked prematurely,
* the corresponding transfer could fail and leave a task without its required data.
* Therefore, we must wait until all replicas are confirmed ready before proceeding. */
if (vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) != set_size(source_workers)) {
return;
}

struct priority_queue *clean_replicas_from_workers = priority_queue_create(0);

struct vine_worker_info *source_worker = NULL;
SET_ITERATE(source_workers, source_worker)
{
/* if the file is actively in use by a task (the input to that task), we don't remove the replica on this worker */
int file_inuse = 0;

uint64_t task_id;
struct vine_task *task;
ITABLE_ITERATE(source_worker->current_tasks, task_id, task)
{
struct vine_mount *input_mount;
LIST_ITERATE(task->input_mounts, input_mount)
{
if (f == input_mount->file) {
file_inuse = 1;
break;
}
}
if (file_inuse) {
break;
}
}

if (file_inuse) {
continue;
}

priority_queue_push(clean_replicas_from_workers, source_worker, source_worker->inuse_cache);
}

source_worker = NULL;
while (excess_replicas > 0 && (source_worker = priority_queue_pop(clean_replicas_from_workers))) {
delete_worker_file(q, source_worker, f->cached_name, 0, 0);
excess_replicas--;
}
priority_queue_delete(clean_replicas_from_workers);

return;
}

/* Remove all tasks and other associated state from a given worker. */
static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
{
Expand Down Expand Up @@ -4219,6 +4361,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->perf_log_interval = VINE_PERF_LOG_INTERVAL;

q->temp_replica_count = 1;
q->clean_redundant_replicas = 0;
q->shift_disk_load = 0;
q->transfer_temps_recovery = 0;
q->transfer_replica_per_cycle = 10;

Expand Down Expand Up @@ -5980,6 +6124,12 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "enforce-worker-eviction-interval")) {
q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND);

} else if (!strcmp(name, "clean-redundant-replicas")) {
q->clean_redundant_replicas = !!((int)value);

} else if (!strcmp(name, "shift-disk-load")) {
q->shift_disk_load = !!((int)value);

} else {
debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name);
return -1;
Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ struct vine_manager {
int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */
int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */
int temp_replica_count; /* Number of replicas per temp file */
int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */
int shift_disk_load; /* If true, shift storage burden to more available workers to minimize disk usage peaks. */

double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */
double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */
Expand Down