@@ -503,12 +503,11 @@ struct SegmentWithHLL {
503
503
indexes : Vec < usize > ,
504
504
new_hlls : Vec < Option < BlockHLL > > ,
505
505
}
506
-
507
506
enum State {
508
507
ReadData ( Option < PartInfoPtr > ) ,
509
508
CollectNDV {
510
509
location : Location ,
511
- info : CompactSegmentInfo ,
510
+ info : Arc < CompactSegmentInfo > ,
512
511
hlls : Vec < RawBlockHLL > ,
513
512
} ,
514
513
GenerateHLL ,
@@ -601,6 +600,8 @@ impl Processor for AnalyzeCollectNDVSource {
601
600
new_hlls,
602
601
} ) ;
603
602
self . state = State :: GenerateHLL ;
603
+ } else {
604
+ self . state = State :: ReadData ( None ) ;
604
605
}
605
606
}
606
607
State :: SyncGen => {
@@ -629,6 +630,7 @@ impl Processor for AnalyzeCollectNDVSource {
629
630
let part = FuseLazyPartInfo :: from_part ( & part) ?;
630
631
let ( path, ver) = & part. segment_location ;
631
632
if * ver < 2 {
633
+ self . state = State :: ReadData ( None ) ;
632
634
return Ok ( ( ) ) ;
633
635
}
634
636
let load_param = LoadParams {
@@ -654,6 +656,11 @@ impl Processor for AnalyzeCollectNDVSource {
654
656
} else {
655
657
vec ! [ vec![ ] ; block_count]
656
658
} ;
659
+ self . state = State :: CollectNDV {
660
+ location : part. segment_location . clone ( ) ,
661
+ info : compact_segment_info,
662
+ hlls : block_hlls,
663
+ } ;
657
664
}
658
665
State :: GenerateHLL => {
659
666
let segment_with_hll = self . segment_with_hll . as_mut ( ) . unwrap ( ) ;
@@ -669,9 +676,7 @@ impl Processor for AnalyzeCollectNDVSource {
669
676
let storage_format = self . storage_format . clone ( ) ;
670
677
let block_meta = segment_with_hll. blocks [ idx] . clone ( ) ;
671
678
let ndv_columns_map = self . ndv_columns_map . clone ( ) ;
672
- let handler: databend_common_base:: JoinHandle <
673
- std:: result:: Result < Option < HashMap < u32 , MetaHLL > > , ErrorCode > ,
674
- > = runtime. spawn ( async move {
679
+ let handler = runtime. spawn ( async move {
675
680
let block = block_reader
676
681
. read_by_meta ( & settings, & block_meta, & storage_format)
677
682
. await ?;
@@ -690,8 +695,11 @@ impl Processor for AnalyzeCollectNDVSource {
690
695
} ) ?;
691
696
let new_hlls = joint. into_iter ( ) . collect :: < Result < Vec < _ > > > ( ) ?;
692
697
if new_hlls. iter ( ) . all ( |v| v. is_none ( ) ) {
698
+ self . segment_with_hll = None ;
699
+ self . state = State :: ReadData ( None ) ;
693
700
} else {
694
701
segment_with_hll. new_hlls = new_hlls;
702
+ self . state = State :: Write ;
695
703
}
696
704
}
697
705
State :: Write => {
@@ -719,6 +727,7 @@ impl Processor for AnalyzeCollectNDVSource {
719
727
new_segment
720
728
. write_meta_through_cache ( & self . dal , segment_loc)
721
729
. await ?;
730
+ self . state = State :: ReadData ( None ) ;
722
731
}
723
732
_ => return Err ( ErrorCode :: Internal ( "It's a bug." ) ) ,
724
733
}
0 commit comments