280 changes: 276 additions & 4 deletions common/arg.cpp
@@ -251,6 +251,7 @@ struct common_load_model_from_url_headers {
std::string etag;
std::string last_modified;
std::string accept_ranges;
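// -1 until a valid Content-Length header has been parsed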
long content_length = -1;
};

struct FILE_deleter {
@@ -263,6 +264,7 @@ static size_t common_header_callback(char * buffer, size_t, size_t n_items, void
static std::regex etag_regex("ETag", std::regex_constants::icase);
static std::regex last_modified_regex("Last-Modified", std::regex_constants::icase);
static std::regex accept_ranges_regex("Accept-Ranges", std::regex_constants::icase);
static std::regex content_length_regex("Content-Length", std::regex_constants::icase);
std::string header(buffer, n_items);
std::smatch match;
if (std::regex_match(header, match, header_regex)) {
@@ -274,6 +276,12 @@ static size_t common_header_callback(char * buffer, size_t, size_t n_items, void
headers->last_modified = value;
} else if (std::regex_match(key, match, accept_ranges_regex)) {
headers->accept_ranges = value;
} else if (std::regex_match(key, match, content_length_regex)) {
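// NOTE: long is 32-bit on MSVC, so a Content-Length above 2 GiB throws
// std::out_of_range here and leaves content_length at -1, which silently
// disables multi-connection downloads for large models on Windows;
// curl_off_t would be the portable choice.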
try {
headers->content_length = std::stol(value);
} catch (const std::exception & e) {
LOG_WRN("%s: failed to parse Content-Length '%s': %s\n", __func__, value.c_str(), e.what());
}
}
}

@@ -367,6 +375,225 @@ static bool common_download_head(CURL * curl,
return common_curl_perf(curl) == CURLE_OK;
}
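
// NOTE: the code below relies on <future>, <filesystem>, <fstream>, and
// <ctime>; this excerpt assumes the matching includes land elsewhere in
// the patch.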

// Structure to manage a single download chunk
struct common_download_chunk {
std::string url;
std::string bearer_token;
std::string path_temporary;
long start_byte;
long end_byte;
long downloaded_bytes;
CURL * curl;
FILE * file;
bool completed;

common_download_chunk(const std::string & url_,
const std::string & bearer_token_,
const std::string & path_,
long start, long end)
: url(url_), bearer_token(bearer_token_), path_temporary(path_),
start_byte(start), end_byte(end), downloaded_bytes(0),
curl(nullptr), file(nullptr), completed(false) {}
};

// Multi-connection download: split the file into byte ranges and fetch them in parallel
static bool common_download_file_multiconn(const std::string & url,
const std::string & path,
const std::string & bearer_token,
long content_length,
int num_connections = 4) {
// Minimum chunk size (2MB) - don't use multi-connection for small files
const long min_chunk_size = 2 * 1024 * 1024;
const long min_file_size = min_chunk_size * 2; // Minimum file size to enable multi-connection

if (content_length < min_file_size) {
LOG_DBG("%s: file too small (%ld bytes) for multi-connection download\n", __func__, content_length);
return false;
}

// Adjust number of connections based on file size
const long chunk_size = content_length / num_connections;
if (chunk_size < min_chunk_size) {
num_connections = static_cast<int>(content_length / min_chunk_size);
if (num_connections < 2) {
LOG_DBG("%s: adjusted connection count results in less than 2 connections\n", __func__);
return false;
}
}
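
// Worked example: a 7 MiB file with the default 4 connections gives
// chunk_size = 1.75 MiB < 2 MiB, so num_connections is lowered to
// 7 MiB / 2 MiB = 3 chunks of roughly 2.33 MiB each.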

LOG_INF("%s: starting multi-connection download with %d connections for %ld bytes (chunk size: %ld)\n",
__func__, num_connections, content_length, content_length / num_connections);

std::vector<common_download_chunk> chunks;
std::vector<std::future<bool>> futures;

// Create chunk files and prepare for download
for (int i = 0; i < num_connections; ++i) {
long start = static_cast<long>(i) * (content_length / num_connections);
long end;
if (i == num_connections - 1) {
end = content_length - 1;
} else {
end = start + (content_length / num_connections) - 1;
}

std::string chunk_path = path + ".downloadInProgress.chunk" + std::to_string(i);
chunks.emplace_back(url, bearer_token, chunk_path, start, end);
}
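
// Worked example: content_length = 100 and num_connections = 4 yields the
// inclusive ranges 0-24, 25-49, 50-74, and 75-99; the last chunk absorbs
// any remainder left by the integer division.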

// Download chunks in parallel
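// NOTE: `chunks` is fully populated before any task launches, so the
// by-reference capture below stays valid (the vector never reallocates
// while the tasks run), and every future is joined before `chunks` is
// destroyed.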
for (size_t i = 0; i < chunks.size(); ++i) {
futures.push_back(std::async(std::launch::async, [&](size_t chunk_idx) -> bool {
auto & chunk = chunks[chunk_idx];

// Initialize CURL for this chunk
chunk.curl = curl_easy_init();
if (!chunk.curl) {
LOG_ERR("%s: failed to initialize CURL for chunk %zu\n", __func__, chunk_idx);
return false;
}

// Check if chunk file exists (resume support)
long resume_from = 0;
if (std::filesystem::exists(chunk.path_temporary)) {
resume_from = std::filesystem::file_size(chunk.path_temporary);
chunk.downloaded_bytes = resume_from;
LOG_DBG("%s: resuming chunk %zu from byte %ld\n", __func__, chunk_idx, resume_from);
}

// Open chunk file for writing
chunk.file = fopen(chunk.path_temporary.c_str(), "ab");
if (!chunk.file) {
LOG_ERR("%s: failed to open chunk file %s\n", __func__, chunk.path_temporary.c_str());
curl_easy_cleanup(chunk.curl);
return false;
}

// Set up CURL options
curl_easy_setopt(chunk.curl, CURLOPT_URL, chunk.url.c_str());
curl_easy_setopt(chunk.curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(chunk.curl, CURLOPT_WRITEDATA, chunk.file);
curl_easy_setopt(chunk.curl, CURLOPT_WRITEFUNCTION, common_write_callback);
curl_easy_setopt(chunk.curl, CURLOPT_NOPROGRESS, 1L); // Disable progress per chunk

// Set range for this chunk
long actual_start = chunk.start_byte + resume_from;
if (actual_start <= chunk.end_byte) {
std::string range_str = std::to_string(actual_start) + "-" + std::to_string(chunk.end_byte);
curl_easy_setopt(chunk.curl, CURLOPT_RANGE, range_str.c_str());
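
// For example, range_str "0-24" produces the request header
//   Range: bytes=0-24
// (inclusive on both ends); a compliant server answers 206 Partial Content.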

LOG_DBG("%s: downloading chunk %zu range %s\n", __func__, chunk_idx, range_str.c_str());
} else {
// Chunk already completed
chunk.completed = true;
fclose(chunk.file);
curl_easy_cleanup(chunk.curl);
LOG_DBG("%s: chunk %zu already completed\n", __func__, chunk_idx);
return true;
}

// Add authorization header if needed
curl_slist * headers = nullptr;
headers = curl_slist_append(headers, "User-Agent: llama-cpp");
if (!chunk.bearer_token.empty()) {
std::string auth_header = "Authorization: Bearer " + chunk.bearer_token;
headers = curl_slist_append(headers, auth_header.c_str());
}
curl_easy_setopt(chunk.curl, CURLOPT_HTTPHEADER, headers);

#ifdef _WIN32
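// use the OS certificate store instead of a bundled CA file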
curl_easy_setopt(chunk.curl, CURLOPT_SSL_OPTIONS, CURLSSLOPT_NATIVE_CA);
#endif

// Perform the download
CURLcode res = curl_easy_perform(chunk.curl);
bool success = (res == CURLE_OK);

if (success) {
long http_code = 0;
curl_easy_getinfo(chunk.curl, CURLINFO_RESPONSE_CODE, &http_code);
// a ranged request must be answered with 206 Partial Content; a plain
// 200 means the server ignored the Range header and sent the whole file,
// which would corrupt the reassembled download
if (http_code != 206) {
LOG_ERR("%s: chunk %zu failed with HTTP code %ld\n", __func__, chunk_idx, http_code);
success = false;
} else {
LOG_DBG("%s: chunk %zu completed successfully (HTTP %ld)\n", __func__, chunk_idx, http_code);
}
} else {
LOG_ERR("%s: chunk %zu failed: %s\n", __func__, chunk_idx, curl_easy_strerror(res));
}

// Cleanup
if (headers) {
curl_slist_free_all(headers);
}
fclose(chunk.file);
curl_easy_cleanup(chunk.curl);

chunk.completed = success;
return success;
}, i));
}

// Wait for all chunks to complete
bool all_success = true;
for (size_t i = 0; i < futures.size(); ++i) {
if (!futures[i].get()) {
LOG_ERR("%s: chunk %zu failed\n", __func__, i);
all_success = false;
}
}

if (!all_success) {
LOG_ERR("%s: one or more chunks failed to download\n", __func__);
// Clean up any partial chunk files
for (const auto & chunk : chunks) {
if (std::filesystem::exists(chunk.path_temporary)) {
std::filesystem::remove(chunk.path_temporary);
}
}
return false;
}

// Combine chunks into final file
const std::string path_temporary = path + ".downloadInProgress";
std::ofstream final_file(path_temporary, std::ios::binary);
if (!final_file) {
LOG_ERR("%s: failed to create final file %s\n", __func__, path_temporary.c_str());
return false;
}

LOG_INF("%s: combining %zu chunks into final file\n", __func__, chunks.size());
for (size_t i = 0; i < chunks.size(); ++i) {
std::ifstream chunk_file(chunks[i].path_temporary, std::ios::binary);
if (!chunk_file) {
LOG_ERR("%s: failed to open chunk file %s for combining\n", __func__, chunks[i].path_temporary.c_str());
final_file.close();
std::filesystem::remove(path_temporary);
return false;
}

// Copy chunk to final file
final_file << chunk_file.rdbuf();
chunk_file.close();

// Verify chunk was written properly
if (final_file.fail()) {
LOG_ERR("%s: failed to write chunk %zu to final file\n", __func__, i);
final_file.close();
std::filesystem::remove(path_temporary);
return false;
}

// Remove chunk file after successful combination
std::filesystem::remove(chunks[i].path_temporary);
LOG_DBG("%s: combined and removed chunk %zu\n", __func__, i);
}

final_file.close();
LOG_INF("%s: multi-connection download completed successfully\n", __func__);
return true;
}

// download one single file from remote URL to local path
static bool common_download_file_single(const std::string & url,
const std::string & path,
Expand Down Expand Up @@ -485,9 +712,10 @@ static bool common_download_file_single(const std::string & url,

// Write the updated JSON metadata file.
metadata.update({
{ "url", url },
{ "etag", headers.etag },
{ "lastModified", headers.last_modified }
{ "url", url },
{ "etag", headers.etag },
{ "lastModified", headers.last_modified },
{ "contentLength", headers.content_length }
});
write_file(metadata_path, metadata.dump(4));
LOG_DBG("%s: file metadata saved: %s\n", __func__, metadata_path.c_str());
@@ -496,7 +724,51 @@ static bool common_download_file_single(const std::string & url,
LOG_INF("%s: trying to download model from %s to %s (server_etag:%s, server_last_modified:%s)...\n",
__func__, llama_download_hide_password_in_url(url).c_str(), path_temporary.c_str(),
headers.etag.c_str(), headers.last_modified.c_str());
const bool was_pull_successful = common_pull_file(curl.get(), path_temporary);

bool was_pull_successful = false;

// Try multi-connection download if conditions are met
if (accept_ranges_supported && headers.content_length > 0 && should_download_from_scratch) {
LOG_INF("%s: server supports range requests with content length %ld bytes\n", __func__, headers.content_length);

// Record the multi-connection attempt (content length, flag, timestamp) in the metadata
metadata["multiconn"] = {
{"content_length", headers.content_length},
{"chunks_used", true},
{"attempt_time", std::time(nullptr)}
};
write_file(metadata_path, metadata.dump(4));

was_pull_successful = common_download_file_multiconn(url, path, bearer_token, headers.content_length);
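
// NOTE: on success the combined file sits at path + ".downloadInProgress",
// the same temporary name the single-connection path writes to, so the
// rename-to-final-path logic further down (outside this hunk) covers both.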

if (!was_pull_successful) {
LOG_WRN("%s: multi-connection download failed, falling back to single connection\n", __func__);
// Remove the multi-connection marker from the metadata
metadata.erase("multiconn");
write_file(metadata_path, metadata.dump(4));

// Clean up any remaining chunk files
try {
for (int i = 0; i < 10; ++i) { // Check up to 10 possible chunks
std::string chunk_path = path + ".downloadInProgress.chunk" + std::to_string(i);
if (std::filesystem::exists(chunk_path)) {
std::filesystem::remove(chunk_path);
LOG_DBG("%s: cleaned up chunk file %s\n", __func__, chunk_path.c_str());
}
}
} catch (const std::exception& e) {
LOG_WRN("%s: error cleaning up chunk files: %s\n", __func__, e.what());
}
}
} else {
LOG_DBG("%s: multi-connection download not attempted: accept_ranges=%d, content_length=%ld, from_scratch=%d\n",
__func__, accept_ranges_supported ? 1 : 0, headers.content_length, should_download_from_scratch ? 1 : 0);
}

// Fall back to single-connection download if multi-connection failed or wasn't attempted
if (!was_pull_successful) {
was_pull_successful = common_pull_file(curl.get(), path_temporary);
}
if (!was_pull_successful) {
if (i + 1 < max_attempts) {
const int exponential_backoff_delay = std::pow(retry_delay_seconds, i) * 1000;