Skip to content

Commit d0336b1

Browse files
authored
[ENH]: Plumb prefix path all the way to the bf writer (#4743)
## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - Plumbs the tenant and database id from compactor orchestrator to the segment writer. - The segment writer constructs the prefix path as per format: tenant/{tenant}/database/{database_id}/collection/{collection_id}/segment/{segment_id} - It then passes this prefix in the construction of various blockfile writers as part of the write options - New functionality - ... ## Test plan _How are these changes tested?_ - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None
1 parent b49197a commit d0336b1

File tree

21 files changed

+671
-309
lines changed

21 files changed

+671
-309
lines changed

rust/blockstore/benches/blockfile_writer.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,17 @@ fn bench_writer_for_generator_and_size<D: DataGenerator>(
134134
name, data_byte_size
135135
);
136136
let data = generator.data();
137+
let prefix = String::from("");
137138

138139
let name_writer_options_data = [
139140
(
140141
"UnorderedBlockfileWriter",
141-
BlockfileWriterOptions::new().unordered_mutations(),
142+
BlockfileWriterOptions::new(prefix.clone()).unordered_mutations(),
142143
data.clone(),
143144
),
144145
(
145146
"OrderedBlockfileWriter",
146-
BlockfileWriterOptions::new().ordered_mutations(),
147+
BlockfileWriterOptions::new(prefix.clone()).ordered_mutations(),
147148
{
148149
let mut data = data;
149150
data.sort_unstable_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
@@ -162,7 +163,7 @@ fn bench_writer_for_generator_and_size<D: DataGenerator>(
162163
|| data.clone(),
163164
|data| async {
164165
let writer = provider
165-
.write::<D::Key, D::Value>(*writer_options)
166+
.write::<D::Key, D::Value>(writer_options.clone())
166167
.await
167168
.unwrap();
168169
for (prefix, key, value) in data {
@@ -180,7 +181,9 @@ fn bench_writer_for_generator_and_size<D: DataGenerator>(
180181
{
181182
let populated_blockfile_id = runner.block_on(async {
182183
let writer = provider
183-
.write::<D::Key, D::Value>(BlockfileWriterOptions::new().unordered_mutations())
184+
.write::<D::Key, D::Value>(
185+
BlockfileWriterOptions::new(prefix).unordered_mutations(),
186+
)
184187
.await
185188
.unwrap();
186189

@@ -204,7 +207,9 @@ fn bench_writer_for_generator_and_size<D: DataGenerator>(
204207
|| data.clone(),
205208
|data| async {
206209
let writer = provider
207-
.write::<D::Key, D::Value>(writer_options.fork(populated_blockfile_id))
210+
.write::<D::Key, D::Value>(
211+
writer_options.clone().fork(populated_blockfile_id),
212+
)
208213
.await
209214
.unwrap();
210215

rust/blockstore/src/arrow/blockfile.rs

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -767,8 +767,9 @@ mod tests {
767767
block_cache,
768768
sparse_index_cache,
769769
);
770+
let prefix_path = String::from("");
770771
let writer = blockfile_provider
771-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
772+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path))
772773
.await
773774
.unwrap();
774775
let id = writer.id();
@@ -807,10 +808,11 @@ mod tests {
807808
block_cache,
808809
sparse_index_cache,
809810
);
811+
let prefix_path = String::from("");
810812

811813
// Test no keys
812814
let writer = blockfile_provider
813-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
815+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()))
814816
.await
815817
.unwrap();
816818

@@ -820,7 +822,7 @@ mod tests {
820822

821823
// Test 2 keys
822824
let writer = blockfile_provider
823-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
825+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()))
824826
.await
825827
.unwrap();
826828

@@ -839,7 +841,7 @@ mod tests {
839841

840842
// Test add keys after commit, before flush
841843
let writer = blockfile_provider
842-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
844+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()))
843845
.await
844846
.unwrap();
845847

@@ -861,7 +863,7 @@ mod tests {
861863

862864
// Test count after flush
863865
let writer = blockfile_provider
864-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
866+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path))
865867
.await
866868
.unwrap();
867869
let flusher = writer.commit::<&str, Vec<u32>>().await.unwrap();
@@ -880,8 +882,9 @@ mod tests {
880882
block_cache,
881883
sparse_index_cache,
882884
);
885+
let prefix_path = String::from("");
883886
let writer = blockfile_provider
884-
.write::<&str, u32>(BlockfileWriterOptions::default())
887+
.write::<&str, u32>(BlockfileWriterOptions::new(prefix_path))
885888
.await
886889
.unwrap();
887890
let id = writer.id();
@@ -947,8 +950,9 @@ mod tests {
947950
block_cache,
948951
sparse_index_cache,
949952
);
953+
let prefix_path = String::from("");
950954
let writer = blockfile_provider
951-
.write::<&str, u32>(BlockfileWriterOptions::default())
955+
.write::<&str, u32>(BlockfileWriterOptions::new(prefix_path))
952956
.await
953957
.unwrap();
954958
let id = writer.id();
@@ -1104,8 +1108,9 @@ mod tests {
11041108
block_cache,
11051109
sparse_index_cache,
11061110
);
1111+
let prefix_path = String::from("");
11071112
let writer = blockfile_provider
1108-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
1113+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path))
11091114
.await
11101115
.unwrap();
11111116
let id = writer.id();
@@ -1144,8 +1149,9 @@ mod tests {
11441149
block_cache,
11451150
sparse_index_cache,
11461151
);
1152+
let prefix_path = String::from("");
11471153
let writer = blockfile_provider
1148-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
1154+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()))
11491155
.await
11501156
.unwrap();
11511157
let id_1 = writer.id();
@@ -1182,7 +1188,7 @@ mod tests {
11821188

11831189
// Add 5 new entries to the first block
11841190
let writer = blockfile_provider
1185-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new().fork(id_1))
1191+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()).fork(id_1))
11861192
.await
11871193
.unwrap();
11881194
let id_2 = writer.id();
@@ -1217,7 +1223,7 @@ mod tests {
12171223

12181224
// Add 1200 more entries, causing splits
12191225
let writer = blockfile_provider
1220-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new().fork(id_2))
1226+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path).fork(id_2))
12211227
.await
12221228
.unwrap();
12231229
let id_3 = writer.id();
@@ -1261,8 +1267,9 @@ mod tests {
12611267
block_cache,
12621268
sparse_index_cache,
12631269
);
1270+
let prefix_path = String::from("");
12641271
let writer = blockfile_provider
1265-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
1272+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path))
12661273
.await
12671274
.unwrap();
12681275
let id_1 = writer.id();
@@ -1306,9 +1313,10 @@ mod tests {
13061313
block_cache,
13071314
sparse_index_cache,
13081315
);
1316+
let prefix_path = String::from("");
13091317

13101318
let writer = blockfile_provider
1311-
.write::<&str, String>(BlockfileWriterOptions::default())
1319+
.write::<&str, String>(BlockfileWriterOptions::new(prefix_path))
13121320
.await
13131321
.unwrap();
13141322
let id = writer.id();
@@ -1346,9 +1354,10 @@ mod tests {
13461354
block_cache,
13471355
sparse_index_cache,
13481356
);
1357+
let prefix_path = String::from("");
13491358

13501359
let writer = provider
1351-
.write::<f32, String>(BlockfileWriterOptions::default())
1360+
.write::<f32, String>(BlockfileWriterOptions::new(prefix_path))
13521361
.await
13531362
.unwrap();
13541363
let id = writer.id();
@@ -1384,8 +1393,9 @@ mod tests {
13841393
sparse_index_cache,
13851394
);
13861395

1396+
let prefix_path = String::from("");
13871397
let writer = blockfile_provider
1388-
.write::<&str, roaring::RoaringBitmap>(BlockfileWriterOptions::default())
1398+
.write::<&str, roaring::RoaringBitmap>(BlockfileWriterOptions::new(prefix_path))
13891399
.await
13901400
.unwrap();
13911401
let id = writer.id();
@@ -1434,8 +1444,9 @@ mod tests {
14341444
sparse_index_cache,
14351445
);
14361446

1447+
let prefix_path = String::from("");
14371448
let writer = blockfile_provider
1438-
.write::<u32, u32>(BlockfileWriterOptions::default())
1449+
.write::<u32, u32>(BlockfileWriterOptions::new(prefix_path))
14391450
.await
14401451
.unwrap();
14411452
let id = writer.id();
@@ -1471,8 +1482,9 @@ mod tests {
14711482
sparse_index_cache,
14721483
);
14731484

1485+
let prefix_path = String::from("");
14741486
let writer = blockfile_provider
1475-
.write::<&str, &DataRecord>(BlockfileWriterOptions::default())
1487+
.write::<&str, &DataRecord>(BlockfileWriterOptions::new(prefix_path))
14761488
.await
14771489
.unwrap();
14781490
let id = writer.id();
@@ -1526,8 +1538,9 @@ mod tests {
15261538
sparse_index_cache,
15271539
);
15281540

1541+
let prefix_path = String::from("");
15291542
let writer = blockfile_provider
1530-
.write::<&str, String>(BlockfileWriterOptions::default())
1543+
.write::<&str, String>(BlockfileWriterOptions::new(prefix_path))
15311544
.await
15321545
.unwrap();
15331546
let id = writer.id();
@@ -1566,8 +1579,9 @@ mod tests {
15661579
block_cache,
15671580
sparse_index_cache,
15681581
);
1582+
let prefix_path = String::from("");
15691583
let writer = blockfile_provider
1570-
.write::<&str, String>(BlockfileWriterOptions::default())
1584+
.write::<&str, String>(BlockfileWriterOptions::new(prefix_path.clone()))
15711585
.await
15721586
.unwrap();
15731587
let id = writer.id();
@@ -1593,7 +1607,7 @@ mod tests {
15931607
}
15941608

15951609
let writer = blockfile_provider
1596-
.write::<&str, String>(BlockfileWriterOptions::new().fork(id))
1610+
.write::<&str, String>(BlockfileWriterOptions::new(prefix_path).fork(id))
15971611
.await
15981612
.unwrap();
15991613
let id = writer.id();
@@ -1636,8 +1650,9 @@ mod tests {
16361650
block_cache,
16371651
sparse_index_cache,
16381652
);
1653+
let prefix_path = String::from("");
16391654
let writer = blockfile_provider
1640-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
1655+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path))
16411656
.await
16421657
.unwrap();
16431658
let id_1 = writer.id();
@@ -1675,8 +1690,9 @@ mod tests {
16751690
block_cache,
16761691
sparse_index_cache,
16771692
);
1693+
let prefix_path = String::from("");
16781694
let writer = blockfile_provider
1679-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::default())
1695+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()))
16801696
.await
16811697
.unwrap();
16821698
let id_1 = writer.id();
@@ -1697,7 +1713,7 @@ mod tests {
16971713
flusher.flush::<&str, Vec<u32>>().await.unwrap();
16981714
// Create another writer.
16991715
let writer = blockfile_provider
1700-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new().fork(id_1))
1716+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()).fork(id_1))
17011717
.await
17021718
.expect("BlockfileWriter fork unsuccessful");
17031719
// Delete everything but the last 10 keys.
@@ -1730,7 +1746,7 @@ mod tests {
17301746
}
17311747

17321748
let writer = blockfile_provider
1733-
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new().fork(id_2))
1749+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path).fork(id_2))
17341750
.await
17351751
.expect("BlockfileWriter fork unsuccessful");
17361752
// Add everything back.
@@ -1770,9 +1786,10 @@ mod tests {
17701786
block_cache,
17711787
sparse_index_cache,
17721788
);
1789+
let prefix_path = String::from("");
17731790

17741791
let writer = blockfile_provider
1775-
.write::<&str, u32>(BlockfileWriterOptions::default())
1792+
.write::<&str, u32>(BlockfileWriterOptions::new(prefix_path))
17761793
.await
17771794
.unwrap();
17781795
let id = writer.id();
@@ -1928,9 +1945,9 @@ mod tests {
19281945

19291946
// Test that a v1.1 writer can read a v1 blockfile and dirty a block
19301947
// successfully hydrating counts for ALL blocks it needs to set counts for
1931-
1948+
let prefix_path = String::from("");
19321949
let writer = blockfile_provider
1933-
.write::<&str, String>(BlockfileWriterOptions::new().fork(first_write_id))
1950+
.write::<&str, String>(BlockfileWriterOptions::new(prefix_path).fork(first_write_id))
19341951
.await
19351952
.unwrap();
19361953
let second_write_id = writer.id();

rust/blockstore/src/arrow/concurrency_test.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ mod tests {
3030
block_cache,
3131
sparse_index_cache,
3232
);
33+
let prefix_path = String::from("");
3334
let writer = future::block_on(
34-
blockfile_provider.write::<&str, u32>(BlockfileWriterOptions::default()),
35+
blockfile_provider.write::<&str, u32>(BlockfileWriterOptions::new(prefix_path)),
3536
)
3637
.unwrap();
3738
let id = writer.id();
@@ -105,9 +106,10 @@ mod tests {
105106
block_cache,
106107
sparse_index_cache,
107108
);
109+
let prefix_path = String::from("");
108110
let reader = future::block_on(async {
109111
let writer = blockfile_provider
110-
.write::<&str, u32>(BlockfileWriterOptions::default())
112+
.write::<&str, u32>(BlockfileWriterOptions::new(prefix_path))
111113
.await
112114
.expect("Failed to create writer");
113115
let id = writer.id();

0 commit comments

Comments
 (0)