Skip to content

Commit 0e58ade

Browse files
authored
Merge branch 'main' into chore
2 parents 66c8b36 + d0fd5be commit 0e58ade

File tree

30 files changed

+900
-1268
lines changed

30 files changed

+900
-1268
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scripts/benchmark/query/load/tpch10.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ select version();
77
SQL
88

99
for t in customer lineitem nation orders partsupp part region supplier; do
10-
echo "DROP TABLE IF EXISTS $t;" | bendsql query
10+
echo "DROP TABLE IF EXISTS $t;" | bendsql query
1111
done
1212

1313
cat <<SQL | bendsql query
@@ -113,8 +113,8 @@ cat <<SQL | bendsql query
113113
SQL
114114

115115
for t in customer lineitem nation orders partsupp part region supplier; do
116-
echo "loading into $t ..."
117-
cat <<SQL | bendsql query
116+
echo "loading into $t ..."
117+
cat <<SQL | bendsql query
118118
COPY INTO $t FROM 's3://repo.databend.rs/datasets/tpch10/${t}/'
119119
credentials=(aws_key_id='$REPO_ACCESS_KEY_ID' aws_secret_key='$REPO_SECRET_ACCESS_KEY') pattern ='${t}.*'
120120
file_format=(type='CSV' field_delimiter='|' record_delimiter='\\n' skip_header=0);

src/query/expression/src/schema.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,8 @@ impl TableSchema {
364364
Ok(())
365365
}
366366

367-
pub fn to_column_id_set(&self) -> HashSet<ColumnId> {
368-
HashSet::from_iter(self.to_column_ids().iter().cloned())
367+
pub fn to_leaf_column_id_set(&self) -> HashSet<ColumnId> {
368+
HashSet::from_iter(self.to_leaf_column_ids().iter().cloned())
369369
}
370370

371371
pub fn to_column_ids(&self) -> Vec<ColumnId> {

src/query/service/tests/it/storages/fuse/operations/alter_table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async fn check_segment_column_ids(
7575
location: snapshot_loc.clone(),
7676
len_hint: None,
7777
ver: TableSnapshot::VERSION,
78-
put_cache: true,
78+
put_cache: false,
7979
};
8080

8181
let snapshot = snapshot_reader.read(&params).await?;
@@ -100,7 +100,7 @@ async fn check_segment_column_ids(
100100
location: seg_loc.clone(),
101101
len_hint: None,
102102
ver: SegmentInfo::VERSION,
103-
put_cache: true,
103+
put_cache: false,
104104
};
105105
let segment_info = segment_reader.read(&params).await?;
106106

src/query/service/tests/it/storages/fuse/operations/clustering.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
9191
location: snapshot_loc.clone(),
9292
len_hint: None,
9393
ver: TableSnapshot::VERSION,
94-
put_cache: true,
94+
put_cache: false,
9595
};
9696

9797
let snapshot = reader.read(&load_params).await?;
@@ -127,7 +127,7 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
127127
location: snapshot_loc.clone(),
128128
len_hint: None,
129129
ver: TableSnapshot::VERSION,
130-
put_cache: true,
130+
put_cache: false,
131131
};
132132

133133
let snapshot = reader.read(&params).await?;

src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use common_storage::DataOperator;
3131
use common_storages_fuse::io::MetaReaders;
3232
use common_storages_fuse::io::SegmentInfoReader;
3333
use common_storages_fuse::io::SegmentWriter;
34+
use common_storages_fuse::io::SegmentsIO;
3435
use common_storages_fuse::io::TableMetaLocationGenerator;
3536
use common_storages_fuse::operations::CompactOptions;
3637
use common_storages_fuse::operations::SegmentCompactMutator;
@@ -617,10 +618,9 @@ async fn test_segment_compactor() -> Result<()> {
617618

618619
struct CompactSegmentTestFixture {
619620
threshold: u64,
621+
ctx: Arc<dyn TableContext>,
620622
data_accessor: DataOperator,
621623
location_gen: TableMetaLocationGenerator,
622-
input_segments: Vec<Arc<SegmentInfo>>,
623-
input_segment_locations: Vec<Location>,
624624
// blocks of input_segments, ordered by segment
625625
input_blocks: Vec<BlockMeta>,
626626
}
@@ -630,11 +630,10 @@ impl CompactSegmentTestFixture {
630630
let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
631631
let data_accessor = ctx.get_data_operator()?;
632632
Ok(Self {
633+
ctx: ctx.clone(),
633634
threshold: block_per_seg,
634635
data_accessor,
635636
location_gen,
636-
input_segments: vec![],
637-
input_segment_locations: vec![],
638637
input_blocks: vec![],
639638
})
640639
}
@@ -649,29 +648,37 @@ impl CompactSegmentTestFixture {
649648
let location_gen = &self.location_gen;
650649
let block_writer = BlockWriter::new(data_accessor, location_gen);
651650

651+
let schema = TestFixture::default_table_schema();
652+
let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
653+
let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
654+
652655
let segment_writer = SegmentWriter::new(data_accessor, location_gen);
653-
let seg_acc = SegmentCompactor::new(block_per_seg, segment_writer.clone());
656+
let seg_acc = SegmentCompactor::new(
657+
block_per_seg,
658+
max_io_requests,
659+
&fuse_segment_io,
660+
segment_writer.clone(),
661+
);
654662

655-
let (segments, locations, blocks) =
663+
let (locations, blocks) =
656664
Self::gen_segments(&block_writer, &segment_writer, num_block_of_segments).await?;
657-
self.input_segments = segments;
658-
self.input_segment_locations = locations;
659665
self.input_blocks = blocks;
660666
let limit = limit.unwrap_or(usize::MAX);
661667
seg_acc
662-
.compact(&self.input_segments, &self.input_segment_locations, limit)
668+
.compact(locations, limit, |status| {
669+
self.ctx.set_status_info(&status);
670+
})
663671
.await
664672
}
665673

666674
async fn gen_segments(
667675
block_writer: &BlockWriter<'_>,
668676
segment_writer: &SegmentWriter<'_>,
669677
block_num_of_segments: &[usize],
670-
) -> Result<(Vec<Arc<SegmentInfo>>, Vec<Location>, Vec<BlockMeta>)> {
671-
let mut segments = vec![];
678+
) -> Result<(Vec<Location>, Vec<BlockMeta>)> {
672679
let mut locations = vec![];
673680
let mut collected_blocks = vec![];
674-
for num_blocks in block_num_of_segments {
681+
for num_blocks in block_num_of_segments.iter().rev() {
675682
let (schema, blocks) = TestFixture::gen_sample_blocks_ex(*num_blocks, 1, 1);
676683
let mut stats_acc = StatisticsAccumulator::default();
677684
for block in blocks {
@@ -696,11 +703,10 @@ impl CompactSegmentTestFixture {
696703
col_stats,
697704
});
698705
let location = segment_writer.write_segment_no_cache(&segment_info).await?;
699-
segments.push(Arc::new(segment_info));
700706
locations.push(location);
701707
}
702708

703-
Ok((segments, locations, collected_blocks))
709+
Ok((locations, collected_blocks))
704710
}
705711

706712
// verify that newly generated segments contain the proper number of blocks
@@ -716,7 +722,7 @@ impl CompactSegmentTestFixture {
716722
location: x.to_string(),
717723
len_hint: None,
718724
ver: SegmentInfo::VERSION,
719-
put_cache: true,
725+
put_cache: false,
720726
};
721727

722728
let seg = segment_reader.read(&load_params).await?;
@@ -791,13 +797,12 @@ impl CompactCase {
791797
let mut block_num_of_output_segments = vec![];
792798

793799
// 4. input blocks should be there and in the original order
794-
// for location in r.segments_locations.iter().rev() {
795-
for location in r.segments_locations.iter() {
800+
for location in r.segments_locations.iter().rev() {
796801
let load_params = LoadParams {
797802
location: location.0.clone(),
798803
len_hint: None,
799804
ver: location.1,
800-
put_cache: true,
805+
put_cache: false,
801806
};
802807

803808
let segment = segment_reader.read(&load_params).await?;
@@ -815,6 +820,7 @@ impl CompactCase {
815820
idx += 1;
816821
}
817822
}
823+
block_num_of_output_segments.reverse();
818824

819825
// 5. statistics should be the same
820826
assert_eq!(

src/query/service/tests/it/storages/fuse/pruning.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ async fn test_block_pruner() -> Result<()> {
163163
location: snapshot_loc.clone(),
164164
len_hint: None,
165165
ver: TableSnapshot::VERSION,
166-
put_cache: true,
166+
put_cache: false,
167167
};
168168

169169
let snapshot = reader.read(&load_params).await?;

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
155155
.iter()
156156
.map(|b| gen_columns_statistics(&b.clone().unwrap(), None, &schema))
157157
.collect::<common_exception::Result<Vec<_>>>()?;
158-
let r = reducers::reduce_block_statistics(&col_stats, None);
158+
let r = reducers::reduce_block_statistics(&col_stats);
159159
assert!(r.is_ok());
160160
let r = r.unwrap();
161161
assert_eq!(3, r.len());
@@ -210,7 +210,7 @@ fn test_reduce_block_statistics_in_memory_size() -> common_exception::Result<()>
210210
// combine two statistics
211211
let col_stats_left = HashMap::from_iter(iter(0).take(num_of_cols));
212212
let col_stats_right = HashMap::from_iter(iter(0).take(num_of_cols));
213-
let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right], None)?;
213+
let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right])?;
214214
assert_eq!(num_of_cols, r.len());
215215
// there should be 100 columns in the result
216216
for idx in 1..=100 {

src/query/sql/src/planner/optimizer/rule/rewrite/filter_join/mark_join_to_semi_join.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<SExpr> {
3232
}
3333

3434
let mark_index = join.marker_index.unwrap();
35+
let mut find_mark_index = false;
3536

3637
// remove mark index filter
3738
for (idx, predicate) in filter.predicates.iter().enumerate() {
3839
if let ScalarExpr::BoundColumnRef(col) = predicate {
3940
if col.column.index == mark_index {
41+
find_mark_index = true;
4042
filter.predicates.remove(idx);
4143
break;
4244
}
@@ -51,6 +53,11 @@ pub fn convert_mark_to_semi_join(s_expr: &SExpr) -> Result<SExpr> {
5153
}
5254
}
5355

56+
if !find_mark_index {
57+
// To be conservative, do not convert when no filter predicate is a direct reference to the mark index
58+
return Ok(s_expr.clone());
59+
}
60+
5461
join.join_type = match join.join_type {
5562
JoinType::LeftMark => JoinType::RightSemi,
5663
JoinType::RightMark => JoinType::LeftSemi,

src/query/storages/fuse/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
4444
chrono = { workspace = true }
4545
futures = "0.3.24"
4646
futures-util = "0.3.24"
47-
itertools = "0.10.5"
4847
metrics = "0.20.1"
4948
opendal = { workspace = true }
5049
serde = { workspace = true }

0 commit comments

Comments
 (0)