Skip to content

Commit d07cfc0

Browse files
committed
casync-http: add max-active-chunks option
This commit introduces a new option --max-active-chunks=<MAX> that limits the number of simultaneous chunk transfers from the remote. The MAX number is the sum of: 1. the number of chunks added to the cURL multi interface, and 2. the number of chunks downloaded and waiting to be sent to the remote. It limits the number of chunks stored in memory that are ready to be sent to the casync process; this limits memory usage in situations where the network is faster than the pipe communication between the helper and casync.
1 parent 1d05b93 commit d07cfc0

File tree

7 files changed

+127
-1
lines changed

7 files changed

+127
-1
lines changed

doc/casync.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ General options:
164164
--cache=<PATH> Directory to use as encoder cache
165165
--cache-auto, -c Pick encoder cache directory automatically
166166
--rate-limit-bps=<LIMIT> Maximum bandwidth in bytes/s for remote communication
167+
--max-active-chunks=<MAX> Maximum number of simultaneously active chunks for remote communication
167168
--exclude-nodump=no Don't exclude files with chattr(1)'s +d **nodump** flag when creating archive
168169
--exclude-submounts=yes Exclude submounts when creating archive
169170
--exclude-file=no Don't respect .caexclude files in the file tree

src/caremote.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ struct CaRemote {
6060

6161
int log_level;
6262
uint64_t rate_limit_bps;
63+
unsigned max_active_chunks;
6364

6465
ReallocBuffer input_buffer;
6566
ReallocBuffer output_buffer;
@@ -248,6 +249,15 @@ int ca_remote_set_rate_limit_bps(CaRemote *rr, uint64_t rate_limit_bps) {
248249
return 0;
249250
}
250251

252+
int ca_remote_set_max_active_chunks(CaRemote *rr, unsigned max_active_chunks) {
253+
if (!rr)
254+
return -EINVAL;
255+
256+
rr->max_active_chunks = max_active_chunks;
257+
258+
return 0;
259+
}
260+
251261
int ca_remote_set_local_feature_flags(CaRemote *rr, uint64_t flags) {
252262
if (!rr)
253263
return -EINVAL;
@@ -1001,6 +1011,9 @@ static int ca_remote_start(CaRemote *rr) {
10011011
if (rr->rate_limit_bps != UINT64_MAX)
10021012
argc++;
10031013

1014+
if (rr->max_active_chunks)
1015+
argc++;
1016+
10041017
args = newa(char*, argc + 1);
10051018

10061019
if (rr->callout) {
@@ -1046,6 +1059,14 @@ static int ca_remote_start(CaRemote *rr) {
10461059
i++;
10471060
}
10481061

1062+
if (rr->max_active_chunks) {
1063+
r = asprintf(args + i, "--max-active-chunks=%u", rr->max_active_chunks);
1064+
if (r < 0)
1065+
return log_oom();
1066+
1067+
i++;
1068+
}
1069+
10491070
args[i + CA_REMOTE_ARG_OPERATION] = (char*) ((rr->local_feature_flags & (CA_PROTOCOL_PUSH_CHUNKS|CA_PROTOCOL_PUSH_INDEX|CA_PROTOCOL_PUSH_ARCHIVE)) ? "push" : "pull");
10501071
args[i + CA_REMOTE_ARG_BASE_URL] = /* rr->base_url ? rr->base_url + skip :*/ (char*) "-";
10511072
args[i + CA_REMOTE_ARG_ARCHIVE_URL] = rr->archive_url ? rr->archive_url + skip : (char*) "-";

src/caremote.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ int ca_remote_get_digest_type(CaRemote *rr, CaDigestType *ret);
5252

5353
int ca_remote_set_log_level(CaRemote *rr, int log_level);
5454
int ca_remote_set_rate_limit_bps(CaRemote *rr, uint64_t rate_limit_bps);
55+
int ca_remote_set_max_active_chunks(CaRemote *rr, unsigned max_active_chunks);
5556

5657
int ca_remote_set_io_fds(CaRemote *rr, int input_fd, int output_fd);
5758
int ca_remote_get_io_fds(CaRemote *rr, int *ret_input_fd, int *ret_output_fd);

src/casync-http.c

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,38 @@
1313
#include "util.h"
1414
#include "list.h"
1515

16+
/* The maximum number of active chunks is defined as the sum of:
17+
* - number of chunks added to curl multi for download
18+
* - number of chunks downloaded, and waiting to be sent to remote
19+
*
20+
* In situations where the server is local and super fast (i.e. we receive chunks
21+
* faster than we can send them to the remote), around 95% of the active chunks
22+
* are chunks waiting to be sent to remote, hence this number can be seen as
23+
* "maximum number of chunks sitting in ram".
24+
*
25+
* In situations where the server is far away, around 95% of the active chunks are
26+
* chunks added to curl multi. It doesn't mean "being downloaded" though, it's more
27+
* a "maximum limit for concurrent downloads". The real number of running downloads
28+
* might be lower, because:
29+
* - if we're doing HTTP/1 and parallel connections, the hard limit is actually
30+
* defined by `MAX_HOST_CONNECTIONS`.
31+
* - if we're doing HTTP/2 over a multiplexed connection, the number of parallel
32+
* streams is negotiated between client and server.
33+
*
34+
* In effect, *I think* it's best to make this number quite large, because we
35+
* don't want to underfeed libcurl and underperform. I think it's better to feed
36+
* too many handles to the curl multi, and let libcurl decide internally what's
37+
* best to do with it. Libcurl knows every detail about the HTTP connection and
38+
* will handle (parallel/multiplex/whatever) downloads better than us.
39+
*/
40+
#define MAX_ACTIVE_CHUNKS 64
41+
1642
static volatile sig_atomic_t quit = false;
1743

1844
static int arg_log_level = -1;
1945
static bool arg_verbose = false;
2046
static curl_off_t arg_rate_limit_bps = 0;
47+
static unsigned arg_max_active_chunks = MAX_ACTIVE_CHUNKS;
2148
static bool arg_ssl_trust_peer = false;
2249

2350
typedef enum Protocol {
@@ -506,7 +533,7 @@ static CaChunkDownloader *ca_chunk_downloader_new(CaRemote *rr, const char *stor
506533
if (!dl->completed)
507534
goto fail;
508535

509-
for (i = 0; i < 64; i++) {
536+
for (i = 0; i < arg_max_active_chunks; i++) {
510537
CURL *handle = NULL;
511538
ChunkData *cd = NULL;
512539

@@ -1203,6 +1230,7 @@ static int parse_argv(int argc, char *argv[]) {
12031230

12041231
enum {
12051232
ARG_RATE_LIMIT_BPS = 0x100,
1233+
ARG_MAX_ACTIVE_CHUNKS,
12061234
ARG_SSL_TRUST_PEER,
12071235
};
12081236

@@ -1211,6 +1239,7 @@ static int parse_argv(int argc, char *argv[]) {
12111239
{ "log-level", required_argument, NULL, 'l' },
12121240
{ "verbose", no_argument, NULL, 'v' },
12131241
{ "rate-limit-bps", required_argument, NULL, ARG_RATE_LIMIT_BPS },
1242+
{ "max-active-chunks", required_argument, NULL, ARG_MAX_ACTIVE_CHUNKS },
12141243
{ "ssl-trust-peer", no_argument, NULL, ARG_SSL_TRUST_PEER },
12151244
{}
12161245
};
@@ -1261,6 +1290,14 @@ static int parse_argv(int argc, char *argv[]) {
12611290
arg_rate_limit_bps = strtoll(optarg, NULL, 10);
12621291
break;
12631292

1293+
case ARG_MAX_ACTIVE_CHUNKS:
1294+
r = safe_atou(optarg, &arg_max_active_chunks);
1295+
if (r < 0 || arg_max_active_chunks == 0) {
1296+
log_error("Invalid value for max-active-chunks, refusing");
1297+
return -EINVAL;
1298+
}
1299+
break;
1300+
12641301
case ARG_SSL_TRUST_PEER:
12651302
arg_ssl_trust_peer = true;
12661303
break;

src/casync-tool.c

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ static size_t arg_chunk_size_min = 0;
6666
static size_t arg_chunk_size_avg = 0;
6767
static size_t arg_chunk_size_max = 0;
6868
static uint64_t arg_rate_limit_bps = UINT64_MAX;
69+
static unsigned arg_max_active_chunks = 0;
6970
static uint64_t arg_with = 0;
7071
static uint64_t arg_without = 0;
7172
static uid_t arg_uid_shift = 0, arg_uid_range = 0x10000U;
@@ -107,6 +108,8 @@ static void help(void) {
107108
" -c --cache-auto Pick encoder cache directory automatically\n"
108109
" --rate-limit-bps=LIMIT Maximum bandwidth in bytes/s for remote\n"
109110
" communication\n"
111+
" --max-active-chunks=MAX Maximum number of simultaneously active chunks for\n"
112+
" remote communication\n"
110113
" --exclude-nodump=no Don't exclude files with chattr(1)'s +d 'nodump'\n"
111114
" flag when creating archive\n"
112115
" --exclude-submounts=yes Exclude submounts when creating archive\n"
@@ -328,6 +331,7 @@ static int parse_argv(int argc, char *argv[]) {
328331
ARG_SEED,
329332
ARG_CACHE,
330333
ARG_RATE_LIMIT_BPS,
334+
ARG_MAX_ACTIVE_CHUNKS,
331335
ARG_WITH,
332336
ARG_WITHOUT,
333337
ARG_WHAT,
@@ -362,6 +366,7 @@ static int parse_argv(int argc, char *argv[]) {
362366
{ "cache", required_argument, NULL, ARG_CACHE },
363367
{ "cache-auto", no_argument, NULL, 'c' },
364368
{ "rate-limit-bps", required_argument, NULL, ARG_RATE_LIMIT_BPS },
369+
{ "max-active-chunks", required_argument, NULL, ARG_MAX_ACTIVE_CHUNKS },
365370
{ "with", required_argument, NULL, ARG_WITH },
366371
{ "without", required_argument, NULL, ARG_WITHOUT },
367372
{ "what", required_argument, NULL, ARG_WHAT },
@@ -475,6 +480,14 @@ static int parse_argv(int argc, char *argv[]) {
475480

476481
break;
477482

483+
case ARG_MAX_ACTIVE_CHUNKS:
484+
r = safe_atou(optarg, &arg_max_active_chunks);
485+
if (r < 0) {
486+
log_error("Failed to parse --max-active-chunks= value %s", optarg);
487+
return -EINVAL;
488+
}
489+
break;
490+
478491
case ARG_WITH: {
479492
uint64_t u;
480493

@@ -1330,6 +1343,12 @@ static int verb_make(int argc, char *argv[]) {
13301343
return log_error_errno(r, "Failed to set rate limit: %m");
13311344
}
13321345

1346+
if (arg_max_active_chunks) {
1347+
r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks);
1348+
if (r < 0)
1349+
return log_error_errno(r, "Failed to set max active chunks: %m");
1350+
}
1351+
13331352
r = ca_sync_set_base_fd(s, input_fd);
13341353
if (r < 0)
13351354
return log_error_errno(r, "Failed to set sync base: %m");
@@ -1635,6 +1654,12 @@ static int verb_extract(int argc, char *argv[]) {
16351654
return log_error_errno(r, "Failed to set rate limit: %m");
16361655
}
16371656

1657+
if (arg_max_active_chunks) {
1658+
r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks);
1659+
if (r < 0)
1660+
return log_error_errno(r, "Failed to set max active chunks: %m");
1661+
}
1662+
16381663
if (seek_path) {
16391664
if (output_fd >= 0)
16401665
r = ca_sync_set_boundary_fd(s, output_fd);
@@ -2796,6 +2821,12 @@ static int verb_mount(int argc, char *argv[]) {
27962821
return log_error_errno(r, "Failed to set rate limit: %m");
27972822
}
27982823

2824+
if (arg_max_active_chunks) {
2825+
r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks);
2826+
if (r < 0)
2827+
return log_error_errno(r, "Failed to set max active chunks: %m");
2828+
}
2829+
27992830
if (operation == MOUNT_ARCHIVE) {
28002831
if (input_fd >= 0)
28012832
r = ca_sync_set_archive_fd(s, input_fd);
@@ -2922,6 +2953,12 @@ static int verb_mkdev(int argc, char *argv[]) {
29222953
return log_error_errno(r, "Failed to set rate limit: %m");
29232954
}
29242955

2956+
if (arg_max_active_chunks) {
2957+
r = ca_sync_set_max_active_chunks(s, arg_max_active_chunks);
2958+
if (r < 0)
2959+
return log_error_errno(r, "Failed to set max active chunks: %m");
2960+
}
2961+
29252962
if (operation == MKDEV_BLOB) {
29262963
if (input_fd >= 0)
29272964
r = ca_sync_set_archive_fd(s, input_fd);
@@ -3489,6 +3526,12 @@ static int verb_pull(int argc, char *argv[]) {
34893526
return log_error_errno(r, "Failed to set rate limit: %m");
34903527
}
34913528

3529+
if (arg_max_active_chunks) {
3530+
r = ca_remote_set_max_active_chunks(rr, arg_max_active_chunks);
3531+
if (r < 0)
3532+
return log_error_errno(r, "Failed to set max active chunks: %m");
3533+
}
3534+
34923535
r = ca_remote_set_io_fds(rr, STDIN_FILENO, STDOUT_FILENO);
34933536
if (r < 0)
34943537
return log_error_errno(r, "Failed to set I/O file descriptors: %m");
@@ -3648,6 +3691,12 @@ static int verb_push(int argc, char *argv[]) {
36483691
log_error_errno(r, "Failed to set rate limit: %m");
36493692
}
36503693

3694+
if (arg_max_active_chunks) {
3695+
r = ca_remote_set_max_active_chunks(rr, arg_max_active_chunks);
3696+
if (r < 0)
3697+
return log_error_errno(r, "Failed to set max active chunks: %m");
3698+
}
3699+
36513700
r = ca_remote_set_io_fds(rr, STDIN_FILENO, STDOUT_FILENO);
36523701
if (r < 0)
36533702
log_error_errno(r, "Failed to set I/O file descriptors: %m");

src/casync.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ struct CaSync {
112112

113113
int log_level;
114114
size_t rate_limit_bps;
115+
unsigned max_active_chunks;
115116

116117
uint64_t feature_flags;
117118
uint64_t feature_flags_mask;
@@ -522,6 +523,15 @@ int ca_sync_set_log_level(CaSync *s, int log_level) {
522523
return 0;
523524
}
524525

526+
int ca_sync_set_max_active_chunks(CaSync *s, unsigned max_active_chunks) {
527+
if (!s)
528+
return -EINVAL;
529+
530+
s->max_active_chunks = max_active_chunks;
531+
532+
return 0;
533+
}
534+
525535
int ca_sync_set_rate_limit_bps(CaSync *s, uint64_t rate_limit_bps) {
526536
if (!s)
527537
return -EINVAL;
@@ -694,6 +704,12 @@ int ca_sync_set_index_remote(CaSync *s, const char *url) {
694704
return r;
695705
}
696706

707+
if (s->max_active_chunks > 0) {
708+
r = ca_remote_set_max_active_chunks(s->remote_index, s->max_active_chunks);
709+
if (r < 0)
710+
return r;
711+
}
712+
697713
r = ca_remote_set_index_url(s->remote_index, url);
698714
if (r < 0)
699715
return r;

src/casync.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ DEFINE_TRIVIAL_CLEANUP_FUNC(CaSync *, ca_sync_unref);
3333

3434
int ca_sync_set_log_level(CaSync *s, int log_level);
3535
int ca_sync_set_rate_limit_bps(CaSync *s, uint64_t rate_limit_bps);
36+
int ca_sync_set_max_active_chunks(CaSync *s, unsigned max_active_chunks);
3637

3738
int ca_sync_set_feature_flags(CaSync *s, uint64_t flags);
3839
int ca_sync_get_feature_flags(CaSync *s, uint64_t *ret);

0 commit comments

Comments
 (0)