Skip to content

Commit 60cb50a

Browse files
jbmscopybara-github
authored andcommitted
Implement transactional List support
This enables querying storage statistics before committing a transaction. PiperOrigin-RevId: 750757186 Change-Id: I0f9c655748d948926b1f131eb2e36c3a8d2d552e
1 parent 37e3560 commit 60cb50a

File tree

21 files changed

+1080
-78
lines changed

21 files changed

+1080
-78
lines changed

tensorstore/driver/zarr3/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ tensorstore_cc_test(
309309
"//tensorstore:open",
310310
"//tensorstore:open_mode",
311311
"//tensorstore:schema",
312+
"//tensorstore:transaction",
312313
"//tensorstore/index_space:dim_expression",
313314
"//tensorstore/internal:json_gtest",
314315
"//tensorstore/kvstore",

tensorstore/driver/zarr3/storage_statistics_test.cc

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "tensorstore/open_mode.h"
3333
#include "tensorstore/schema.h"
3434
#include "tensorstore/tensorstore.h"
35+
#include "tensorstore/transaction.h"
3536
#include "tensorstore/util/result.h"
3637
#include "tensorstore/util/status_testutil.h"
3738

@@ -199,6 +200,143 @@ TEST_F(StorageStatisticsTest, FullyLexicographicOrder) {
199200
}
200201
}
201202

203+
TEST_F(StorageStatisticsTest, FullyLexicographicOrderTransactional) {
204+
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
205+
auto store, tensorstore::Open(
206+
{
207+
{"driver", "zarr3"},
208+
{"kvstore", {{"driver", "mock_key_value_store"}}},
209+
{"store_data_equal_to_fill_value", true},
210+
},
211+
Schema::Shape({100, 200, 300}), dtype_v<uint8_t>,
212+
ChunkLayout::ChunkShape({10, 20, 30}), context,
213+
tensorstore::OpenMode::create)
214+
.result());
215+
216+
auto transaction = tensorstore::Transaction(tensorstore::isolated);
217+
218+
mock_kvstore->request_log.pop_all();
219+
{
220+
auto transformed =
221+
store | transaction |
222+
tensorstore::AllDims().HalfOpenInterval({1, 1, 1}, {20, 5, 5});
223+
EXPECT_THAT(tensorstore::GetStorageStatistics(
224+
transformed, ArrayStorageStatistics::query_not_stored)
225+
.result(),
226+
::testing::Optional(ArrayStorageStatistics{
227+
/*.mask=*/ArrayStorageStatistics::query_not_stored,
228+
/*.not_stored=*/true}));
229+
// Individual "stat" requests for the 2 chunks since they can't be
230+
// combined into a single range.
231+
EXPECT_THAT(mock_kvstore->request_log.pop_all(),
232+
::testing::UnorderedElementsAreArray({
233+
JsonSubValuesMatch({{"/type", "read"},
234+
{"/key", "c/0/0/0"},
235+
{"/byte_range_exclusive_max", 0}}),
236+
JsonSubValuesMatch({{"/type", "read"},
237+
{"/key", "c/1/0/0"},
238+
{"/byte_range_exclusive_max", 0}}),
239+
}));
240+
TENSORSTORE_ASSERT_OK(
241+
tensorstore::Write(tensorstore::MakeScalarArray<uint8_t>(42),
242+
transformed)
243+
.result());
244+
mock_kvstore->request_log.pop_all();
245+
// Unconditionally overwritten within the transaction. No I/O is
246+
// needed for subsequent storage requests.
247+
EXPECT_THAT(tensorstore::GetStorageStatistics(
248+
transformed, ArrayStorageStatistics::query_not_stored)
249+
.result(),
250+
::testing::Optional(ArrayStorageStatistics{
251+
/*.mask=*/ArrayStorageStatistics::query_not_stored,
252+
/*.not_stored=*/false}));
253+
EXPECT_THAT(mock_kvstore->request_log.pop_all(), ::testing::ElementsAre());
254+
EXPECT_THAT(tensorstore::GetStorageStatistics(
255+
transformed, ArrayStorageStatistics::query_not_stored,
256+
ArrayStorageStatistics::query_fully_stored)
257+
.result(),
258+
::testing::Optional(ArrayStorageStatistics{
259+
/*.mask=*/ArrayStorageStatistics::query_not_stored |
260+
ArrayStorageStatistics::query_fully_stored,
261+
/*.not_stored=*/false, /*.fully_stored=*/true}));
262+
EXPECT_THAT(mock_kvstore->request_log.pop_all(), ::testing::ElementsAre());
263+
}
264+
265+
// Test listing entire array
266+
{
267+
EXPECT_THAT(
268+
tensorstore::GetStorageStatistics(
269+
store | transaction, ArrayStorageStatistics::query_not_stored,
270+
ArrayStorageStatistics::query_fully_stored)
271+
.result(),
272+
::testing::Optional(ArrayStorageStatistics{
273+
/*.mask=*/ArrayStorageStatistics::query_not_stored |
274+
ArrayStorageStatistics::query_fully_stored,
275+
/*.not_stored=*/false, /*.fully_stored=*/false}));
276+
// List request for all chunks.
277+
EXPECT_THAT(
278+
mock_kvstore->request_log.pop_all(),
279+
::testing::UnorderedElementsAreArray({
280+
JsonSubValuesMatch({{"/type", "list"}, {"/range", {"c/", "c0"}}}),
281+
}));
282+
}
283+
284+
// Test listing with single-dimension prefix
285+
{
286+
EXPECT_THAT(
287+
tensorstore::GetStorageStatistics(
288+
store | transaction | tensorstore::Dims(0).HalfOpenInterval(12, 15),
289+
ArrayStorageStatistics::query_not_stored,
290+
ArrayStorageStatistics::query_fully_stored)
291+
.result(),
292+
::testing::Optional(ArrayStorageStatistics{
293+
/*.mask=*/ArrayStorageStatistics::query_not_stored |
294+
ArrayStorageStatistics::query_fully_stored,
295+
/*.not_stored=*/false, /*.fully_stored=*/false}));
296+
// List request for all chunks starting with `1/`.
297+
EXPECT_THAT(mock_kvstore->request_log.pop_all(),
298+
::testing::ElementsAre(JsonSubValuesMatch(
299+
{{"/type", "list"}, {"/range", {"c/1/", "c/10"}}})));
300+
}
301+
302+
// Test listing with a single (not present) chunk.
303+
{
304+
EXPECT_THAT(tensorstore::GetStorageStatistics(
305+
store | transaction |
306+
tensorstore::AllDims().IndexSlice({10, 25, 35}),
307+
ArrayStorageStatistics::query_not_stored,
308+
ArrayStorageStatistics::query_fully_stored)
309+
.result(),
310+
::testing::Optional(ArrayStorageStatistics{
311+
/*.mask=*/ArrayStorageStatistics::query_not_stored |
312+
ArrayStorageStatistics::query_fully_stored,
313+
/*.not_stored=*/true, /*.fully_stored=*/false}));
314+
// "Stat" request for single chunk.
315+
EXPECT_THAT(mock_kvstore->request_log.pop_all(),
316+
::testing::ElementsAre(
317+
JsonSubValuesMatch({{"/type", "read"},
318+
{"/key", "c/1/1/1"},
319+
{"/byte_range_exclusive_max", 0}})));
320+
}
321+
322+
// Test listing with a single (present) chunk.
323+
{
324+
EXPECT_THAT(
325+
tensorstore::GetStorageStatistics(
326+
store | transaction | tensorstore::AllDims().IndexSlice({2, 2, 2}),
327+
ArrayStorageStatistics::query_not_stored,
328+
ArrayStorageStatistics::query_fully_stored)
329+
.result(),
330+
::testing::Optional(ArrayStorageStatistics{
331+
/*.mask=*/ArrayStorageStatistics::query_not_stored |
332+
ArrayStorageStatistics::query_fully_stored,
333+
/*.not_stored=*/false, /*.fully_stored=*/true}));
334+
// Chunk was unconditionally overwritten within the transaction,
335+
// no I/O required.
336+
EXPECT_THAT(mock_kvstore->request_log.pop_all(), ::testing::ElementsAre());
337+
}
338+
}
339+
202340
TEST_F(StorageStatisticsTest, SemiLexicographicOrder) {
203341
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
204342
auto store, tensorstore::Open(
@@ -247,6 +385,57 @@ TEST_F(StorageStatisticsTest, SemiLexicographicOrder) {
247385
}));
248386
}
249387

388+
TEST_F(StorageStatisticsTest, SemiLexicographicOrderTransactional) {
389+
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
390+
auto store, tensorstore::Open(
391+
{
392+
{"driver", "zarr3"},
393+
{"kvstore", {{"driver", "mock_key_value_store"}}},
394+
},
395+
Schema::Shape({100, 100, 100}), dtype_v<uint8_t>,
396+
ChunkLayout::ChunkShape({1, 1, 1}), context,
397+
tensorstore::OpenMode::create)
398+
.result());
399+
mock_kvstore->request_log.pop_all();
400+
401+
auto transaction = tensorstore::Transaction(tensorstore::isolated);
402+
403+
EXPECT_THAT(
404+
tensorstore::GetStorageStatistics(
405+
store | transaction | tensorstore::Dims(0).HalfOpenInterval(8, 15),
406+
ArrayStorageStatistics::query_not_stored)
407+
.result(),
408+
::testing::Optional(ArrayStorageStatistics{
409+
/*.mask=*/ArrayStorageStatistics::query_not_stored,
410+
/*.not_stored=*/true}));
411+
EXPECT_THAT(
412+
mock_kvstore->request_log.pop_all(),
413+
::testing::UnorderedElementsAreArray({
414+
JsonSubValuesMatch({{"/type", "list"}, {"/range", {"c/8/", "c/80"}}}),
415+
JsonSubValuesMatch({{"/type", "list"}, {"/range", {"c/9/", "c/90"}}}),
416+
JsonSubValuesMatch(
417+
{{"/type", "list"}, {"/range", {"c/10/", "c/140"}}}),
418+
}));
419+
420+
EXPECT_THAT(tensorstore::GetStorageStatistics(
421+
store | transaction |
422+
tensorstore::Dims(0, 1).HalfOpenInterval({3, 8}, {4, 15}),
423+
ArrayStorageStatistics::query_not_stored)
424+
.result(),
425+
::testing::Optional(ArrayStorageStatistics{
426+
/*.mask=*/ArrayStorageStatistics::query_not_stored,
427+
/*.not_stored=*/true}));
428+
EXPECT_THAT(mock_kvstore->request_log.pop_all(),
429+
::testing::UnorderedElementsAreArray({
430+
JsonSubValuesMatch(
431+
{{"/type", "list"}, {"/range", {"c/3/8/", "c/3/80"}}}),
432+
JsonSubValuesMatch(
433+
{{"/type", "list"}, {"/range", {"c/3/9/", "c/3/90"}}}),
434+
JsonSubValuesMatch(
435+
{{"/type", "list"}, {"/range", {"c/3/10/", "c/3/140"}}}),
436+
}));
437+
}
438+
250439
TEST_F(StorageStatisticsTest, Sharded) {
251440
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
252441
auto store, tensorstore::Open(
@@ -320,6 +509,82 @@ TEST_F(StorageStatisticsTest, Sharded) {
320509
}));
321510
}
322511

512+
TEST_F(StorageStatisticsTest, ShardedTransactional) {
513+
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
514+
auto store, tensorstore::Open(
515+
{
516+
{"driver", "zarr3"},
517+
{"kvstore", {{"driver", "mock_key_value_store"}}},
518+
},
519+
Schema::Shape({100, 100, 100}), dtype_v<uint8_t>,
520+
ChunkLayout::ReadChunkShape({1, 1, 1}),
521+
ChunkLayout::WriteChunkShape({8, 8, 8}), context,
522+
tensorstore::OpenMode::create)
523+
.result());
524+
mock_kvstore->request_log.pop_all();
525+
526+
auto transaction = tensorstore::Transaction(tensorstore::isolated);
527+
528+
EXPECT_THAT(
529+
tensorstore::GetStorageStatistics(
530+
store | transaction | tensorstore::Dims(0).HalfOpenInterval(8, 15),
531+
ArrayStorageStatistics::query_not_stored)
532+
.result(),
533+
::testing::Optional(ArrayStorageStatistics{
534+
/*.mask=*/ArrayStorageStatistics::query_not_stored,
535+
/*.not_stored=*/true}));
536+
// No data present.
537+
EXPECT_THAT(
538+
mock_kvstore->request_log.pop_all(),
539+
::testing::UnorderedElementsAreArray({
540+
JsonSubValuesMatch({{"/type", "list"}, {"/range", {"c/1/", "c/10"}}}),
541+
}));
542+
543+
// Write to one chunk.
544+
TENSORSTORE_ASSERT_OK(
545+
tensorstore::Write(
546+
tensorstore::MakeScalarArray<uint8_t>(42),
547+
store | transaction | tensorstore::AllDims().IndexSlice({30, 50, 70}))
548+
.result());
549+
550+
mock_kvstore->request_log.pop_all();
551+
552+
EXPECT_THAT(
553+
tensorstore::GetStorageStatistics(
554+
store | transaction | tensorstore::Dims(0).HalfOpenInterval(24, 32),
555+
ArrayStorageStatistics::query_not_stored)
556+
.result(),
557+
::testing::Optional(ArrayStorageStatistics{
558+
/*.mask=*/ArrayStorageStatistics::query_not_stored,
559+
/*.not_stored=*/false}));
560+
561+
// Shard index is not retrieved since query covers entire shard.
562+
EXPECT_THAT(
563+
mock_kvstore->request_log.pop_all(),
564+
::testing::UnorderedElementsAreArray({
565+
JsonSubValuesMatch({{"/type", "list"}, {"/range", {"c/3/", "c/30"}}}),
566+
}));
567+
568+
mock_kvstore->request_log.pop_all();
569+
570+
EXPECT_THAT(
571+
tensorstore::GetStorageStatistics(
572+
store | transaction | tensorstore::Dims(0).HalfOpenInterval(24, 31),
573+
ArrayStorageStatistics::query_not_stored)
574+
.result(),
575+
::testing::Optional(ArrayStorageStatistics{
576+
/*.mask=*/ArrayStorageStatistics::query_not_stored,
577+
/*.not_stored=*/false}));
578+
579+
// Shard is retrieved since query does not cover entire shard.
580+
EXPECT_THAT(
581+
mock_kvstore->request_log.pop_all(),
582+
::testing::UnorderedElementsAreArray({
583+
JsonSubValuesMatch({{"/type", "list"}, {"/range", {"c/3/", "c/30"}}}),
584+
JsonSubValuesMatch({{"/type", "read"}, {"/key", "c/3/6/8"}}),
585+
}));
586+
}
587+
323588
TEST_F(StorageStatisticsTest, ShardedTranspose) {
324589
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
325590
auto store, tensorstore::Open(

tensorstore/kvstore/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ tensorstore_cc_library(
201201
"//tensorstore/util/execution",
202202
"//tensorstore/util/execution:any_receiver",
203203
"//tensorstore/util/execution:any_sender",
204+
"//tensorstore/util/execution:flow_sender_operation_state",
204205
"//tensorstore/util/execution:future_collecting_receiver",
205206
"//tensorstore/util/execution:future_sender",
206207
"//tensorstore/util/execution:sender",
@@ -335,6 +336,7 @@ tensorstore_cc_test(
335336
deps = [
336337
":byte_range",
337338
":generation",
339+
":key_range",
338340
":kvstore",
339341
":mock_kvstore",
340342
":test_matchers",

tensorstore/kvstore/driver.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,20 @@ class Driver {
250250
/// Implementation of `List` that driver implementations must define.
251251
virtual void ListImpl(ListOptions options, ListReceiver receiver);
252252

253+
/// Implementation of transactional `List` that driver implementations must
254+
/// define.
255+
virtual void TransactionalListImpl(
256+
const internal::OpenTransactionPtr& transaction, ListOptions options,
257+
ListReceiver receiver);
258+
253259
/// List keys in the key-value store.
254260
///
255261
/// The keys are emitted in arbitrary order.
256262
///
257-
/// This simply forwards to `ListImpl`.
263+
/// This simply forwards to `ListImpl` or `TransactionalListImpl`.
258264
ListSender List(ListOptions options);
265+
ListSender List(ListOptions options,
266+
const internal::OpenTransactionPtr& transaction);
259267

260268
/// Returns a Spec that can be used to re-open this key-value store.
261269
///

0 commit comments

Comments
 (0)