@@ -863,20 +863,68 @@ def fix_pgp_num(self, pool=None):
         if self.ceph_manager.set_pool_pgpnum(pool, force):
             self.pools_to_fix_pgp_num.discard(pool)
 
+    def get_rand_pg_acting_set(self, pool_id=None):
+        """
+        Return the acting set of a random PG, optionally restricted to
+        a PG from the pool given by pool_id.
+        """
+        pgs = self.ceph_manager.get_pg_stats()
+        if not pgs:
+            self.log('No pgs; doing nothing')
+            return
+        if pool_id:
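+            # a pgid looks like "<pool_id>.<pg_seq>", so the portion before
+            # the dot identifies the owning pool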
+            pgs_in_pool = [pg for pg in pgs if int(pg['pgid'].split('.')[0]) == pool_id]
+            pg = random.choice(pgs_in_pool)
+        else:
+            pg = random.choice(pgs)
+        self.log('Choosing PG {id} with acting set {act}'.format(id=pg['pgid'], act=pg['acting']))
+        return pg['acting']
+
+    def get_k_m_ec_pool(self, pool, pool_json):
+        """
+        Return the k and m values of the pool's erasure-code profile,
+        or (None, None) if the profile cannot be read.
+        """
+        k = 0
+        m = 99
+        try:
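+            # get_pool_property() doubles as an existence check: it raises
+            # CommandFailedError if the pool has been removed; the profile
+            # name itself is then taken from the already-fetched pool_json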
+            ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
+            ec_profile = pool_json['erasure_code_profile']
+            ec_profile_json = self.ceph_manager.raw_cluster_cmd(
+                'osd',
+                'erasure-code-profile',
+                'get',
+                ec_profile,
+                '--format=json')
+            ec_json = json.loads(ec_profile_json)
+            local_k = int(ec_json['k'])
+            local_m = int(ec_json['m'])
+            self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
+                                                                  k=local_k, m=local_m))
+            if local_k > k:
+                self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
+                k = local_k
+            if local_m < m:
+                self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
+                m = local_m
+        except CommandFailedError:
+            self.log("failed to read erasure_code_profile. %s was likely removed", pool)
+            return None, None
+
+        return k, m
+
     def test_pool_min_size(self):
         """
-        Loop to selectively push PGs below their min_size and test that recovery
-        still occurs.
+        Loop to selectively push PGs down to their min_size and test that
+        recovery still occurs. We do this by randomly picking a PG and
+        failing OSDs from its acting set.
         """
         self.log("test_pool_min_size")
         self.all_up()
         time.sleep(60)  # buffer time for recovery to start.
         self.ceph_manager.wait_for_recovery(
             timeout=self.config.get('timeout')
         )
-        minout = int(self.config.get("min_out", 1))
-        minlive = int(self.config.get("min_live", 2))
-        mindead = int(self.config.get("min_dead", 1))
         self.log("doing min_size thrashing")
         self.ceph_manager.wait_for_clean(timeout=180)
         assert self.ceph_manager.is_clean(), \
@@ -885,94 +933,54 @@ def test_pool_min_size(self):
         while time.time() - start < self.config.get("test_min_size_duration", 1800):
             # look up k and m from all the pools on each loop, in case it
             # changes as the cluster runs
-            k = 0
-            m = 99
-            has_pools = False
             pools_json = self.ceph_manager.get_osd_dump_json()['pools']
-
+            if len(pools_json) == 0:
+                self.log("No pools yet, waiting")
+                time.sleep(5)
+                continue
             for pool_json in pools_json:
                 pool = pool_json['pool_name']
-                has_pools = True
+                pool_id = pool_json['pool']
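+                # numeric pool id, used below to pick a random PG from this pool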
                 pool_type = pool_json['type']  # 1 for rep, 3 for ec
                 min_size = pool_json['min_size']
                 self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
-                try:
-                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
-                    if pool_type != PoolType.ERASURE_CODED:
-                        continue
-                    ec_profile = pool_json['erasure_code_profile']
-                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
-                        'osd',
-                        'erasure-code-profile',
-                        'get',
-                        ec_profile,
-                        '--format=json')
-                    ec_json = json.loads(ec_profile_json)
-                    local_k = int(ec_json['k'])
-                    local_m = int(ec_json['m'])
-                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
-                                                                          k=local_k, m=local_m))
-                    if local_k > k:
-                        self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
-                        k = local_k
-                    if local_m < m:
-                        self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
-                        m = local_m
-                except CommandFailedError:
-                    self.log("failed to read erasure_code_profile. %s was likely removed", pool)
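+                # replicated pools have no k/m, so only erasure-coded pools are thrashed below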
+                if pool_type != PoolType.ERASURE_CODED:
                     continue
-
-            if has_pools:
-                self.log("using k={k}, m={m}".format(k=k,m=m))
-            else:
-                self.log("No pools yet, waiting")
-                time.sleep(5)
-                continue
-
-            if minout > len(self.out_osds):  # kill OSDs and mark out
-                self.log("forced to out an osd")
-                self.kill_osd(mark_out=True)
-                continue
-            elif mindead > len(self.dead_osds):  # kill OSDs but force timeout
-                self.log("forced to kill an osd")
-                self.kill_osd()
-                continue
-            else:  # make mostly-random choice to kill or revive OSDs
-                minup = max(minlive, k)
-                rand_val = random.uniform(0, 1)
-                self.log("choosing based on number of live OSDs and rand val {rand}".\
-                    format(rand=rand_val))
-                if len(self.live_osds) > minup + 1 and rand_val < 0.5:
-                    # chose to knock out as many OSDs as we can w/out downing PGs
-
-                    most_killable = min(len(self.live_osds) - minup, m)
-                    self.log("chose to kill {n} OSDs".format(n=most_killable))
-                    for i in range(1, most_killable):
-                        self.kill_osd(mark_out=True)
-                    time.sleep(10)
-                    # try a few times since there might be a concurrent pool
-                    # creation or deletion
-                    with safe_while(
-                            sleep=25, tries=5,
-                            action='check for active or peered') as proceed:
-                        while proceed():
-                            if self.ceph_manager.all_active_or_peered():
-                                break
-                            self.log('not all PGs are active or peered')
-                else:  # chose to revive OSDs, bring up a random fraction of the dead ones
-                    self.log("chose to revive osds")
-                    for i in range(1, int(rand_val * len(self.dead_osds))):
-                        self.revive_osd(i)
-
-            # let PGs repair themselves or our next knockout might kill one
-            self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
-
-        # / while not self.stopping
-        self.all_up_in()
-
-        self.ceph_manager.wait_for_recovery(
-            timeout=self.config.get('timeout')
-        )
+                else:
+                    k, m = self.get_k_m_ec_pool(pool, pool_json)
+                    if k is None and m is None:
+                        continue
+                self.log("using k={k}, m={m}".format(k=k,m=m))
+
+                self.log("dead_osds={d}, live_osds={ld}".format(d=self.dead_osds, ld=self.live_osds))
+                minup = max(min_size, k)
+                # Choose a random PG and kill OSDs until only min_size remain
+                most_killable = min(len(self.live_osds) - minup, m)
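+                # kill at most m OSDs (the number of shards an EC pool can
+                # afford to lose) while leaving at least max(min_size, k) alive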
+                self.log("chose to kill {n} OSDs".format(n=most_killable))
+                acting_set = self.get_rand_pg_acting_set(pool_id)
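+                # never take down the entire acting set; at least one member must stay up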
+                assert most_killable < len(acting_set)
+                for i in range(most_killable):
+                    self.kill_osd(osd=acting_set[i], mark_out=True)
+                self.log("dead_osds={d}, live_osds={ld}".format(d=self.dead_osds, ld=self.live_osds))
+                self.log("check for active or peered")
+                with safe_while(
+                        sleep=25, tries=5,
+                        action='check for active or peered') as proceed:
+                    while proceed():
+                        if self.ceph_manager.all_active_or_peered():
+                            break
+                        self.log('not all PGs are active or peered')
+                self.all_up_in()  # revive all OSDs
+                # let PGs repair themselves or our next knockout might kill one;
+                # wait_for_recovery since some workloads won't be able to go clean
+                self.ceph_manager.wait_for_recovery(
+                    timeout=self.config.get('timeout')
+                )
+        # end of the timed thrashing loop
+        self.all_up_in()  # revive all OSDs
+
+        # Wait until all PGs are active+clean after we have revived all the OSDs
+        self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
 
     def inject_pause(self, conf_key, duration, check_after, should_be_down):
         """