diff --git a/taskvine/src/manager/Makefile b/taskvine/src/manager/Makefile index a036e9bd64..db09403667 100644 --- a/taskvine/src/manager/Makefile +++ b/taskvine/src/manager/Makefile @@ -28,7 +28,8 @@ SOURCES = \ vine_file_replica_table.c \ vine_fair.c \ vine_runtime_dir.c \ - vine_task_groups.c + vine_task_groups.c \ + vine_temp.c PUBLIC_HEADERS = taskvine.h diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index e4ce14d328..d04096cc8f 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -29,6 +29,7 @@ See the file COPYING for details. #include "vine_taskgraph_log.h" #include "vine_txn_log.h" #include "vine_worker_info.h" +#include "vine_temp.h" #include "address.h" #include "buffer.h" @@ -165,7 +166,6 @@ static int vine_manager_check_inputs_available(struct vine_manager *q, struct vi static void vine_manager_consider_recovery_task(struct vine_manager *q, struct vine_file *lost_file, struct vine_task *rt); static void delete_uncacheable_files(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t); -static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level); static int release_worker(struct vine_manager *q, struct vine_worker_info *w); struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name); @@ -418,9 +418,14 @@ static vine_msg_code_t handle_cache_update(struct vine_manager *q, struct vine_w f->state = VINE_FILE_STATE_CREATED; f->size = size; - /* And if the file is a newly created temporary, replicate as needed. */ - if (f->type == VINE_TEMP && *id == 'X' && q->temp_replica_count > 1) { - hash_table_insert(q->temp_files_to_replicate, f->cached_name, NULL); + /* If a TEMP file, replicate or shift disk load as needed. */ + if (f->type == VINE_TEMP) { + if (q->temp_replica_count > 1) { + vine_temp_queue_for_replication(q, f); + } + if (q->shift_disk_load) { + vine_temp_shift_disk_load(q, w, f); + } } } } @@ -476,6 +481,9 @@ static vine_msg_code_t handle_cache_invalid(struct vine_manager *q, struct vine_ w->last_failure_time = timestamp_get(); } + /* If the creation failed, we may want to backup the file somewhere else. */ + vine_temp_handle_lost_replica(q, cachename); + /* Successfully processed this message. */ return VINE_MSG_PROCESSED; } else { @@ -650,6 +658,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v itable_remove(q->running_table, t->task_id); vine_task_set_result(t, task_status); + /* A task scheduling may result in a redundant replica of its input due to peer transfers, which can be safely removed when completed. + * However, the general function of taskvine is to replicate files on demand, and to only clean them up when prune is called. + * So, we only clean up redundant replicas for the task-inputs when the manager is configured to clean redundant replicas. */ + if (q->clean_redundant_replicas) { + struct vine_mount *input_mount; + LIST_ITERATE(t->input_mounts, input_mount) + { + if (input_mount->file && input_mount->file->type == VINE_TEMP) { + vine_temp_clean_redundant_replicas(q, input_mount->file); + } + } + } + return VINE_SUCCESS; } @@ -1036,85 +1057,6 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w) cleanup_worker_files(q, w); } -/* Start replicating files that may need replication */ -static int consider_tempfile_replications(struct vine_manager *q) -{ - if (hash_table_size(q->temp_files_to_replicate) <= 0) { - return 0; - } - - char *cached_name = NULL; - void *empty_val = NULL; - int total_replication_request_sent = 0; - - static char key_start[PATH_MAX] = "random init"; - int iter_control; - int iter_count_var; - - struct list *to_remove = list_create(); - - HASH_TABLE_ITERATE_FROM_KEY(q->temp_files_to_replicate, iter_control, iter_count_var, key_start, cached_name, empty_val) - { - struct vine_file *f = hash_table_lookup(q->file_table, cached_name); - - if (!f) { - continue; - } - - /* are there any available source workers? */ - struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name); - if (!source_workers) { - /* If no source workers found, it indicates that the file doesn't exist, either pruned or lost. - Because a pruned file is removed from the recovery queue, so it definitely indicates that the file is lost. */ - if (q->transfer_temps_recovery && file_needs_recovery(q, f)) { - vine_manager_consider_recovery_task(q, f, f->recovery_task); - } - list_push_tail(to_remove, xxstrdup(f->cached_name)); - continue; - } - - /* at least one source is able to transfer? */ - int has_valid_source = 0; - struct vine_worker_info *s; - SET_ITERATE(source_workers, s) - { - if (s->transfer_port_active && s->outgoing_xfer_counter < q->worker_source_max_transfers && !s->draining) { - has_valid_source = 1; - break; - } - } - if (!has_valid_source) { - continue; - } - - /* has this file been fully replicated? */ - int nsource_workers = set_size(source_workers); - int to_find = MIN(q->temp_replica_count - nsource_workers, q->transfer_replica_per_cycle); - if (to_find <= 0) { - list_push_tail(to_remove, xxstrdup(f->cached_name)); - continue; - } - - // debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsource_workers, f->cached_name, to_find); - - int round_replication_request_sent = vine_file_replica_table_replicate(q, f, source_workers, to_find); - total_replication_request_sent += round_replication_request_sent; - - if (total_replication_request_sent >= q->attempt_schedule_depth) { - break; - } - } - - while ((cached_name = list_pop_head(to_remove))) { - hash_table_remove(q->temp_files_to_replicate, cached_name); - free(cached_name); - } - - list_delete(to_remove); - - return total_replication_request_sent; -} - /* Insert into hashtable temp files that may need replication. */ static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_worker_info *w) @@ -1127,11 +1069,7 @@ static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_wo // Iterate over files we want might want to recover HASH_TABLE_ITERATE(w->current_files, cached_name, info) { - struct vine_file *f = hash_table_lookup(q->file_table, cached_name); - - if (f && f->type == VINE_TEMP) { - hash_table_insert(q->temp_files_to_replicate, cached_name, NULL); - } + vine_temp_handle_lost_replica(q, cached_name); } } @@ -1243,7 +1181,7 @@ static void add_worker(struct vine_manager *q) /* Delete a single file on a remote worker except those with greater delete_upto_level cache level */ -static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level) +int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level) { if (cache_level <= delete_upto_level) { process_replica_on_event(q, w, filename, VINE_FILE_REPLICA_STATE_TRANSITION_EVENT_UNLINK); @@ -4135,7 +4073,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->worker_table = hash_table_create(0, 0); q->file_worker_table = hash_table_create(0, 0); - q->temp_files_to_replicate = hash_table_create(0, 0); + q->temp_files_to_replicate = priority_queue_create(0); q->worker_blocklist = hash_table_create(0, 0); q->file_table = hash_table_create(0, 0); @@ -4219,6 +4157,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->perf_log_interval = VINE_PERF_LOG_INTERVAL; q->temp_replica_count = 1; + q->clean_redundant_replicas = 0; + q->shift_disk_load = 0; q->transfer_temps_recovery = 0; q->transfer_replica_per_cycle = 10; @@ -4487,8 +4427,7 @@ void vine_delete(struct vine_manager *q) hash_table_clear(q->file_worker_table, (void *)set_delete); hash_table_delete(q->file_worker_table); - hash_table_clear(q->temp_files_to_replicate, 0); - hash_table_delete(q->temp_files_to_replicate); + priority_queue_delete(q->temp_files_to_replicate); hash_table_clear(q->factory_table, (void *)vine_factory_info_delete); hash_table_delete(q->factory_table); @@ -5448,7 +5387,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout, // Check if any temp files need replication and start replicating BEGIN_ACCUM_TIME(q, time_internal); - result = consider_tempfile_replications(q); + result = vine_temp_start_replication(q); END_ACCUM_TIME(q, time_internal); if (result) { // recovered at least one temp file @@ -5841,7 +5780,6 @@ void vine_set_manager_preferred_connection(struct vine_manager *q, const char *p int vine_tune(struct vine_manager *q, const char *name, double value) { - if (!strcmp(name, "attempt-schedule-depth")) { q->attempt_schedule_depth = MAX(1, (int)value); @@ -5980,6 +5918,12 @@ int vine_tune(struct vine_manager *q, const char *name, double value) } else if (!strcmp(name, "enforce-worker-eviction-interval")) { q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND); + } else if (!strcmp(name, "clean-redundant-replicas")) { + q->clean_redundant_replicas = !!((int)value); + + } else if (!strcmp(name, "shift-disk-load")) { + q->shift_disk_load = !!((int)value); + } else { debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name); return -1; @@ -6464,9 +6408,6 @@ int vine_prune_file(struct vine_manager *m, struct vine_file *f) } } - /* also remove from the replication table. */ - hash_table_remove(m->temp_files_to_replicate, f->cached_name); - return pruned_replica_count; } diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index bcf2405616..5b96dacb98 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -123,7 +123,7 @@ struct vine_manager { struct hash_table *file_table; /* Maps fileid -> struct vine_file.* */ struct hash_table *file_worker_table; /* Maps cachename -> struct set of workers with a replica of the file.* */ - struct hash_table *temp_files_to_replicate; /* Maps cachename -> NULL. Used as a set of temp files to be replicated */ + struct priority_queue *temp_files_to_replicate; /* Priority queue of temp files to be replicated, those with less replicas are at the top. */ /* Primary scheduling controls. */ @@ -217,6 +217,8 @@ struct vine_manager { int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */ int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */ int temp_replica_count; /* Number of replicas per temp file */ + int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */ + int shift_disk_load; /* If true, shift storage burden to more available workers to minimize disk usage peaks. */ double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */ double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */ @@ -291,6 +293,9 @@ void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info /* Check if the worker is able to transfer the necessary files for this task. */ int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t); +/* Delete a file from a worker. */ +int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level); + /* The expected format of files created by the resource monitor.*/ #define RESOURCE_MONITOR_TASK_LOCAL_NAME "vine-task-%d" #define RESOURCE_MONITOR_REMOTE_NAME "cctools-monitor" diff --git a/taskvine/src/manager/vine_temp.c b/taskvine/src/manager/vine_temp.c new file mode 100644 index 0000000000..f7dac15109 --- /dev/null +++ b/taskvine/src/manager/vine_temp.c @@ -0,0 +1,406 @@ +#include "vine_temp.h" +#include "vine_file.h" +#include "vine_worker_info.h" +#include "vine_file_replica_table.h" +#include "vine_manager.h" +#include "vine_manager_put.h" +#include "vine_file_replica.h" +#include "vine_file_replica_table.h" +#include "vine_task.h" +#include "vine_mount.h" + +#include "priority_queue.h" +#include "macros.h" +#include "stringtools.h" +#include "debug.h" +#include "random.h" +#include "xxmalloc.h" + +/*************************************************************/ +/* Private Functions */ +/*************************************************************/ + +/** +Check whether a worker is eligible to participate in peer transfers. +*/ +static int is_worker_active(struct vine_worker_info *w) +{ + if (!w) { + return 0; + } + if (w->type != VINE_WORKER_TYPE_WORKER) { + return 0; + } + if (!w->transfer_port_active) { + return 0; + } + if (w->draining) { + return 0; + } + if (w->resources->tag < 0) { + return 0; + } + return 1; +} + +/** +Find the most suitable worker to serve as the source of a replica transfer. +Eligible workers already host the file, have a ready replica, and are not +overloaded with outgoing transfers. Preference is given to workers with fewer +outgoing transfers to balance load. +*/ +static struct vine_worker_info *get_best_source_worker(struct vine_manager *q, struct vine_file *f) +{ + if (!q || !f || f->type != VINE_TEMP) { + return NULL; + } + + struct set *sources = hash_table_lookup(q->file_worker_table, f->cached_name); + if (!sources) { + return NULL; + } + + struct priority_queue *valid_sources_queue = priority_queue_create(0); + struct vine_worker_info *w = NULL; + SET_ITERATE(sources, w) + { + /* skip if transfer port is not active or in draining mode */ + if (!is_worker_active(w)) { + continue; + } + /* skip if incoming transfer counter is too high */ + if (w->outgoing_xfer_counter >= q->worker_source_max_transfers) { + continue; + } + /* skip if the worker does not have this file */ + struct vine_file_replica *replica = vine_file_replica_table_lookup(w, f->cached_name); + if (!replica) { + continue; + } + /* skip if the file is not ready */ + if (replica->state != VINE_FILE_REPLICA_STATE_READY) { + continue; + } + /* those with less outgoing_xfer_counter are preferred */ + priority_queue_push(valid_sources_queue, w, -w->outgoing_xfer_counter); + } + + struct vine_worker_info *best_source = priority_queue_pop(valid_sources_queue); + priority_queue_delete(valid_sources_queue); + + return best_source; +} + +/** +Select a destination worker that can accept a new replica. Workers must be +active, not currently hosting the file, and have sufficient free cache space. +Those with more available disk space are prioritized to reduce pressure on +heavily utilized workers. +*/ +static struct vine_worker_info *get_best_dest_worker(struct vine_manager *q, struct vine_file *f) +{ + if (!q || !f || f->type != VINE_TEMP) { + return NULL; + } + + struct priority_queue *valid_destinations = priority_queue_create(0); + + char *key; + struct vine_worker_info *w; + HASH_TABLE_ITERATE(q->worker_table, key, w) + { + /* skip if transfer port is not active or in draining mode */ + if (!is_worker_active(w)) { + continue; + } + /* skip if the incoming transfer counter is too high */ + if (w->incoming_xfer_counter >= q->worker_source_max_transfers) { + continue; + } + /* skip if the worker already has this file */ + struct vine_file_replica *replica = vine_file_replica_table_lookup(w, f->cached_name); + if (replica) { + continue; + } + /* skip if the worker does not have enough disk space */ + int64_t available_disk_space = (int64_t)MEGABYTES_TO_BYTES(w->resources->disk.total) - w->inuse_cache; + if ((int64_t)f->size > available_disk_space) { + continue; + } + /* workers with more available disk space are preferred to hold the file */ + priority_queue_push(valid_destinations, w, available_disk_space); + } + + struct vine_worker_info *best_destination = priority_queue_pop(valid_destinations); + priority_queue_delete(valid_destinations); + + return best_destination; +} + +/** +Initiate a peer-to-peer transfer between two workers for the specified file. +The source worker provides a direct URL so the destination worker can pull the +replica immediately via `vine_manager_put_url_now`. +*/ +static void start_peer_transfer(struct vine_manager *q, struct vine_file *f, struct vine_worker_info *dest_worker, struct vine_worker_info *source_worker) +{ + if (!q || !f || f->type != VINE_TEMP || !dest_worker || !source_worker) { + return; + } + + char *source_addr = string_format("%s/%s", source_worker->transfer_url, f->cached_name); + vine_manager_put_url_now(q, dest_worker, source_worker, source_addr, f); + free(source_addr); +} + +/** +Attempt to replicate a temporary file immediately by selecting compatible +source and destination workers. Returns 1 when a transfer is launched, or 0 if +no suitable pair of workers is currently available. +*/ +static int vine_temp_replicate_file_now(struct vine_manager *q, struct vine_file *f) +{ + if (!q || !f || f->type != VINE_TEMP) { + return 0; + } + + struct vine_worker_info *source_worker = get_best_source_worker(q, f); + if (!source_worker) { + return 0; + } + + struct vine_worker_info *dest_worker = get_best_dest_worker(q, f); + if (!dest_worker) { + return 0; + } + + start_peer_transfer(q, f, dest_worker, source_worker); + + return 1; +} + +/*************************************************************/ +/* Public Functions */ +/*************************************************************/ + +/** +Queue a temporary file for replication when it still lacks the target number of +replicas. Files without any replica and those already satisfying the quota are +ignored. A lower priority value gives preference to scarcer replicas. +*/ +int vine_temp_queue_for_replication(struct vine_manager *q, struct vine_file *f) +{ + if (!q || !f || f->type != VINE_TEMP || f->state != VINE_FILE_STATE_CREATED) { + return 0; + } + + if (q->temp_replica_count <= 1) { + return 0; + } + + int current_replica_count = vine_file_replica_count(q, f); + if (current_replica_count == 0 || current_replica_count >= q->temp_replica_count) { + return 0; + } + + priority_queue_push(q->temp_files_to_replicate, f, -current_replica_count); + + return 1; +} + +/** +Respond to a missing replica notification by re-queuing the corresponding file +for replication, provided the file is still valid and managed by this +coordinator. The use cases include when a cache-invalid message is received from a worker, +or when a worker disconnects unexpectedly, and we need to rescue the lost data. +If the replica does not have any ready source, it will be silently discarded in the +replication phase, so don't worry about it. +*/ +int vine_temp_handle_lost_replica(struct vine_manager *q, char *cachename) +{ + if (!q || !cachename) { + return 0; + } + + struct vine_file *f = hash_table_lookup(q->file_table, cachename); + if (!f || f->type != VINE_TEMP || f->state != VINE_FILE_STATE_CREATED) { + return 0; + } + + vine_temp_queue_for_replication(q, f); + + return 1; +} + +/** +Iterate through temporary files that still need additional replicas and +trigger peer-to-peer transfers when both a source and destination worker +are available. The function honors the manager's scheduling depth so that we +do not spend too much time evaluating the queue in a single invocation. +Files that cannot be replicated immediately are deferred by lowering their +priority and will be reconsidered in future calls. +*/ +int vine_temp_start_replication(struct vine_manager *q) +{ + if (!q) { + return 0; + } + + int processed = 0; + int iter_count = 0; + /* Only examine up to attempt_schedule_depth files to keep the event loop responsive. */ + int iter_depth = MIN(q->attempt_schedule_depth, priority_queue_size(q->temp_files_to_replicate)); + /* Files that cannot be replicated now are temporarily stored and re-queued at the end. */ + struct list *skipped = list_create(); + + struct vine_file *f; + while ((f = priority_queue_pop(q->temp_files_to_replicate)) && (iter_count++ < iter_depth)) { + /* skip and discard the replication request if the file is not valid */ + if (!f || f->type != VINE_TEMP || f->state != VINE_FILE_STATE_CREATED) { + continue; + } + + /* skip and discard the replication request if the file has enough replicas or no replicas */ + int current_replica_count = vine_file_replica_count(q, f); + if (current_replica_count >= q->temp_replica_count || current_replica_count == 0) { + continue; + } + /* skip and discard the replication request if the file has no ready replicas */ + int current_ready_replica_count = vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY); + if (current_ready_replica_count == 0) { + continue; + } + + /* If reaches here, the file still lacks replicas and has at least one ready source, so we start finding a valid source and destination worker + * and trigger the replication. If fails to find a valid source or destination worker, we requeue the file and will consider later. */ + if (!vine_temp_replicate_file_now(q, f)) { + list_push_tail(skipped, f); + continue; + } + + processed++; + + /* Requeue the file with lower priority so it can accumulate replicas gradually. */ + vine_temp_queue_for_replication(q, f); + } + + while ((f = list_pop_head(skipped))) { + vine_temp_queue_for_replication(q, f); + } + list_delete(skipped); + + return processed; +} + +/** +Clean redundant replicas of a temporary file. +For example, a file may be transferred to another worker because a task that declares it +as input is scheduled there, resulting in an extra replica that consumes storage space. +This function evaluates whether the file has excessive replicas and removes those on +workers that do not execute their dependent tasks. +*/ +void vine_temp_clean_redundant_replicas(struct vine_manager *q, struct vine_file *f) +{ + if (!f || f->type != VINE_TEMP) { + return; + } + + struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name); + if (!source_workers) { + /* no surprise - a cache-update message may trigger a file deletion. */ + return; + } + int excess_replicas = set_size(source_workers) - q->temp_replica_count; + if (excess_replicas <= 0) { + return; + } + /* Note that this replica may serve as a source for a peer transfer. If it is unlinked prematurely, + * the corresponding transfer could fail and leave a task without its required data. + * Therefore, we must wait until all replicas are confirmed ready before proceeding. */ + if (vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) != set_size(source_workers)) { + return; + } + + struct priority_queue *clean_replicas_from_workers = priority_queue_create(0); + + struct vine_worker_info *source_worker = NULL; + SET_ITERATE(source_workers, source_worker) + { + /* if the file is actively in use by a task (the input to that task), we don't remove the replica on this worker */ + int file_inuse = 0; + + uint64_t task_id; + struct vine_task *task; + ITABLE_ITERATE(source_worker->current_tasks, task_id, task) + { + struct vine_mount *input_mount; + LIST_ITERATE(task->input_mounts, input_mount) + { + if (f == input_mount->file) { + file_inuse = 1; + break; + } + } + if (file_inuse) { + break; + } + } + + if (file_inuse) { + continue; + } + + priority_queue_push(clean_replicas_from_workers, source_worker, source_worker->inuse_cache); + } + + source_worker = NULL; + while (excess_replicas > 0 && (source_worker = priority_queue_pop(clean_replicas_from_workers))) { + delete_worker_file(q, source_worker, f->cached_name, 0, 0); + excess_replicas--; + } + priority_queue_delete(clean_replicas_from_workers); + + return; +} + +/** +Shift a temp file replica away from the worker using the most cache space. +This function looks for an alternative worker that can accept the file immediately +so that the original replica can be cleaned up later by @vine_temp_clean_redundant_replicas(). +*/ +void vine_temp_shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f) +{ + if (!q || !source_worker || !f || f->type != VINE_TEMP) { + return; + } + + struct vine_worker_info *target_worker = NULL; + + char *key; + struct vine_worker_info *w = NULL; + HASH_TABLE_ITERATE(q->worker_table, key, w) + { + /* skip if the worker is not active */ + if (!is_worker_active(w)) { + continue; + } + /* skip if the worker already has this file */ + if (vine_file_replica_table_lookup(w, f->cached_name)) { + continue; + } + /* skip if the worker becomes heavier after the transfer */ + if (w->inuse_cache + f->size > source_worker->inuse_cache - f->size) { + continue; + } + /* workers with less inuse cache space are preferred */ + if (!target_worker || w->inuse_cache < target_worker->inuse_cache) { + target_worker = w; + } + } + if (target_worker) { + start_peer_transfer(q, f, target_worker, source_worker); + } + + /* We can clean up the original one safely when the replica arrives at the destination worker. */ + vine_temp_clean_redundant_replicas(q, f); +} \ No newline at end of file diff --git a/taskvine/src/manager/vine_temp.h b/taskvine/src/manager/vine_temp.h new file mode 100644 index 0000000000..0943b9b9cc --- /dev/null +++ b/taskvine/src/manager/vine_temp.h @@ -0,0 +1,15 @@ +#ifndef vine_temp_H +#define vine_temp_H + +#include "vine_manager.h" + +/** Replication related functions */ +int vine_temp_queue_for_replication(struct vine_manager *q, struct vine_file *f); +int vine_temp_start_replication(struct vine_manager *q); +int vine_temp_handle_lost_replica(struct vine_manager *q, char *cachename); + +/** Storage management functions */ +void vine_temp_clean_redundant_replicas(struct vine_manager *q, struct vine_file *f); +void vine_temp_shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f); + +#endif \ No newline at end of file