Commit ce93f0b

SC-61223: fix sparse global order reader to do "repeatable reads" when there are multiple fragments with the same timestamp (#5459)
When multiple fragments have the same timestamp, the sparse global order reader (and likely others) does not have "repeatable reads". That is, if you run the same read query multiple times, you do not receive the same result. The story provides a reproducer which does something like:

```
for i in range(0, 10):
    write a new fragment A[0] = i at timestamp t=1
    for j in range(0, 10):
        read A[0]
```

Below is the result:

```
python tmp/write_consistency.py
OrderedDict([('a', array([], dtype=int32)), ('d1', array([], dtype=int64))])
Wrote 0, read [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Wrote 1, read [0, 1, 1, 1, 1, 1, 1, 1, 1, 0]
Wrote 2, read [0, 1, 0, 0, 1, 2, 1, 0, 1, 1]
Wrote 3, read [0, 0, 0, 0, 3, 2, 2, 1, 2, 1]
Wrote 4, read [0, 3, 4, 1, 4, 2, 1, 2, 1, 4]
Wrote 5, read [1, 1, 5, 4, 3, 0, 0, 4, 0, 0]
Wrote 6, read [4, 6, 0, 2, 0, 3, 4, 4, 4, 4]
Wrote 7, read [4, 4, 4, 4, 7, 4, 5, 4, 4, 7]
Wrote 8, read [5, 0, 0, 7, 7, 0, 0, 4, 4, 4]
Wrote 9, read [8, 4, 3, 5, 2, 7, 9, 8, 2, 7]
```

We can see that the reads do not all produce the same value. In the story we discussed whether we would be satisfied with "repeatable read", in which each row from the reproducing script contains just one distinct value; or whether we need "strict write order", in which each row `i` contains only the value `i`. Repeatable read is sufficient, so that is what this pull request implements.
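The failure mode can be modeled in a few lines of plain Python. This is a toy sketch, not TileDB's actual reader; `dedup` and its tuple layout are invented for illustration. When duplicate coordinates tie on timestamp, the surviving cell depends on the order in which the fragments happen to be visited:

```python
import random

def dedup(cells, tiebreak_on_fragment=False):
    """Keep one value per coordinate. `cells` holds tuples of
    (coord, timestamp, fragment_index, value)."""
    best = {}
    for coord, ts, frag, value in cells:
        key = (ts, frag) if tiebreak_on_fragment else (ts,)
        # On a full tie the most recently visited cell wins, which is
        # exactly where order-dependence creeps in without the tiebreaker.
        if coord not in best or key >= best[coord][0]:
            best[coord] = (key, value)
    return {coord: value for coord, (_, value) in best.items()}

# Ten fragments, all writing their own index to coordinate 0 at timestamp 1.
cells = [(0, 1, frag, frag) for frag in range(10)]
random.shuffle(cells)

# Without the fragment-index tiebreaker, the result tracks visit order
# and may be any of 0..9 on a given read.
unstable = dedup(cells)

# With it, every read agrees: the greatest fragment index wins.
stable = dedup(cells, tiebreak_on_fragment=True)  # {0: 9}
```

The reproducer rows above with mixed values correspond to the `unstable` case; the fix moves the reader to the `stable` one.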
An example new result of the above script is:

```
OrderedDict([('a', array([], dtype=int32)), ('d1', array([], dtype=int64))])
Wrote 0, read [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Wrote 1, read [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Wrote 2, read [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Wrote 3, read [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Wrote 4, read [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Wrote 5, read [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Wrote 6, read [6, 6, 6, 6, 6, 6, 6, 6, 6, 6]
Wrote 7, read [6, 6, 6, 6, 6, 6, 6, 6, 6, 6]
Wrote 8, read [6, 6, 6, 6, 6, 6, 6, 6, 6, 6]
Wrote 9, read [6, 6, 6, 6, 6, 6, 6, 6, 6, 6]
```

# Implementation

The implementation is ultimately straightforward. The coordinates comparator breaks ties in coordinate values using the tile timestamp. We add an additional tiebreaker which compares the fragment index. The fragment index is itself determined by the lexicographic ordering of fragment names, which include the timestamp and a UUID; as such, for a fixed array the fragment indices will always be the same for each read. In typical cases the greatest fragment index is the most recently written, so it wins the comparison.

("In typical cases" meaning what? If two fragments are written at the same timestamp, which is `now()`, then there is logic in the UUID generation to ensure that the first fragment's UUID is lexicographically less than the second fragment's UUID. However, this logic does not apply if the array was opened at a fixed non-`now()` timestamp. Multiple fragments written at a fixed timestamp will have uncorrelated UUIDs and thus can appear in any order when reading the array. Hence we get a repeatable read order, but not always one which matches the physical order of the writes.)

In any case, I spent a bit of time diving into the above, and there are some artifacts which I elected to leave in here because they are both harmless and tested: specifically, some additional accessors to components of `FragmentID`.
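To make the comparison chain concrete, here is a sketch in Python. This is a hypothetical stand-in for the C++ coordinates comparator, not the real code: compare coordinates first, then tile timestamp, then (the new tiebreaker) the fragment index.

```python
from functools import cmp_to_key

def global_order_cmp(a, b):
    """a, b are (coords, tile_timestamp, fragment_index) tuples.
    Toy stand-in for the reader's comparator."""
    for i in range(3):  # coords, then timestamp, then fragment index
        if a[i] != b[i]:
            return -1 if a[i] < b[i] else 1
    return 0

# Two cells at the same coordinate (1, 1) and the same timestamp 5,
# from fragments 2 and 0. The fragment-index tiebreaker makes the
# comparison deterministic, so every read orders (and therefore
# de-duplicates) them the same way.
cells = [((1, 1), 5, 2), ((1, 1), 5, 0)]
ordered = sorted(cells, key=cmp_to_key(global_order_cmp))
# ordered == [((1, 1), 5, 0), ((1, 1), 5, 2)]
```

Since the fragment index comes from sorting fragment names, which embed the timestamps and a UUID, the same array always yields the same per-fragment indices, which is what makes the read repeatable.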
---
TYPE: BUG
DESC: repeatable read for multiple fragments written at fixed timestamp
1 parent 022af98 commit ce93f0b

File tree

5 files changed: +299 -21 lines changed

test/src/unit-sparse-global-order-reader.cc

Lines changed: 171 additions & 8 deletions

```diff
@@ -461,7 +461,7 @@ struct CSparseGlobalOrderFx {
       int* coords, uint64_t* coords_size, int* data, uint64_t* data_size);

   template <typename Asserter, FragmentType Fragment>
-  void write_fragment(const Fragment& fragment);
+  void write_fragment(const Fragment& fragment, CApiArray* existing = nullptr);

   void write_1d_fragment_strings(
       int* coords,
@@ -630,9 +630,16 @@ void CSparseGlobalOrderFx::write_1d_fragment(
  * Writes a generic `FragmentType` into the array.
  */
 template <typename Asserter, FragmentType Fragment>
-void CSparseGlobalOrderFx::write_fragment(const Fragment& fragment) {
-  // Open array for writing.
-  CApiArray array(context(), array_name_.c_str(), TILEDB_WRITE);
+void CSparseGlobalOrderFx::write_fragment(
+    const Fragment& fragment, CApiArray* existing) {
+  std::optional<CApiArray> constructed;
+  if (!existing) {
+    // Open array for writing.
+    constructed.emplace(context(), array_name_.c_str(), TILEDB_WRITE);
+    existing = &constructed.value();
+  }
+
+  CApiArray& array = *existing;

   const auto dimensions = fragment.dimensions();
   const auto attributes = fragment.attributes();
@@ -772,6 +779,15 @@ int32_t CSparseGlobalOrderFx::read(
   // Open array for reading.
   CApiArray array(context(), array_name_.c_str(), TILEDB_READ);

+  const char* dimname = array->array()
+                            ->array_schema_latest()
+                            .domain()
+                            .dimension_ptr(0)
+                            ->name()
+                            .c_str();
+  const char* attname =
+      array->array()->array_schema_latest().attribute(0)->name().c_str();
+
   // Create query.
   tiledb_query_t* query;
   auto rc = tiledb_query_alloc(context(), array, TILEDB_READ, &query);
@@ -797,7 +813,12 @@
   if (qc_idx == 1) {
     int32_t val = 11;
     rc = tiledb_query_condition_init(
-        context(), query_condition, "a", &val, sizeof(int32_t), TILEDB_LT);
+        context(),
+        query_condition,
+        attname,
+        &val,
+        sizeof(int32_t),
+        TILEDB_LT);
     CHECK(rc == TILEDB_OK);
   } else if (qc_idx == 2) {
     // Negated query condition should produce the same results.
@@ -806,7 +827,7 @@
     rc = tiledb_query_condition_alloc(context(), &qc);
     CHECK(rc == TILEDB_OK);
     rc = tiledb_query_condition_init(
-        context(), qc, "a", &val, sizeof(int32_t), TILEDB_GE);
+        context(), qc, attname, &val, sizeof(int32_t), TILEDB_GE);
     CHECK(rc == TILEDB_OK);
     rc = tiledb_query_condition_negate(context(), qc, &query_condition);
     CHECK(rc == TILEDB_OK);
@@ -822,9 +843,10 @@

   rc = tiledb_query_set_layout(context(), query, TILEDB_GLOBAL_ORDER);
   CHECK(rc == TILEDB_OK);
-  rc = tiledb_query_set_data_buffer(context(), query, "a", data, data_size);
+  rc = tiledb_query_set_data_buffer(context(), query, attname, data, data_size);
   CHECK(rc == TILEDB_OK);
-  rc = tiledb_query_set_data_buffer(context(), query, "d", coords, coords_size);
+  rc = tiledb_query_set_data_buffer(
+      context(), query, dimname, coords, coords_size);
   CHECK(rc == TILEDB_OK);

   // Submit query.
@@ -2968,6 +2990,147 @@
   tiledb_query_free(&query);
 }

+/**
+ * Test that we get "repeatable reads" when multiple fragments
+ * are written at the same timestamp. That is, for a fixed array
+ * A, reading the contents of A should always produce the same results.
+ */
+TEST_CASE_METHOD(
+    CSparseGlobalOrderFx,
+    "Sparse global order reader: repeatable reads for sub-millisecond "
+    "fragments",
+    "[sparse-global-order][sub-millisecond][rest][rapidcheck]") {
+  auto doit = [this]<typename Asserter>(
+                  const std::vector<uint64_t> fragment_same_timestamp_runs) {
+    uint64_t num_fragments = 0;
+    for (const auto same_timestamp_run : fragment_same_timestamp_runs) {
+      num_fragments += same_timestamp_run;
+    }
+
+    FxRun2D instance;
+    instance.allow_dups = false;
+
+    /*
+     * Each fragment (T, F) writes its fragment index into both (1, 1)
+     * and (2 + T, 2 + F).
+     *
+     * (1, 1) is the coordinate where they must be de-duplicated.
+     * The other coordinate is useful for ensuring that all fragments
+     * are included in the result set, and for debugging by inspecting tile
+     * MBRs.
+     */
+    for (uint64_t t = 0; t < fragment_same_timestamp_runs.size(); t++) {
+      for (uint64_t f = 0; f < fragment_same_timestamp_runs[t]; f++) {
+        FxRun2D::FragmentType fragment;
+        fragment.d1_ = {1, 2 + static_cast<int>(t)};
+        fragment.d2_ = {1, 2 + static_cast<int>(f)};
+        std::get<0>(fragment.atts_) = {
+            static_cast<int>(instance.fragments.size()),
+            static_cast<int>(instance.fragments.size())};
+
+        instance.fragments.push_back(fragment);
+      }
+    }
+
+    create_array<Asserter, decltype(instance)>(instance);
+
+    DeleteArrayGuard arrayguard(context(), array_name_.c_str());
+
+    /*
+     * Write each fragment at a fixed timestamp.
+     * Opening for write at timestamp `t` causes all the fragments to have `t`
+     * as their start and end timestamps.
+     */
+    for (uint64_t i = 0, t = 0; t < fragment_same_timestamp_runs.size(); t++) {
+      tiledb_array_t* raw_array;
+      TRY(context(),
+          tiledb_array_alloc(context(), array_name_.c_str(), &raw_array));
+      TRY(context(),
+          tiledb_array_set_open_timestamp_start(context(), raw_array, t + 1));
+      TRY(context(),
+          tiledb_array_set_open_timestamp_end(context(), raw_array, t + 1));
+
+      CApiArray array(context(), raw_array, TILEDB_WRITE);
+      for (uint64_t f = 0; f < fragment_same_timestamp_runs[t]; f++, i++) {
+        write_fragment<Asserter, decltype(instance.fragments[i])>(
+            instance.fragments[i], &array);
+      }
+    }
+
+    CApiArray array(context(), array_name_.c_str(), TILEDB_READ);
+
+    // Value from (1, 1).
+    // Because all the writes are at the same timestamp we make no guarantee
+    // of ordering. `attvalue` may be the value from any of the fragments,
+    // but it must be the same value each time we read.
+    std::optional<int> attvalue;
+
+    for (uint64_t f = 0; f < num_fragments; f++) {
+      std::vector<int> dim1(instance.fragments.size() * 4);
+      std::vector<int> dim2(instance.fragments.size() * 4);
+      std::vector<int> atts(instance.fragments.size() * 4);
+      uint64_t dim1_size = sizeof(int) * dim1.size();
+      uint64_t dim2_size = sizeof(int) * dim2.size();
+      uint64_t atts_size = sizeof(int) * atts.size();
+
+      tiledb_query_t* query;
+      TRY(context(), tiledb_query_alloc(context(), array, TILEDB_READ, &query));
+      TRY(context(),
+          tiledb_query_set_layout(context(), query, TILEDB_GLOBAL_ORDER));
+      TRY(context(),
+          tiledb_query_set_data_buffer(
+              context(), query, "d1", dim1.data(), &dim1_size));
+      TRY(context(),
+          tiledb_query_set_data_buffer(
+              context(), query, "d2", dim2.data(), &dim2_size));
+      TRY(context(),
+          tiledb_query_set_data_buffer(
+              context(), query, "a1", atts.data(), &atts_size));
+
+      tiledb_query_status_t status;
+      TRY(context(), tiledb_query_submit(context(), query));
+      TRY(context(), tiledb_query_get_status(context(), query, &status));
+      tiledb_query_free(&query);
+      ASSERTER(status == TILEDB_COMPLETED);
+
+      ASSERTER(dim1_size == (1 + num_fragments) * sizeof(int));
+      ASSERTER(dim2_size == (1 + num_fragments) * sizeof(int));
+      ASSERTER(atts_size == (1 + num_fragments) * sizeof(int));
+
+      ASSERTER(dim1[0] == 1);
+      ASSERTER(dim2[0] == 1);
+      if (attvalue.has_value()) {
+        ASSERTER(attvalue.value() == atts[0]);
+      } else {
+        attvalue.emplace(atts[0]);
+      }
+
+      uint64_t c = 1;
+      for (uint64_t t = 0; t < fragment_same_timestamp_runs.size(); t++) {
+        for (uint64_t f = 0; f < fragment_same_timestamp_runs[t]; f++, c++) {
+          ASSERTER(dim1[c] == 2 + static_cast<int>(t));
+          ASSERTER(dim2[c] == 2 + static_cast<int>(f));
+          ASSERTER(atts[c] == static_cast<int>(c - 1));
+        }
+      }
+    }
+  };
+
+  SECTION("Example") {
+    doit.operator()<AsserterCatch>({10});
+  }
+
+  SECTION("Rapidcheck") {
+    rc::prop("rapidcheck consistent read order for sub-millisecond", [doit]() {
+      const auto runs = *rc::gen::suchThat(
+          rc::gen::nonEmpty(rc::gen::container<std::vector<uint64_t>>(
+              rc::gen::inRange(1, 8))),
+          [](auto value) { return value.size() <= 6; });
+      doit.operator()<AsserterRapidcheck>(runs);
+    });
+  }
+}
+
 /**
  * Creates an array with a schema whose dimensions and attributes
  * come from `Instance`.
```

test/support/src/array_helpers.h

Lines changed: 6 additions & 0 deletions

```diff
@@ -101,6 +101,12 @@ struct CApiArray {
     throw_if_error(ctx, tiledb_array_open(ctx, array_, mode));
   }

+  CApiArray(tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_query_type_t mode)
+      : ctx_(ctx)
+      , array_(array) {
+    throw_if_error(ctx, tiledb_array_open(ctx, array_, mode));
+  }
+
   CApiArray(CApiArray&& from)
       : ctx_(from.ctx_)
       , array_(from.movefrom()) {
```

tiledb/sm/fragment/fragment_identifier.h

Lines changed: 10 additions & 4 deletions

```diff
@@ -72,6 +72,12 @@ enum class FragmentNameVersion { ONE, TWO, THREE };
  */
 class FragmentID : private URI {
  private:
+  /**
+   * Whitebox testing class provides additional accessors to components of the
+   * fragment name.
+   */
+  friend class WhiteboxFragmentID;
+
   /** The fragment name. */
   std::string name_;
   /** The timestamp range. */
@@ -94,7 +100,7 @@ class FragmentID : private URI {
   ~FragmentID() = default;

   /** Accessor to the fragment name. */
-  inline const std::string& name() {
+  inline const std::string& name() const {
     return name_;
   }

@@ -103,7 +109,7 @@
    * For array format version <= 2, only the range start is valid
    * (the range end is ignored).
    */
-  inline timestamp_range_type timestamp_range() {
+  inline timestamp_range_type timestamp_range() const {
     return timestamp_range_;
   }

@@ -118,7 +124,7 @@
    * - Name version 3 corresponds to format version 5 or higher
    *   * __t1_t2_uuid_version
    */
-  inline int name_version() {
+  inline int name_version() const {
     if (name_version_ == FragmentNameVersion::ONE) {
       return 1;
     } else if (name_version_ == FragmentNameVersion::TWO) {
@@ -132,7 +138,7 @@
    * Accessor to the array format version.
    * Returns UINT32_MAX for name versions <= 2.
    */
-  inline format_version_t array_format_version() {
+  inline format_version_t array_format_version() const {
     return array_format_version_;
   }
 };
```
