@@ -891,115 +891,128 @@ def fix_pgp_num(self, pool=None):
         if self.ceph_manager.set_pool_pgpnum(pool, force):
             self.pools_to_fix_pgp_num.discard(pool)
 
+    def get_rand_pg_acting_set(self, pool_id=None):
+        """
+        Return the acting set of a random PG, optionally
+        restricted to the pool with the given pool_id.
+        """
+        pgs = self.ceph_manager.get_pg_stats()
+        if not pgs:
+            self.log('No pgs; doing nothing')
+            return
+        if pool_id is not None:
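+            # pg['pgid'] has the form '<pool_id>.<pg_seq>' (e.g. '3.1f'), so match on the pool prefix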
+            pgs_in_pool = [pg for pg in pgs
+                           if int(pg['pgid'].split('.')[0]) == pool_id]
+            pg = random.choice(pgs_in_pool)
+        else:
+            pg = random.choice(pgs)
+        self.log('Choosing PG {id} with acting set {act}'.format(id=pg['pgid'], act=pg['acting']))
+        return pg['acting']
+
+    def get_k_m_ec_pool(self, pool, pool_json):
+        """
+        Return (k, m) for an erasure-coded pool, or (None, None)
+        if its erasure code profile cannot be read (e.g. because
+        the pool was removed concurrently).
+        """
+        try:
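+            # The property read below appears to serve as a probe: its result
+            # is overwritten from pool_json, but it raises CommandFailedError
+            # if the pool has been removed under us.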
+            ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
+            ec_profile = pool_json['erasure_code_profile']
+            ec_profile_json = self.ceph_manager.raw_cluster_cmd(
+                'osd',
+                'erasure-code-profile',
+                'get',
+                ec_profile,
+                '--format=json')
+            ec_json = json.loads(ec_profile_json)
+            k = int(ec_json['k'])
+            m = int(ec_json['m'])
+            self.log("pool {pool} k={k} m={m}".format(pool=pool, k=k, m=m))
+        except CommandFailedError:
+            self.log("failed to read erasure_code_profile. %s was likely removed", pool)
+            return None, None
+
+        return k, m
+
     def test_pool_min_size(self):
         """
-        Loop to selectively push PGs below their min_size and test that recovery
-        still occurs.
+        Loop to selectively push PGs down to their min_size and test that
+        recovery still occurs. We achieve this by picking a random PG and
+        failing OSDs from its acting set.
         """
         self.log("test_pool_min_size")
         self.all_up()
         time.sleep(60)  # buffer time for recovery to start.
         self.ceph_manager.wait_for_recovery(
             timeout=self.config.get('timeout')
         )
-        minout = int(self.config.get("min_out", 1))
-        minlive = int(self.config.get("min_live", 2))
-        mindead = int(self.config.get("min_dead", 1))
         self.log("doing min_size thrashing")
         self.ceph_manager.wait_for_clean(timeout=180)
         assert self.ceph_manager.is_clean(), \
             'not clean before minsize thrashing starts'
-        while not self.stopping:
+        start = time.time()
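+        # Thrash for a bounded wall-clock duration instead of running until
+        # the thrasher is stopped; "test_min_size_duration" is in seconds.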
+        while time.time() - start < self.config.get("test_min_size_duration", 1800):
             # look up k and m from all the pools on each loop, in case it
             # changes as the cluster runs
-            k = 0
-            m = 99
-            has_pools = False
             pools_json = self.ceph_manager.get_osd_dump_json()['pools']
-
+            if len(pools_json) == 0:
+                self.log("No pools yet, waiting")
+                time.sleep(5)
+                continue
             for pool_json in pools_json:
                 pool = pool_json['pool_name']
-                has_pools = True
+                pool_id = pool_json['pool']
                 pool_type = pool_json['type']  # 1 for rep, 3 for ec
                 min_size = pool_json['min_size']
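+                # min_size is the fewest up shards/replicas a PG can have and still serve I/O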
                 self.log("pool {pool} min_size is {min_size}".format(pool=pool, min_size=min_size))
-                try:
-                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
-                    if pool_type != PoolType.ERASURE_CODED:
-                        continue
-                    ec_profile = pool_json['erasure_code_profile']
-                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
-                        'osd',
-                        'erasure-code-profile',
-                        'get',
-                        ec_profile,
-                        '--format=json')
-                    ec_json = json.loads(ec_profile_json)
-                    local_k = int(ec_json['k'])
-                    local_m = int(ec_json['m'])
-                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
-                             k=local_k, m=local_m))
-                    if local_k > k:
-                        self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
-                        k = local_k
-                    if local_m < m:
-                        self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
-                        m = local_m
-                except CommandFailedError:
-                    self.log("failed to read erasure_code_profile. %s was likely removed", pool)
+                if pool_type != PoolType.ERASURE_CODED:
                     continue
-
-            if has_pools:
-                self.log("using k={k}, m={m}".format(k=k, m=m))
-            else:
-                self.log("No pools yet, waiting")
-                time.sleep(5)
-                continue
-
-            if minout > len(self.out_osds):  # kill OSDs and mark out
-                self.log("forced to out an osd")
-                self.kill_osd(mark_out=True)
-                continue
-            elif mindead > len(self.dead_osds):  # kill OSDs but force timeout
-                self.log("forced to kill an osd")
-                self.kill_osd()
-                continue
-            else:  # make mostly-random choice to kill or revive OSDs
-                minup = max(minlive, k)
-                rand_val = random.uniform(0, 1)
-                self.log("choosing based on number of live OSDs and rand val {rand}".\
-                         format(rand=rand_val))
-                if len(self.live_osds) > minup + 1 and rand_val < 0.5:
-                    # chose to knock out as many OSDs as we can w/out downing PGs
-
-                    most_killable = min(len(self.live_osds) - minup, m)
-                    self.log("chose to kill {n} OSDs".format(n=most_killable))
-                    for i in range(1, most_killable):
-                        self.kill_osd(mark_out=True)
-                    time.sleep(10)
-                    # try a few times since there might be a concurrent pool
-                    # creation or deletion
-                    with safe_while(
-                            sleep=25, tries=5,
-                            action='check for active or peered') as proceed:
-                        while proceed():
-                            if self.ceph_manager.all_active_or_peered():
-                                break
-                            self.log('not all PGs are active or peered')
-                else:  # chose to revive OSDs, bring up a random fraction of the dead ones
-                    self.log("chose to revive osds")
-                    for i in range(1, int(rand_val * len(self.dead_osds))):
-                        self.revive_osd(i)
-
-            # let PGs repair themselves or our next knockout might kill one
-            self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
-
-        # / while not self.stopping
-        self.all_up_in()
-
-        self.ceph_manager.wait_for_recovery(
-            timeout=self.config.get('timeout')
-        )
+                k, m = self.get_k_m_ec_pool(pool, pool_json)
+                if k is None and m is None:
+                    continue
+                self.log("using k={k}, m={m}".format(k=k, m=m))
+
+                self.log("dead_osds={d}, live_osds={ld}".format(d=self.dead_osds, ld=self.live_osds))
+                minup = max(min_size, k)
+                # Choose a random PG and kill OSDs until only min_size remain
+                most_killable = min(len(self.live_osds) - minup, m)
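+                # never kill more than m OSDs: an EC PG can lose at most m
+                # shards and still recover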
+                self.log("chose to kill {n} OSDs".format(n=most_killable))
+                acting_set = self.get_rand_pg_acting_set(pool_id)
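+                # keep at least one OSD of the acting set alive so the PG
+                # stays active or peered rather than going down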
+                assert most_killable < len(acting_set)
+                for i in range(most_killable):
+                    self.kill_osd(osd=acting_set[i], mark_out=True)
+                self.log("dead_osds={d}, live_osds={ld}".format(d=self.dead_osds, ld=self.live_osds))
+                with safe_while(
+                        sleep=25, tries=5,
+                        action='check for active or peered') as proceed:
+                    while proceed():
+                        if self.ceph_manager.all_active_or_peered():
+                            break
+                        self.log('not all PGs are active or peered')
+                self.all_up_in()  # revive all OSDs
+                # let PGs repair themselves or our next knockout might kill one;
+                # wait_for_recovery since some workloads won't be able to go clean
+                self.ceph_manager.wait_for_recovery(
+                    timeout=self.config.get('timeout')
+                )
+        # end of timed min_size thrashing loop
+        self.all_up_in()  # revive all OSDs
+
+        # Wait until all PGs are active+clean after we have revived all the OSDs
+        self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
 
     def inject_pause(self, conf_key, duration, check_after, should_be_down):
         """
@@ -2952,8 +2960,10 @@ def all_active_or_peered(self):
         """
         Wrapper to check if all PGs are active or peered
         """
+        self.log("checking for active or peered")
         pgs = self.get_pg_stats()
         if self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs):
+            self.log("all pgs are active or peered!")
             return True
         else:
             self.dump_pgs_not_active_peered(pgs)