diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index ca72d257d9..4528039273 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -170,6 +170,8 @@ static int release_worker(struct vine_manager *q, struct vine_worker_info *w); struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name); +static void clean_redundant_replicas(struct vine_manager *q, struct vine_file *f); + /* Return the number of workers matching a given type: WORKER, STATUS, etc */ static int count_workers(struct vine_manager *q, vine_worker_type_t type) @@ -650,6 +652,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v itable_remove(q->running_table, t->task_id); vine_task_set_result(t, task_status); + /* A task scheduling may result in a redundant replica of its input due to peer transfers, which can be safely removed when completed. + * However, the general function of taskvine is to replicate files on demand, and to only clean them up when prune is called. + * So, we only clean up redundant replicas for the task-inputs when the manager is configured to do so. */ + if (q->clean_redundant_replicas) { + struct vine_mount *input_mount; + LIST_ITERATE(t->input_mounts, input_mount) + { + if (input_mount->file && input_mount->file->type == VINE_TEMP) { + clean_redundant_replicas(q, input_mount->file); + } + } + } + return VINE_SUCCESS; } @@ -1001,6 +1016,75 @@ static int enforce_worker_eviction_interval(struct vine_manager *q) return 1; } +/** Clean redundant replicas of a temporary file. +For example, a file may be transferred to another worker because a task that declares it +as input is scheduled there, resulting in an extra replica that consumes storage space. +This function evaluates whether the file has excessive replicas and removes those on +workers that do not execute their dependent tasks. 
*/
+static void clean_redundant_replicas(struct vine_manager *q, struct vine_file *f)
+{
+	/* Only temp files are candidates for trimming. */
+	if (!f || f->type != VINE_TEMP) {
+		return;
+	}
+
+	struct set *holders = hash_table_lookup(q->file_worker_table, f->cached_name);
+	if (!holders) {
+		/* Not an error: a cache-update message may already have triggered a deletion. */
+		return;
+	}
+
+	/* How many replicas exist beyond the configured per-temp-file count. */
+	int surplus = set_size(holders) - q->temp_replica_count;
+
+	/* Stop if nothing is surplus, or if some replica is not yet in the READY state:
+	 * a replica may be the source of a peer transfer, and unlinking it prematurely
+	 * could make that transfer fail and leave a task without its required data. */
+	if (surplus <= 0 || vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) != set_size(holders)) {
+		return;
+	}
+
+	/* Removal candidates, pushed with each worker's inuse_cache as the priority. */
+	struct priority_queue *candidates = priority_queue_create(0);
+	struct vine_worker_info *holder = NULL;
+	SET_ITERATE(holders, holder)
+	{
+		/* Set when a task currently on this worker lists the file as an input. */
+		int needed_here = 0;
+		uint64_t task_id;
+		struct vine_task *running;
+		ITABLE_ITERATE(holder->current_tasks, task_id, running)
+		{
+			struct vine_mount *mount;
+			LIST_ITERATE(running->input_mounts, mount)
+			{
+				if (mount->file == f) {
+					needed_here = 1;
+					break;
+				}
+			}
+			if (needed_here) {
+				break;
+			}
+		}
+
+		/* A worker that still needs the file keeps its replica. */
+		if (!needed_here) {
+			priority_queue_push(candidates, holder, holder->inuse_cache);
+		}
+	}
+
+	/* Delete from popped candidates until the surplus is gone or none remain. */
+	for (; surplus > 0; surplus--) {
+		struct vine_worker_info *victim = priority_queue_pop(candidates);
+		if (!victim) {
+			break;
+		}
+		delete_worker_file(q, victim, f->cached_name, 0, 0);
+	}
+	priority_queue_delete(candidates);
+}
+
 /* Remove all tasks and other associated state from a given worker. 
*/ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w) { @@ -4219,6 +4303,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->perf_log_interval = VINE_PERF_LOG_INTERVAL; q->temp_replica_count = 1; + q->clean_redundant_replicas = 0; q->transfer_temps_recovery = 0; q->transfer_replica_per_cycle = 10; @@ -5980,6 +6065,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value) } else if (!strcmp(name, "enforce-worker-eviction-interval")) { q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND); + } else if (!strcmp(name, "clean-redundant-replicas")) { + q->clean_redundant_replicas = !!((int)value); + } else { debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name); return -1; diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index bcf2405616..f0241e3d9f 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -217,6 +217,7 @@ struct vine_manager { int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */ int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */ int temp_replica_count; /* Number of replicas per temp file */ + int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */ double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */ double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */