
Commit 49988d1

laramiel authored and copybara-github committed

Allow ForEachCoalescedRequest to accept ranges which have an unbounded max.

Currently this is used in two places, each of which already resolves unbounded max values prior to the call; however, this may allow additional uses.

PiperOrigin-RevId: 836797350
Change-Id: I5abb7a8489b57116d69d9d3cd9d8366953f0d77d

1 parent 1452f12 · commit 49988d1
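For context, a minimal sketch (not part of the commit) of the call pattern this change enables. The FakeRequest struct, the gap predicate, and the tensorstore::internal_kvstore_batch namespace are assumptions for illustration; ForEachCoalescedRequest itself only reads the byte_range member of each request.

#include <cstdint>
#include <iostream>
#include <vector>

#include "tensorstore/kvstore/batch_util.h"
#include "tensorstore/kvstore/byte_range.h"
#include "tensorstore/util/span.h"

namespace kvb = tensorstore::internal_kvstore_batch;  // assumed namespace

// Hypothetical request type; only `byte_range` is needed for coalescing.
struct FakeRequest {
  tensorstore::OptionalByteRangeRequest byte_range;
};

void CoalesceWithUnboundedMax() {
  std::vector<FakeRequest> requests(3);
  requests[0].byte_range.inclusive_min = 0;
  requests[0].byte_range.exclusive_max = 100;
  requests[1].byte_range.inclusive_min = 50;
  requests[1].byte_range.exclusive_max = 120;
  // Previously the caller had to resolve this against the value size first;
  // now an unbounded max (exclusive_max == -1) may be passed through.
  requests[2].byte_range.inclusive_min = 200;
  requests[2].byte_range.exclusive_max = -1;

  kvb::ForEachCoalescedRequest(
      tensorstore::span<FakeRequest>(requests),
      /*predicate=*/
      [](tensorstore::ByteRange coalesced, int64_t next_inclusive_min) {
        // Merge non-overlapping ranges separated by at most 256 bytes.
        return next_inclusive_min - coalesced.exclusive_max <= 256;
      },
      /*callback=*/
      [](tensorstore::OptionalByteRangeRequest coalesced,
         tensorstore::span<FakeRequest> group) {
        std::cout << "coalesced [" << coalesced.inclusive_min << ", "
                  << coalesced.exclusive_max << ") covering " << group.size()
                  << " requests\n";
      });
  // With the inputs above this prints one group, [0, -1): the overlapping
  // first two requests merge, the 80-byte gap passes the predicate, and the
  // unbounded max then extends the coalesced range to the end of the value.
}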

File tree

6 files changed: +503, -31 lines changed


tensorstore/kvstore/BUILD

Lines changed: 19 additions & 0 deletions
@@ -397,11 +397,30 @@ tensorstore_cc_library(
         "//tensorstore/util:span",
         "@abseil-cpp//absl/container:inlined_vector",
         "@abseil-cpp//absl/log:absl_check",
+        "@abseil-cpp//absl/status",
+        "@abseil-cpp//absl/strings:cord",
         "@abseil-cpp//absl/synchronization",
         "@abseil-cpp//absl/time",
     ],
 )
 
+tensorstore_cc_test(
+    name = "batch_util_test",
+    srcs = ["batch_util_test.cc"],
+    deps = [
+        ":batch_util",
+        ":byte_range",
+        ":kvstore",
+        "//tensorstore/util:future",
+        "//tensorstore/util:span",
+        "//tensorstore/util:status_testutil",
+        "@abseil-cpp//absl/status",
+        "@abseil-cpp//absl/strings:cord",
+        "@abseil-cpp//absl/time",
+        "@googletest//:gtest_main",
+    ],
+)
+
 tensorstore_cc_library(
     name = "common_metrics",
     hdrs = ["common_metrics.h"],

tensorstore/kvstore/batch_util.h

Lines changed: 87 additions & 22 deletions
@@ -29,6 +29,8 @@
 
 #include "absl/container/inlined_vector.h"
 #include "absl/log/absl_check.h"
+#include "absl/status/status.h"
+#include "absl/strings/cord.h"
 #include "absl/synchronization/mutex.h"
 #include "absl/time/time.h"
 #include "tensorstore/batch.h"
@@ -272,6 +274,9 @@ template <typename Request>
 void SortRequestsByStartByte(tensorstore::span<Request> requests) {
   std::sort(requests.begin(), requests.end(),
             [](const Request& a, const Request& b) {
+              if (a.byte_range.inclusive_min == b.byte_range.inclusive_min) {
+                return a.byte_range.exclusive_max < b.byte_range.exclusive_max;
+              }
               return a.byte_range.inclusive_min < b.byte_range.inclusive_min;
             });
 }
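A small illustration (not part of the commit) of the new tie-break, assuming the helper lives in the tensorstore::internal_kvstore_batch namespace of batch_util.h and using a hypothetical request type that only provides byte_range.

#include <vector>

#include "tensorstore/kvstore/batch_util.h"
#include "tensorstore/kvstore/byte_range.h"
#include "tensorstore/util/span.h"

// Hypothetical request type; the comparator only reads `byte_range`.
struct FakeRequest {
  tensorstore::OptionalByteRangeRequest byte_range;
};

void SortExample() {
  std::vector<FakeRequest> requests(3);
  requests[0].byte_range.inclusive_min = 10;
  requests[0].byte_range.exclusive_max = -1;  // unbounded max
  requests[1].byte_range.inclusive_min = 10;
  requests[1].byte_range.exclusive_max = 20;
  requests[2].byte_range.inclusive_min = 0;
  requests[2].byte_range.exclusive_max = 5;

  tensorstore::internal_kvstore_batch::SortRequestsByStartByte(
      tensorstore::span<FakeRequest>(requests));
  // Result: [0,5), then {10, -1}, then [10,20). Requests sharing a start byte
  // are now ordered deterministically, with -1 (unbounded) comparing lowest.
}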
@@ -281,56 +286,116 @@ template <typename Request>
 void ResolveCoalescedRequests(ByteRange coalesced_byte_range,
                               tensorstore::span<Request> coalesced_requests,
                               kvstore::ReadResult&& read_result) {
-  for (auto& request : coalesced_requests) {
+  static_assert(IsByteRangeReadRequestLikeV<Request>);
+  if (read_result.state == kvstore::ReadResult::kValue) {
+    ABSL_DCHECK_EQ(coalesced_byte_range.size(), read_result.value.size());
+  }
+
+  for (auto& r : coalesced_requests) {
     kvstore::ReadResult sub_read_result;
     sub_read_result.stamp = read_result.stamp;
     sub_read_result.state = read_result.state;
     if (read_result.state == kvstore::ReadResult::kValue) {
-      ABSL_DCHECK_EQ(coalesced_byte_range.size(), read_result.value.size());
-      int64_t request_start =
-          request.byte_range.inclusive_min - coalesced_byte_range.inclusive_min;
-      int64_t request_size = request.byte_range.size();
-      sub_read_result.value =
-          read_result.value.Subcord(request_start, request_size);
+      int64_t inclusive_min =
+          r.byte_range.inclusive_min >= 0
+              ? r.byte_range.inclusive_min
+              : coalesced_byte_range.exclusive_max + r.byte_range.inclusive_min;
+
+      int64_t exclusive_max = r.byte_range.exclusive_max == -1
+                                  ? coalesced_byte_range.exclusive_max
+                                  : r.byte_range.exclusive_max;
+      if (inclusive_min == exclusive_max) {
+        // Satisfy 0-size reads in all valid cases.
+        sub_read_result.value = absl::Cord();
+      } else if (inclusive_min < coalesced_byte_range.inclusive_min ||
+                 exclusive_max > coalesced_byte_range.exclusive_max ||
+                 inclusive_min >= coalesced_byte_range.exclusive_max) {
+        r.promise.SetResult(absl::OutOfRangeError(
+            tensorstore::StrCat("Requested byte range ", r.byte_range,
+                                " is not valid for returned value of size ",
+                                read_result.value.size(), " with byte range ",
+                                coalesced_byte_range)));
+        continue;
+      } else {
+        sub_read_result.value = read_result.value.Subcord(
+            inclusive_min - coalesced_byte_range.inclusive_min,
+            exclusive_max - inclusive_min);
+      }
     }
-    request.promise.SetResult(std::move(sub_read_result));
+    r.promise.SetResult(std::move(sub_read_result));
   }
 }
 
 // Determines a set of coalesced requests that will satisfy all requests in
 // `requests`.
 //
-// \param requests Requests to attempt to coalesce. All byte ranges must have
-//     already been resolved and satisfy `OptionalByteRangeRequest::IsRange()`.
+// \param requests Requests to attempt to coalesce. When the input ranges have
+//     already been resolved and satisfy `OptionalByteRangeRequest::IsRange()`,
+//     then the output ranges will also satisfy `IsRange()`.
 // \param predicate Function with signature `bool (ByteRange
 //     coalesced_byte_range, int64_t next_inclusive_min)` that determines
 //     whether an additional non-overlapping byte range starting at the
 //     specified offset should be coalesced with an existing (possibly
 //     coalesced) byte range. Overlapping byte ranges are always coalesced.
 //     Commonly a `CoalescingOptions` object may be specified as the predicate.
-// \param callback Callback with signature `void (ByteRange
-//     coalesced_byte_range, span<Request> coalesced_requests)` to be invoked
-//     for each coalesced set of requests.
+// \param callback Callback with signature `void (OptionalByteRangeRequest
+//     coalesced_byte_range, tensorstore::span<Request> coalesced_requests)` to
+//     be invoked for each coalesced set of requests.
 template <typename Request, typename Predicate, typename Callback>
 void ForEachCoalescedRequest(tensorstore::span<Request> requests,
                              Predicate predicate, Callback callback) {
-  static_assert(IsByteRangeReadRequestLikeV<Request>);
+  static_assert(std::is_invocable_v<Predicate, ByteRange, int64_t>);
+  static_assert(std::is_invocable_v<Callback, OptionalByteRangeRequest,
                                     tensorstore::span<Request>>);
 
   SortRequestsByStartByte(requests);
 
+  // Find the first non-suffix request.
   size_t request_i = 0;
+  for (request_i = 0; request_i < requests.size(); ++request_i) {
+    if (!requests[request_i].byte_range.IsSuffixLength()) {
+      break;
+    }
+  }
+
+  // If the first request is a full request, then all requests may be issued
+  // as a combined request for the full range.
+  if (request_i < requests.size() && requests[request_i].byte_range.IsFull()) {
+    OptionalByteRangeRequest full_byte_range;
+    callback(full_byte_range, requests);
+    return;
+  }
+
+  // Otherwise all suffix requests can be issued together, but they cannot be
+  // coalesced with other requests.
+  if (request_i != 0) {
+    OptionalByteRangeRequest coalesced_byte_range = requests[0].byte_range;
+    callback(coalesced_byte_range, requests.subspan(0, request_i));
+  }
+
   while (request_i < requests.size()) {
-    auto coalesced_byte_range = requests[request_i].byte_range.AsByteRange();
+    // Coalesce overlapping requests.
+    OptionalByteRangeRequest coalesced_byte_range =
+        requests[request_i].byte_range;
+
     size_t end_request_i;
     for (end_request_i = request_i + 1; end_request_i < requests.size();
          ++end_request_i) {
-      auto next_byte_range = requests[end_request_i].byte_range.AsByteRange();
-      if (next_byte_range.inclusive_min < coalesced_byte_range.exclusive_max ||
-          predicate(coalesced_byte_range, next_byte_range.inclusive_min)) {
+      if (coalesced_byte_range.exclusive_max == -1) {
+        end_request_i = requests.size();
+        break;
+      }
+      auto next_byte_range = requests[end_request_i].byte_range;
+      if (next_byte_range.inclusive_min >= coalesced_byte_range.exclusive_max &&
+          !predicate(coalesced_byte_range.AsByteRange(),
+                     next_byte_range.inclusive_min)) {
+        break;
+      }
+      if (next_byte_range.IsRange()) {
         coalesced_byte_range.exclusive_max = std::max(
             coalesced_byte_range.exclusive_max, next_byte_range.exclusive_max);
       } else {
-        break;
+        coalesced_byte_range.exclusive_max = -1;
       }
     }
     callback(coalesced_byte_range,
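A hedged sketch (not from the commit) of how ResolveCoalescedRequests now maps a request with an unbounded max onto the coalesced read. The FakeRequest struct and the tensorstore::internal_kvstore_batch namespace are assumptions; the struct is written to mirror what IsByteRangeReadRequestLikeV appears to require (a promise plus a byte_range), and the ReadResult fields used are the ones shown in the diff.

#include <cstdint>
#include <string>
#include <utility>

#include "absl/strings/cord.h"
#include "tensorstore/kvstore/batch_util.h"
#include "tensorstore/kvstore/byte_range.h"
#include "tensorstore/kvstore/read_result.h"
#include "tensorstore/util/future.h"
#include "tensorstore/util/span.h"

// Hypothetical request type with the members the helper accesses.
struct FakeRequest {
  tensorstore::Promise<tensorstore::kvstore::ReadResult> promise;
  tensorstore::OptionalByteRangeRequest byte_range;
};

void ResolveExample() {
  // The coalesced read covered bytes [100, 200) of the key.
  tensorstore::ByteRange coalesced{100, 200};

  auto pair =
      tensorstore::PromiseFuturePair<tensorstore::kvstore::ReadResult>::Make();
  FakeRequest request;
  request.promise = std::move(pair.promise);
  // Unbounded max: everything from byte 150 to the end of the read value.
  request.byte_range.inclusive_min = 150;
  request.byte_range.exclusive_max = -1;

  tensorstore::kvstore::ReadResult read_result;
  read_result.state = tensorstore::kvstore::ReadResult::kValue;
  read_result.value = absl::Cord(std::string(100, 'x'));  // 100 bytes

  tensorstore::internal_kvstore_batch::ResolveCoalescedRequests(
      coalesced, tensorstore::span<FakeRequest>(&request, 1),
      std::move(read_result));
  // The unbounded max resolves to coalesced.exclusive_max (200), so the
  // request's future (`pair.future`) receives the 50-byte subcord starting at
  // offset 150 - 100 = 50 of the coalesced value.
}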
@@ -406,9 +471,9 @@ struct CoalescingOptions {
   // Maximum target size for coalescing. Once this size limit is reached,
   // additional non-overlapping requests won't be added. However, this limit may
   // still be exceeded if an individual request, or set of overlapping requests,
-  // exceeds this size. This can be set to balance per-request overhead with
-  // additional parallelism that may be obtained from a greater number of
-  // requests.
+  // exceeds this size, or when reading to the end of a file.
+  // This can be set to balance per-request overhead with additional parallelism
+  // that may be obtained from a greater number of requests.
   int64_t target_coalesced_size = std::numeric_limits<int64_t>::max();
 
   // Checks if a new byte range starting at `next_inclusive_min` should be
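Per the header comment, a CoalescingOptions object can itself be passed as the predicate. A brief configuration sketch, assuming the type lives in tensorstore::internal_kvstore_batch; only the target_coalesced_size field is taken from this diff.

#include <cstdint>

#include "tensorstore/kvstore/batch_util.h"

void ConfigureCoalescing() {
  tensorstore::internal_kvstore_batch::CoalescingOptions options;
  // Stop appending non-overlapping requests once the coalesced range reaches
  // 16 MiB. Overlapping requests, or a request reading to the end of a file,
  // may still push a coalesced read past this target.
  options.target_coalesced_size = int64_t{16} * 1024 * 1024;
  // `options` can then be supplied as the `predicate` argument of
  // ForEachCoalescedRequest.
}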
