Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion taskvine/src/manager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ SOURCES = \
vine_file_replica_table.c \
vine_fair.c \
vine_runtime_dir.c \
vine_task_groups.c
vine_task_groups.c \
vine_temp.c

PUBLIC_HEADERS = taskvine.h

Expand Down
135 changes: 38 additions & 97 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ See the file COPYING for details.
#include "vine_taskgraph_log.h"
#include "vine_txn_log.h"
#include "vine_worker_info.h"
#include "vine_temp.h"

#include "address.h"
#include "buffer.h"
Expand Down Expand Up @@ -165,7 +166,6 @@ static int vine_manager_check_inputs_available(struct vine_manager *q, struct vi
static void vine_manager_consider_recovery_task(struct vine_manager *q, struct vine_file *lost_file, struct vine_task *rt);

static void delete_uncacheable_files(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);
static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);
static int release_worker(struct vine_manager *q, struct vine_worker_info *w);

struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);
Expand Down Expand Up @@ -418,9 +418,14 @@ static vine_msg_code_t handle_cache_update(struct vine_manager *q, struct vine_w
f->state = VINE_FILE_STATE_CREATED;
f->size = size;

/* And if the file is a newly created temporary, replicate as needed. */
if (f->type == VINE_TEMP && *id == 'X' && q->temp_replica_count > 1) {
hash_table_insert(q->temp_files_to_replicate, f->cached_name, NULL);
/* If a TEMP file, replicate or shift disk load as needed. */
if (f->type == VINE_TEMP) {
if (q->temp_replica_count > 1) {
vine_temp_queue_for_replication(q, f);
}
if (q->shift_disk_load) {
vine_temp_shift_disk_load(q, w, f);
}
}
}
}
Expand Down Expand Up @@ -476,6 +481,9 @@ static vine_msg_code_t handle_cache_invalid(struct vine_manager *q, struct vine_
w->last_failure_time = timestamp_get();
}

/* If the creation failed, we may want to backup the file somewhere else. */
vine_temp_handle_lost_replica(q, cachename);

/* Successfully processed this message. */
return VINE_MSG_PROCESSED;
} else {
Expand Down Expand Up @@ -650,6 +658,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
itable_remove(q->running_table, t->task_id);
vine_task_set_result(t, task_status);

/* A task scheduling may result in a redundant replica of its input due to peer transfers, which can be safely removed when completed.
* However, the general function of taskvine is to replicate files on demand, and to only clean them up when prune is called.
* So, we only clean up redundant replicas for the task-inputs when the manager is configured to clean redundant replicas. */
if (q->clean_redundant_replicas) {
struct vine_mount *input_mount;
LIST_ITERATE(t->input_mounts, input_mount)
{
if (input_mount->file && input_mount->file->type == VINE_TEMP) {
vine_temp_clean_redundant_replicas(q, input_mount->file);
}
}
}

return VINE_SUCCESS;
}

Expand Down Expand Up @@ -1036,85 +1057,6 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
cleanup_worker_files(q, w);
}

/* Start replicating files that may need replication */
static int consider_tempfile_replications(struct vine_manager *q)
{
if (hash_table_size(q->temp_files_to_replicate) <= 0) {
return 0;
}

char *cached_name = NULL;
void *empty_val = NULL;
int total_replication_request_sent = 0;

static char key_start[PATH_MAX] = "random init";
int iter_control;
int iter_count_var;

struct list *to_remove = list_create();

HASH_TABLE_ITERATE_FROM_KEY(q->temp_files_to_replicate, iter_control, iter_count_var, key_start, cached_name, empty_val)
{
struct vine_file *f = hash_table_lookup(q->file_table, cached_name);

if (!f) {
continue;
}

/* are there any available source workers? */
struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name);
if (!source_workers) {
/* If no source workers found, it indicates that the file doesn't exist, either pruned or lost.
Because a pruned file is removed from the recovery queue, so it definitely indicates that the file is lost. */
if (q->transfer_temps_recovery && file_needs_recovery(q, f)) {
vine_manager_consider_recovery_task(q, f, f->recovery_task);
}
list_push_tail(to_remove, xxstrdup(f->cached_name));
continue;
}

/* at least one source is able to transfer? */
int has_valid_source = 0;
struct vine_worker_info *s;
SET_ITERATE(source_workers, s)
{
if (s->transfer_port_active && s->outgoing_xfer_counter < q->worker_source_max_transfers && !s->draining) {
has_valid_source = 1;
break;
}
}
if (!has_valid_source) {
continue;
}

/* has this file been fully replicated? */
int nsource_workers = set_size(source_workers);
int to_find = MIN(q->temp_replica_count - nsource_workers, q->transfer_replica_per_cycle);
if (to_find <= 0) {
list_push_tail(to_remove, xxstrdup(f->cached_name));
continue;
}

// debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsource_workers, f->cached_name, to_find);

int round_replication_request_sent = vine_file_replica_table_replicate(q, f, source_workers, to_find);
total_replication_request_sent += round_replication_request_sent;

if (total_replication_request_sent >= q->attempt_schedule_depth) {
break;
}
}

while ((cached_name = list_pop_head(to_remove))) {
hash_table_remove(q->temp_files_to_replicate, cached_name);
free(cached_name);
}

list_delete(to_remove);

return total_replication_request_sent;
}

/* Insert into hashtable temp files that may need replication. */

static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_worker_info *w)
Expand All @@ -1127,11 +1069,7 @@ static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_wo
// Iterate over files we want might want to recover
HASH_TABLE_ITERATE(w->current_files, cached_name, info)
{
struct vine_file *f = hash_table_lookup(q->file_table, cached_name);

if (f && f->type == VINE_TEMP) {
hash_table_insert(q->temp_files_to_replicate, cached_name, NULL);
}
vine_temp_handle_lost_replica(q, cached_name);
}
}

Expand Down Expand Up @@ -1243,7 +1181,7 @@ static void add_worker(struct vine_manager *q)

/* Delete a single file on a remote worker except those with greater delete_upto_level cache level */

static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level)
int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level)
{
if (cache_level <= delete_upto_level) {
process_replica_on_event(q, w, filename, VINE_FILE_REPLICA_STATE_TRANSITION_EVENT_UNLINK);
Expand Down Expand Up @@ -4135,7 +4073,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert

q->worker_table = hash_table_create(0, 0);
q->file_worker_table = hash_table_create(0, 0);
q->temp_files_to_replicate = hash_table_create(0, 0);
q->temp_files_to_replicate = priority_queue_create(0);
q->worker_blocklist = hash_table_create(0, 0);

q->file_table = hash_table_create(0, 0);
Expand Down Expand Up @@ -4219,6 +4157,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->perf_log_interval = VINE_PERF_LOG_INTERVAL;

q->temp_replica_count = 1;
q->clean_redundant_replicas = 0;
q->shift_disk_load = 0;
q->transfer_temps_recovery = 0;
q->transfer_replica_per_cycle = 10;

Expand Down Expand Up @@ -4487,8 +4427,7 @@ void vine_delete(struct vine_manager *q)
hash_table_clear(q->file_worker_table, (void *)set_delete);
hash_table_delete(q->file_worker_table);

hash_table_clear(q->temp_files_to_replicate, 0);
hash_table_delete(q->temp_files_to_replicate);
priority_queue_delete(q->temp_files_to_replicate);

hash_table_clear(q->factory_table, (void *)vine_factory_info_delete);
hash_table_delete(q->factory_table);
Expand Down Expand Up @@ -5448,7 +5387,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,

// Check if any temp files need replication and start replicating
BEGIN_ACCUM_TIME(q, time_internal);
result = consider_tempfile_replications(q);
result = vine_temp_start_replication(q);
END_ACCUM_TIME(q, time_internal);
if (result) {
// recovered at least one temp file
Expand Down Expand Up @@ -5841,7 +5780,6 @@ void vine_set_manager_preferred_connection(struct vine_manager *q, const char *p

int vine_tune(struct vine_manager *q, const char *name, double value)
{

if (!strcmp(name, "attempt-schedule-depth")) {
q->attempt_schedule_depth = MAX(1, (int)value);

Expand Down Expand Up @@ -5980,6 +5918,12 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "enforce-worker-eviction-interval")) {
q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND);

} else if (!strcmp(name, "clean-redundant-replicas")) {
q->clean_redundant_replicas = !!((int)value);

} else if (!strcmp(name, "shift-disk-load")) {
q->shift_disk_load = !!((int)value);

} else {
debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name);
return -1;
Expand Down Expand Up @@ -6464,9 +6408,6 @@ int vine_prune_file(struct vine_manager *m, struct vine_file *f)
}
}

/* also remove from the replication table. */
hash_table_remove(m->temp_files_to_replicate, f->cached_name);

return pruned_replica_count;
}

Expand Down
7 changes: 6 additions & 1 deletion taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ struct vine_manager {

struct hash_table *file_table; /* Maps fileid -> struct vine_file.* */
struct hash_table *file_worker_table; /* Maps cachename -> struct set of workers with a replica of the file.* */
struct hash_table *temp_files_to_replicate; /* Maps cachename -> NULL. Used as a set of temp files to be replicated */
struct priority_queue *temp_files_to_replicate; /* Priority queue of temp files to be replicated, those with less replicas are at the top. */


/* Primary scheduling controls. */
Expand Down Expand Up @@ -217,6 +217,8 @@ struct vine_manager {
int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */
int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */
int temp_replica_count; /* Number of replicas per temp file */
int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */
int shift_disk_load; /* If true, shift storage burden to more available workers to minimize disk usage peaks. */

double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */
double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */
Expand Down Expand Up @@ -291,6 +293,9 @@ void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info
/* Check if the worker is able to transfer the necessary files for this task. */
int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);

/* Delete a file from a worker. */
int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);

/* The expected format of files created by the resource monitor.*/
#define RESOURCE_MONITOR_TASK_LOCAL_NAME "vine-task-%d"
#define RESOURCE_MONITOR_REMOTE_NAME "cctools-monitor"
Expand Down
Loading