Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ static int release_worker(struct vine_manager *q, struct vine_worker_info *w);

struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);

static void clean_redundant_replicas(struct vine_manager *q, struct vine_file *f);

/* Return the number of workers matching a given type: WORKER, STATUS, etc */

static int count_workers(struct vine_manager *q, vine_worker_type_t type)
Expand Down Expand Up @@ -650,6 +652,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
itable_remove(q->running_table, t->task_id);
vine_task_set_result(t, task_status);

/* A task scheduling may result in a redundant replica of its input due to peer transfers, which can be safely removed when completed.
* However, the general function of taskvine is to replicate files on demand, and to only clean them up when prune is called.
* So, we only clean up redundant replicas for the task-inputs when the manager is configured to do so. */
if (q->clean_redundant_replicas) {
struct vine_mount *input_mount;
LIST_ITERATE(t->input_mounts, input_mount)
{
if (input_mount->file && input_mount->file->type == VINE_TEMP) {
clean_redundant_replicas(q, input_mount->file);
}
}
}

return VINE_SUCCESS;
}

Expand Down Expand Up @@ -1001,6 +1016,75 @@ static int enforce_worker_eviction_interval(struct vine_manager *q)
return 1;
}

/** Clean redundant replicas of a temporary file.
For example, a file may be transferred to another worker because a task that declares it
as input is scheduled there, resulting in an extra replica that consumes storage space.
This function evaluates whether the file has excessive replicas and removes those on
workers that do not execute their dependent tasks. */
static void clean_redundant_replicas(struct vine_manager *q, struct vine_file *f)
{
if (!f || f->type != VINE_TEMP) {
return;
}

struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name);
if (!source_workers) {
/* no surprise - a cache-update message may trigger a file deletion. */
return;
}
int excess_replicas = set_size(source_workers) - q->temp_replica_count;
if (excess_replicas <= 0) {
return;
}
/* Note that this replica may serve as a source for a peer transfer. If it is unlinked prematurely,
* the corresponding transfer could fail and leave a task without its required data.
* Therefore, we must wait until all replicas are confirmed ready before proceeding. */
if (vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) != set_size(source_workers)) {
return;
}

struct priority_queue *clean_replicas_from_workers = priority_queue_create(0);

struct vine_worker_info *source_worker = NULL;
SET_ITERATE(source_workers, source_worker)
{
/* if the file is actively in use by a task (the input to that task), we don't remove the replica on this worker */
int file_inuse = 0;

uint64_t task_id;
struct vine_task *task;
ITABLE_ITERATE(source_worker->current_tasks, task_id, task)
{
struct vine_mount *input_mount;
LIST_ITERATE(task->input_mounts, input_mount)
{
if (f == input_mount->file) {
file_inuse = 1;
break;
}
}
if (file_inuse) {
break;
}
}

if (file_inuse) {
continue;
}

priority_queue_push(clean_replicas_from_workers, source_worker, source_worker->inuse_cache);
}

source_worker = NULL;
while (excess_replicas > 0 && (source_worker = priority_queue_pop(clean_replicas_from_workers))) {
delete_worker_file(q, source_worker, f->cached_name, 0, 0);
excess_replicas--;
}
priority_queue_delete(clean_replicas_from_workers);

return;
}

/* Remove all tasks and other associated state from a given worker. */
static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
{
Expand Down Expand Up @@ -4219,6 +4303,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->perf_log_interval = VINE_PERF_LOG_INTERVAL;

q->temp_replica_count = 1;
q->clean_redundant_replicas = 0;
q->transfer_temps_recovery = 0;
q->transfer_replica_per_cycle = 10;

Expand Down Expand Up @@ -5980,6 +6065,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "enforce-worker-eviction-interval")) {
q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND);

} else if (!strcmp(name, "clean-redundant-replicas")) {
q->clean_redundant_replicas = !!((int)value);

} else {
debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name);
return -1;
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ struct vine_manager {
int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */
int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */
int temp_replica_count; /* Number of replicas per temp file */
int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */

double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */
double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */
Expand Down