diff --git a/common/arg.cpp b/common/arg.cpp
index 54df2df87276c..404b49bd27c02 100644
--- a/common/arg.cpp
+++ b/common/arg.cpp
@@ -251,6 +251,7 @@ struct common_load_model_from_url_headers {
     std::string etag;
     std::string last_modified;
     std::string accept_ranges;
+    long content_length = -1;
 };
 
 struct FILE_deleter {
@@ -263,6 +264,7 @@ static size_t common_header_callback(char * buffer, size_t, size_t n_items, void
     static std::regex etag_regex("ETag", std::regex_constants::icase);
     static std::regex last_modified_regex("Last-Modified", std::regex_constants::icase);
     static std::regex accept_ranges_regex("Accept-Ranges", std::regex_constants::icase);
+    static std::regex content_length_regex("Content-Length", std::regex_constants::icase);
     std::string header(buffer, n_items);
     std::smatch match;
     if (std::regex_match(header, match, header_regex)) {
@@ -274,6 +276,12 @@ static size_t common_header_callback(char * buffer, size_t, size_t n_items, void
             headers->last_modified = value;
         } else if (std::regex_match(key, match, accept_ranges_regex)) {
             headers->accept_ranges = value;
+        } else if (std::regex_match(key, match, content_length_regex)) {
+            try {
+                headers->content_length = std::stol(value);
+            } catch (const std::exception & e) {
+                LOG_WRN("%s: failed to parse Content-Length '%s': %s\n", __func__, value.c_str(), e.what());
+            }
         }
     }
 
@@ -367,6 +375,225 @@ static bool common_download_head(CURL * curl,
     return common_curl_perf(curl) == CURLE_OK;
 }
 
+// Structure to manage a single download chunk
+struct common_download_chunk {
+    std::string url;
+    std::string bearer_token;
+    std::string path_temporary;
+    long start_byte;
+    long end_byte;
+    long downloaded_bytes;
+    CURL * curl;
+    FILE * file;
+    bool completed;
+
+    common_download_chunk(const std::string & url_,
+                          const std::string & bearer_token_,
+                          const std::string & path_,
+                          long start, long end)
+        : url(url_), bearer_token(bearer_token_), path_temporary(path_),
+          start_byte(start), end_byte(end), downloaded_bytes(0),
+          curl(nullptr), file(nullptr), completed(false) {}
+};
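+
+// Note: byte offsets and sizes here are carried in `long`, which is 32-bit on
+// Windows (LLP64), so a Content-Length above 2 GiB would overflow there;
+// int64_t would be the safer choice.
+//
+// Worked example of the range math used by the manager below: a 10 MiB file
+// (10485760 bytes) split over 4 connections yields the inclusive ranges
+// [0, 2621439], [2621440, 5242879], [5242880, 7864319], [7864320, 10485759].
+// Integer division sizes every chunk and the last chunk absorbs the remainder.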
+
+// Multi-connection download manager
+static bool common_download_file_multiconn(const std::string & url,
+                                           const std::string & path,
+                                           const std::string & bearer_token,
+                                           long content_length,
+                                           int num_connections = 4) {
+    // Minimum chunk size (2MB) - don't use multi-connection for small files
+    const long min_chunk_size = 2 * 1024 * 1024;
+    const long min_file_size = min_chunk_size * 2; // Minimum file size to enable multi-connection
+
+    if (content_length < min_file_size) {
+        LOG_DBG("%s: file too small (%ld bytes) for multi-connection download\n", __func__, content_length);
+        return false;
+    }
+
+    // Adjust number of connections based on file size
+    const long chunk_size = content_length / num_connections;
+    if (chunk_size < min_chunk_size) {
+        num_connections = static_cast<int>(content_length / min_chunk_size);
+        if (num_connections < 2) {
+            LOG_DBG("%s: adjusted connection count results in less than 2 connections\n", __func__);
+            return false;
+        }
+    }
+
+    LOG_INF("%s: starting multi-connection download with %d connections for %ld bytes (chunk size: %ld)\n",
+            __func__, num_connections, content_length, content_length / num_connections);
+
+    std::vector<common_download_chunk> chunks;
+    std::vector<std::future<bool>> futures;
+
+    // Create chunk files and prepare for download
+    for (int i = 0; i < num_connections; ++i) {
+        long start = static_cast<long>(i) * (content_length / num_connections);
+        long end;
+        if (i == num_connections - 1) {
+            end = content_length - 1;
+        } else {
+            end = start + (content_length / num_connections) - 1;
+        }
+
+        std::string chunk_path = path + ".downloadInProgress.chunk" + std::to_string(i);
+        chunks.emplace_back(url, bearer_token, chunk_path, start, end);
+    }
+
+    // Download chunks in parallel
+    for (size_t i = 0; i < chunks.size(); ++i) {
+        futures.push_back(std::async(std::launch::async, [&](size_t chunk_idx) -> bool {
+            auto & chunk = chunks[chunk_idx];
+
+            // Initialize CURL for this chunk
+            chunk.curl = curl_easy_init();
+            if (!chunk.curl) {
+                LOG_ERR("%s: failed to initialize CURL for chunk %zu\n", __func__, chunk_idx);
+                return false;
+            }
+
+            // Check if chunk file exists (resume support)
+            long resume_from = 0;
+            if (std::filesystem::exists(chunk.path_temporary)) {
+                resume_from = std::filesystem::file_size(chunk.path_temporary);
+                chunk.downloaded_bytes = resume_from;
+                LOG_DBG("%s: resuming chunk %zu from byte %ld\n", __func__, chunk_idx, resume_from);
+            }
+
+            // Open chunk file for writing
+            chunk.file = fopen(chunk.path_temporary.c_str(), "ab");
+            if (!chunk.file) {
+                LOG_ERR("%s: failed to open chunk file %s\n", __func__, chunk.path_temporary.c_str());
+                curl_easy_cleanup(chunk.curl);
+                return false;
+            }
+
+            // Set up CURL options
+            curl_easy_setopt(chunk.curl, CURLOPT_URL, chunk.url.c_str());
+            curl_easy_setopt(chunk.curl, CURLOPT_FOLLOWLOCATION, 1L);
+            curl_easy_setopt(chunk.curl, CURLOPT_WRITEDATA, chunk.file);
+            curl_easy_setopt(chunk.curl, CURLOPT_WRITEFUNCTION, common_write_callback);
+            curl_easy_setopt(chunk.curl, CURLOPT_NOPROGRESS, 1L); // Disable progress per chunk
+
+            // Set range for this chunk
+            long actual_start = chunk.start_byte + resume_from;
+            if (actual_start <= chunk.end_byte) {
+                std::string range_str = std::to_string(actual_start) + "-" + std::to_string(chunk.end_byte);
+                curl_easy_setopt(chunk.curl, CURLOPT_RANGE, range_str.c_str());
+
+                LOG_DBG("%s: downloading chunk %zu range %s\n", __func__, chunk_idx, range_str.c_str());
+            } else {
+                // Chunk already completed
+                chunk.completed = true;
+                fclose(chunk.file);
+                curl_easy_cleanup(chunk.curl);
+                LOG_DBG("%s: chunk %zu already completed\n", __func__, chunk_idx);
+                return true;
+            }
+
+            // Add authorization header if needed
+            curl_slist * headers = nullptr;
+            headers = curl_slist_append(headers, "User-Agent: llama-cpp");
+            if (!chunk.bearer_token.empty()) {
+                std::string auth_header = "Authorization: Bearer " + chunk.bearer_token;
+                headers = curl_slist_append(headers, auth_header.c_str());
+            }
+            curl_easy_setopt(chunk.curl, CURLOPT_HTTPHEADER, headers);
+
+#ifdef _WIN32
+            curl_easy_setopt(chunk.curl, CURLOPT_SSL_OPTIONS, CURLSSLOPT_NATIVE_CA);
+#endif
+
+            // Perform the download
+            CURLcode res = curl_easy_perform(chunk.curl);
+            bool success = (res == CURLE_OK);
+
+            if (success) {
+                long http_code = 0;
+                curl_easy_getinfo(chunk.curl, CURLINFO_RESPONSE_CODE, &http_code);
+                if (http_code < 200 || http_code >= 400) {
+                    LOG_ERR("%s: chunk %zu failed with HTTP code %ld\n", __func__, chunk_idx, http_code);
+                    success = false;
+                } else {
+                    LOG_DBG("%s: chunk %zu completed successfully (HTTP %ld)\n", __func__, chunk_idx, http_code);
+                }
+            } else {
+                LOG_ERR("%s: chunk %zu failed: %s\n", __func__, chunk_idx, curl_easy_strerror(res));
+            }
+
+            // Cleanup
+            if (headers) {
+                curl_slist_free_all(headers);
+            }
+            fclose(chunk.file);
+            curl_easy_cleanup(chunk.curl);
+
+            chunk.completed = success;
+            return success;
+        }, i));
+    }
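+
+    // Note: the join loop below calls get() on every future, which both
+    // propagates each chunk's result and guarantees every std::async task is
+    // joined - a std::future returned by std::async blocks in its destructor
+    // until the task finishes, so skipping get() would only move the wait.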
+
+    // Wait for all chunks to complete
+    bool all_success = true;
+    for (size_t i = 0; i < futures.size(); ++i) {
+        if (!futures[i].get()) {
+            LOG_ERR("%s: chunk %zu failed\n", __func__, i);
+            all_success = false;
+        }
+    }
+
+    if (!all_success) {
+        LOG_ERR("%s: one or more chunks failed to download\n", __func__);
+        // Clean up any partial chunk files
+        for (const auto & chunk : chunks) {
+            if (std::filesystem::exists(chunk.path_temporary)) {
+                std::filesystem::remove(chunk.path_temporary);
+            }
+        }
+        return false;
+    }
+
+    // Combine chunks into final file
+    const std::string path_temporary = path + ".downloadInProgress";
+    std::ofstream final_file(path_temporary, std::ios::binary);
+    if (!final_file) {
+        LOG_ERR("%s: failed to create final file %s\n", __func__, path_temporary.c_str());
+        return false;
+    }
+
+    LOG_INF("%s: combining %zu chunks into final file\n", __func__, chunks.size());
+    for (size_t i = 0; i < chunks.size(); ++i) {
+        std::ifstream chunk_file(chunks[i].path_temporary, std::ios::binary);
+        if (!chunk_file) {
+            LOG_ERR("%s: failed to open chunk file %s for combining\n", __func__, chunks[i].path_temporary.c_str());
+            final_file.close();
+            std::filesystem::remove(path_temporary);
+            return false;
+        }
+
+        // Copy chunk to final file
+        final_file << chunk_file.rdbuf();
+        chunk_file.close();
+
+        // Verify chunk was written properly
+        if (final_file.fail()) {
+            LOG_ERR("%s: failed to write chunk %zu to final file\n", __func__, i);
+            final_file.close();
+            std::filesystem::remove(path_temporary);
+            return false;
+        }
+
+        // Remove chunk file after successful combination
+        std::filesystem::remove(chunks[i].path_temporary);
+        LOG_DBG("%s: combined and removed chunk %zu\n", __func__, i);
+    }
+
+    final_file.close();
+    LOG_INF("%s: multi-connection download completed successfully\n", __func__);
+    return true;
+}
+
 // download one single file from remote URL to local path
 static bool common_download_file_single(const std::string & url,
                                         const std::string & path,
                                         const std::string & bearer_token,
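A note on the combine step in the hunk above: it relies on streaming each chunk's `rdbuf()` into the destination stream. A self-contained sketch of that idiom, with the function name, paths, and chunk count being illustrative rather than taken from the patch:

```cpp
#include <fstream>
#include <string>

// Append n chunk files ("<dst>.chunk0", "<dst>.chunk1", ...) onto dst, in order.
static bool combine_chunks(const std::string & dst, int n) {
    std::ofstream out(dst, std::ios::binary);
    for (int i = 0; i < n && out; ++i) {
        std::ifstream in(dst + ".chunk" + std::to_string(i), std::ios::binary);
        if (!in) {
            return false; // missing chunk: abort the merge
        }
        out << in.rdbuf(); // stream the whole chunk; no manual buffering needed
    }
    return out.good();
}
```

One caveat applies to both the sketch and the patch: `operator<<` on a `std::streambuf *` sets `failbit` when it inserts zero characters, so an empty chunk file would be misreported as a write failure.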
@@ -485,9 +712,10 @@ static bool common_download_file_single(const std::string & url,
 
             // Write the updated JSON metadata file.
             metadata.update({
-                { "url", url },
-                { "etag", headers.etag },
-                { "lastModified", headers.last_modified }
+                { "url", url },
+                { "etag", headers.etag },
+                { "lastModified", headers.last_modified },
+                { "contentLength", headers.content_length }
             });
             write_file(metadata_path, metadata.dump(4));
             LOG_DBG("%s: file metadata saved: %s\n", __func__, metadata_path.c_str());
@@ -496,7 +724,51 @@ static bool common_download_file_single(const std::string & url,
 
         LOG_INF("%s: trying to download model from %s to %s (server_etag:%s, server_last_modified:%s)...\n", __func__,
                 llama_download_hide_password_in_url(url).c_str(), path_temporary.c_str(), headers.etag.c_str(), headers.last_modified.c_str());
-        const bool was_pull_successful = common_pull_file(curl.get(), path_temporary);
+
+        bool was_pull_successful = false;
+
+        // Try multi-connection download if conditions are met
+        if (accept_ranges_supported && headers.content_length > 0 && should_download_from_scratch) {
+            LOG_INF("%s: server supports range requests with content length %ld bytes\n", __func__, headers.content_length);
+
+            // Store chunk info in metadata for progress tracking
+            metadata["multiconn"] = {
+                {"content_length", headers.content_length},
+                {"chunks_used", true},
+                {"attempt_time", std::time(nullptr)}
+            };
+            write_file(metadata_path, metadata.dump(4));
+
+            was_pull_successful = common_download_file_multiconn(url, path, bearer_token, headers.content_length);
+
+            if (!was_pull_successful) {
+                LOG_WRN("%s: multi-connection download failed, falling back to single connection\n", __func__);
+                // Remove failed chunk metadata
+                metadata.erase("multiconn");
+                write_file(metadata_path, metadata.dump(4));
+
+                // Clean up any remaining chunk files
+                try {
+                    for (int i = 0; i < 10; ++i) { // Check up to 10 possible chunks
+                        std::string chunk_path = path + ".downloadInProgress.chunk" + std::to_string(i);
+                        if (std::filesystem::exists(chunk_path)) {
+                            std::filesystem::remove(chunk_path);
+                            LOG_DBG("%s: cleaned up chunk file %s\n", __func__, chunk_path.c_str());
+                        }
+                    }
+                } catch (const std::exception & e) {
+                    LOG_WRN("%s: error cleaning up chunk files: %s\n", __func__, e.what());
+                }
+            }
+        } else {
+            LOG_DBG("%s: multi-connection download not attempted: accept_ranges=%d, content_length=%ld, from_scratch=%d\n",
+                    __func__, accept_ranges_supported ? 1 : 0, headers.content_length, should_download_from_scratch ? 1 : 0);
+        }
+
+        // Fall back to single-connection download if multi-connection failed or wasn't attempted
+        if (!was_pull_successful) {
+            was_pull_successful = common_pull_file(curl.get(), path_temporary);
+        }
         if (!was_pull_successful) {
             if (i + 1 < max_attempts) {
                 const int exponential_backoff_delay = std::pow(retry_delay_seconds, i) * 1000;
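The retry path that a failed pull (multi-connection or single) falls into uses exponential backoff via `std::pow(retry_delay_seconds, i) * 1000`. A minimal worked example of that formula, assuming `retry_delay_seconds = 2` (the real constant is defined elsewhere in arg.cpp):

```cpp
#include <cmath>
#include <cstdio>

int main() {
    const int retry_delay_seconds = 2; // assumed value, for illustration only
    for (int i = 0; i < 3; ++i) {
        // attempt 0 -> 1000 ms, attempt 1 -> 2000 ms, attempt 2 -> 4000 ms
        const int delay_ms = (int) (std::pow(retry_delay_seconds, i) * 1000);
        std::printf("attempt %d: wait %d ms\n", i, delay_ms);
    }
    return 0;
}
```

Note that the growth base is `retry_delay_seconds` itself, so the first wait is always 1000 ms (`pow(x, 0) == 1`) regardless of the configured delay.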