Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 184 additions & 21 deletions common/arg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
#endif

#define JSON_ASSERT GGML_ASSERT
#include <nlohmann/json.hpp>

#include <algorithm>
#include <climits>
#include <cstdarg>
#include <filesystem>
#include <fstream>
#include <list>
#include <mutex>
#include <nlohmann/json.hpp>
#include <regex>
#include <set>
#include <string>
Expand All @@ -37,6 +37,10 @@
#include <curl/curl.h>
#include <curl/easy.h>
#include <future>
# ifndef _WIN32
# include <sys/ioctl.h>
# include <unistd.h>
# endif
#endif

using json = nlohmann::ordered_json;
Expand Down Expand Up @@ -375,6 +379,136 @@ static bool common_download_head(CURL * curl,
return common_curl_perf(curl) == CURLE_OK;
}

// Shared progress state for multi-connection downloads
struct common_multiconn_progress {
std::mutex progress_mutex;
long total_content_length;
std::vector<long> chunk_downloaded_bytes;
std::chrono::steady_clock::time_point start_time;
bool progress_enabled;
bool printed;

common_multiconn_progress(long content_length, int num_chunks, bool enable_progress) :
total_content_length(content_length),
chunk_downloaded_bytes(num_chunks, 0),
start_time(std::chrono::steady_clock::now()),
progress_enabled(enable_progress),
printed(false) {}

void update_chunk_progress(int chunk_idx, long downloaded_bytes) {
if (!progress_enabled) {
return;
}

std::lock_guard<std::mutex> lock(progress_mutex);
chunk_downloaded_bytes[chunk_idx] = downloaded_bytes;

// Calculate total downloaded across all chunks
long total_downloaded = 0;
for (long bytes : chunk_downloaded_bytes) {
total_downloaded += bytes;
}

display_progress(total_downloaded);
}

private:
void display_progress(long total_downloaded) {
if (total_content_length <= 0) {
return;
}

// Calculate percentage
const long percentage = (total_downloaded * 100) / total_content_length;

// Calculate speed
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count();
double speed = 0.0;
if (elapsed > 0) {
speed = (total_downloaded * 1000.0) / elapsed; // bytes per second
}

// Calculate ETA
double eta = 0.0;
if (speed > 0) {
eta = (total_content_length - total_downloaded) / speed;
}

// Format progress display
std::string progress_prefix = string_format("%3ld%%", percentage);

// Format size display
auto format_size = [](long bytes) -> std::string {
const char * units[] = { "B", "KB", "MB", "GB" };
double size = bytes;
int unit_idx = 0;
while (size >= 1024.0 && unit_idx < 3) {
size /= 1024.0;
unit_idx++;
}
return string_format("%.1f%s", size, units[unit_idx]);
};

// Format speed display
auto format_speed = [&](double bytes_per_sec) -> std::string {
const char * units[] = { "B/s", "KB/s", "MB/s", "GB/s" };
double speed = bytes_per_sec;
int unit_idx = 0;
while (speed >= 1024.0 && unit_idx < 3) {
speed /= 1024.0;
unit_idx++;
}
return string_format("%.1f%s", speed, units[unit_idx]);
};

std::string progress_suffix =
string_format("%s/%s %s ETA:%ds", format_size(total_downloaded).c_str(),
format_size(total_content_length).c_str(), format_speed(speed).c_str(), (int) eta);

// Calculate terminal width and progress bar
int terminal_width = 80; // Default fallback
# ifndef _WIN32
struct winsize w;
if (ioctl(STDOUT_FILENO, TIOCGWINSZ, &w) == 0) {
terminal_width = w.ws_col;
}
# endif

int prefix_suffix_len = progress_prefix.length() + progress_suffix.length() + 4; // " | "
int progress_bar_width = std::max(10, terminal_width - prefix_suffix_len);

// Generate progress bar
std::string progress_bar;
const long pos = (percentage * progress_bar_width) / 100;
for (int i = 0; i < progress_bar_width; ++i) {
progress_bar.append((i < pos) ? "█" : " ");
}

// Print progress (clear line and print new progress)
fprintf(stderr, "\r%s %s| %s", progress_prefix.c_str(), progress_bar.c_str(), progress_suffix.c_str());
fflush(stderr);
printed = true;
}
};

// Progress callback for individual chunks
static size_t common_multiconn_progress_callback(void * ptr,
curl_off_t,
curl_off_t now_downloaded,
curl_off_t,
curl_off_t) {
auto * progress_info = static_cast<std::pair<common_multiconn_progress *, int> *>(ptr);
common_multiconn_progress * progress = progress_info->first;
int chunk_idx = progress_info->second;

if (progress) {
progress->update_chunk_progress(chunk_idx, now_downloaded);
}

return 0;
}

// Structure to manage a single download chunk
struct common_download_chunk {
std::string url;
Expand All @@ -386,22 +520,34 @@ struct common_download_chunk {
curl_ptr curl;
std::unique_ptr<FILE, FILE_deleter> file;
bool completed;

common_download_chunk(const std::string & url_,
const std::string & bearer_token_,
const std::string & path_,
long start, long end)
: url(url_), bearer_token(bearer_token_), path_temporary(path_),
start_byte(start), end_byte(end), downloaded_bytes(0),
curl(nullptr, &curl_easy_cleanup), file(nullptr), completed(false) {}
std::pair<common_multiconn_progress *, int> progress_info;

common_download_chunk(const std::string & url_,
const std::string & bearer_token_,
const std::string & path_,
long start,
long end,
common_multiconn_progress * progress,
int chunk_idx) :
url(url_),
bearer_token(bearer_token_),
path_temporary(path_),
start_byte(start),
end_byte(end),
downloaded_bytes(0),
curl(nullptr, &curl_easy_cleanup),
file(nullptr),
completed(false),
progress_info(progress, chunk_idx) {}
};

// Multi-connection download manager
static bool common_download_file_multiconn(const std::string & url,
const std::string & path,
const std::string & bearer_token,
long content_length,
int num_connections = 4) {
long content_length,
bool enable_progress = true,
int num_connections = 4) {
// Minimum chunk size (2MB) - don't use multi-connection for small files
const long min_chunk_size = 2 * 1024 * 1024;
const long min_file_size = min_chunk_size * 2; // Minimum file size to enable multi-connection
Expand All @@ -423,7 +569,10 @@ static bool common_download_file_multiconn(const std::string & url,

LOG_INF("%s: starting multi-connection download with %d connections for %ld bytes (chunk size: %ld)\n",
__func__, num_connections, content_length, content_length / num_connections);


// Create shared progress tracker
common_multiconn_progress progress_tracker(content_length, num_connections, enable_progress);

std::vector<common_download_chunk> chunks;
std::vector<std::future<bool>> futures;

Expand All @@ -438,7 +587,7 @@ static bool common_download_file_multiconn(const std::string & url,
}

std::string chunk_path = path + ".downloadInProgress.chunk" + std::to_string(i);
chunks.emplace_back(url, bearer_token, chunk_path, start, end);
chunks.emplace_back(url, bearer_token, chunk_path, start, end, &progress_tracker, i);
}

// Download chunks in parallel
Expand Down Expand Up @@ -473,8 +622,16 @@ static bool common_download_file_multiconn(const std::string & url,
curl_easy_setopt(chunk.curl.get(), CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(chunk.curl.get(), CURLOPT_WRITEDATA, chunk.file.get());
curl_easy_setopt(chunk.curl.get(), CURLOPT_WRITEFUNCTION, common_write_callback);
curl_easy_setopt(chunk.curl.get(), CURLOPT_NOPROGRESS, 1L); // Disable progress per chunk


// Set up progress callback
if (enable_progress) {
curl_easy_setopt(chunk.curl.get(), CURLOPT_NOPROGRESS, 0L);
curl_easy_setopt(chunk.curl.get(), CURLOPT_XFERINFOFUNCTION, common_multiconn_progress_callback);
curl_easy_setopt(chunk.curl.get(), CURLOPT_XFERINFODATA, &chunk.progress_info);
} else {
curl_easy_setopt(chunk.curl.get(), CURLOPT_NOPROGRESS, 1L);
}

// Set range for this chunk
long actual_start = chunk.start_byte + resume_from;
if (actual_start <= chunk.end_byte) {
Expand All @@ -497,7 +654,7 @@ static bool common_download_file_multiconn(const std::string & url,
headers = curl_slist_append(headers, auth_header.c_str());
}
curl_easy_setopt(chunk.curl.get(), CURLOPT_HTTPHEADER, headers);

#ifdef _WIN32
curl_easy_setopt(chunk.curl.get(), CURLOPT_SSL_OPTIONS, CURLSSLOPT_NATIVE_CA);
#endif
Expand Down Expand Up @@ -549,7 +706,12 @@ static bool common_download_file_multiconn(const std::string & url,
}
return false;
}


// Clear progress line if we were showing progress
if (enable_progress && progress_tracker.printed) {
fprintf(stderr, "\n");
}

// Combine chunks into final file
const std::string path_temporary = path + ".downloadInProgress";
std::ofstream final_file(path_temporary, std::ios::binary);
Expand Down Expand Up @@ -734,9 +896,10 @@ static bool common_download_file_single(const std::string & url,
{"attempt_time", std::time(nullptr)}
};
write_file(metadata_path, metadata.dump(4));

was_pull_successful = common_download_file_multiconn(url, path, bearer_token, headers.content_length);


was_pull_successful =
common_download_file_multiconn(url, path, bearer_token, headers.content_length, true);

if (!was_pull_successful) {
LOG_WRN("%s: multi-connection download failed, falling back to single connection\n", __func__);
// Remove failed chunk metadata
Expand Down
Loading