22import logging
33import os
44import re
5-
6- from shlex import split as shlex_split
5+ from io import StringIO
76
87from tasks .ceph_test_case import CephTestCase
98
@@ -49,7 +48,32 @@ def restore(self, mntobj):
4948 mntobj .hostfs_mntpt = self .hostfs_mntpt
5049
5150
52- class CephFSTestCase (CephTestCase ):
51+ class RunCephCmd :
52+
53+ def run_ceph_cmd (self , * args , ** kwargs ):
54+ if kwargs .get ('args' ) is None and args :
55+ if len (args ) == 1 :
56+ args = args [0 ]
57+ kwargs ['args' ] = args
58+ return self .mon_manager .run_cluster_cmd (** kwargs )
59+
60+ def get_ceph_cmd_result (self , * args , ** kwargs ):
61+ if kwargs .get ('args' ) is None and args :
62+ if len (args ) == 1 :
63+ args = args [0 ]
64+ kwargs ['args' ] = args
65+ return self .run_ceph_cmd (** kwargs ).exitstatus
66+
67+ def get_ceph_cmd_stdout (self , * args , ** kwargs ):
68+ if kwargs .get ('args' ) is None and args :
69+ if len (args ) == 1 :
70+ args = args [0 ]
71+ kwargs ['args' ] = args
72+ kwargs ['stdout' ] = kwargs .pop ('stdout' , StringIO ())
73+ return self .run_ceph_cmd (** kwargs ).stdout .getvalue ()
74+
75+
76+ class CephFSTestCase (CephTestCase , RunCephCmd ):
5377 """
5478 Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
5579 into the fs, mount_a, mount_b class attributes (setting mount_b is optional)
@@ -90,25 +114,38 @@ def _remove_blocklist(self):
90114 # In case anything is in the OSD blocklist list, clear it out. This is to avoid
91115 # the OSD map changing in the background (due to blocklist expiry) while tests run.
92116 try :
93- self .mds_cluster . mon_manager . run_cluster_cmd ( args = "osd blocklist clear" )
117+ self .run_ceph_cmd ( "osd blocklist clear" )
94118 except CommandFailedError :
95119 # Fallback for older Ceph cluster
96120 try :
97- blocklist = json .loads (self .mds_cluster . mon_manager . raw_cluster_cmd ("osd" ,
98- "dump" , "--format=json-pretty" ))['blocklist' ]
121+ blocklist = json .loads (self .get_ceph_cmd_stdout ("osd" ,
122+ "dump" , "--format=json-pretty" ))['blocklist' ]
99123 log .info (f"Removing { len (blocklist )} blocklist entries" )
100124 for addr , blocklisted_at in blocklist .items ():
101- self .mds_cluster . mon_manager . raw_cluster_cmd ("osd" , "blocklist" , "rm" , addr )
125+ self .run_ceph_cmd ("osd" , "blocklist" , "rm" , addr )
102126 except KeyError :
103127 # Fallback for more older Ceph clusters, who will use 'blacklist' instead.
104- blacklist = json .loads (self .mds_cluster . mon_manager . raw_cluster_cmd ("osd" ,
105- "dump" , "--format=json-pretty" ))['blacklist' ]
128+ blacklist = json .loads (self .get_ceph_cmd_stdout ("osd" ,
129+ "dump" , "--format=json-pretty" ))['blacklist' ]
106130 log .info (f"Removing { len (blacklist )} blacklist entries" )
107131 for addr , blocklisted_at in blacklist .items ():
108- self .mds_cluster .mon_manager .raw_cluster_cmd ("osd" , "blacklist" , "rm" , addr )
132+ self .run_ceph_cmd ("osd" , "blacklist" , "rm" , addr )
133+
134+ def _init_mon_manager (self ):
135+ # if vstart_runner.py has invoked this code
136+ if 'Local' in str (type (self .ceph_cluster )):
137+ from tasks .vstart_runner import LocalCephManager
138+ self .mon_manager = LocalCephManager (ctx = self .ctx )
139+ # else teuthology has invoked this code
140+ else :
141+ from tasks .ceph_manager import CephManager
142+ self .mon_manager = CephManager (self .ceph_cluster .admin_remote ,
143+ ctx = self .ctx , logger = log .getChild ('ceph_manager' ))
109144
110145 def setUp (self ):
111146 super (CephFSTestCase , self ).setUp ()
147+ self ._init_mon_manager ()
148+ self .admin_remote = self .ceph_cluster .admin_remote
112149
113150 self .config_set ('mon' , 'mon_allow_pool_delete' , True )
114151
@@ -154,7 +191,7 @@ def setUp(self):
154191 for entry in self .auth_list ():
155192 ent_type , ent_id = entry ['entity' ].split ("." )
156193 if ent_type == "client" and ent_id not in client_mount_ids and not (ent_id == "admin" or ent_id [:6 ] == 'mirror' ):
157- self .mds_cluster . mon_manager . raw_cluster_cmd ("auth" , "del" , entry ['entity' ])
194+ self .run_ceph_cmd ("auth" , "del" , entry ['entity' ])
158195
159196 if self .REQUIRE_FILESYSTEM :
160197 self .fs = self .mds_cluster .newfs (create = True )
@@ -165,11 +202,11 @@ def setUp(self):
165202 'osd' , f'allow rw tag cephfs data={ self .fs .name } ' ,
166203 'mds' , 'allow' ]
167204
168- if self .run_cluster_cmd_result ( cmd ) == 0 :
205+ if self .get_ceph_cmd_result ( * cmd ) == 0 :
169206 break
170207
171208 cmd [1 ] = 'add'
172- if self .run_cluster_cmd_result ( cmd ) != 0 :
209+ if self .get_ceph_cmd_result ( * cmd ) != 0 :
173210 raise RuntimeError (f'Failed to create new client { cmd [2 ]} ' )
174211
175212 # wait for ranks to become active
@@ -182,9 +219,8 @@ def setUp(self):
182219 if self .REQUIRE_BACKUP_FILESYSTEM :
183220 if not self .REQUIRE_FILESYSTEM :
184221 self .skipTest ("backup filesystem requires a primary filesystem as well" )
185- self .fs .mon_manager .raw_cluster_cmd ('fs' , 'flag' , 'set' ,
186- 'enable_multiple' , 'true' ,
187- '--yes-i-really-mean-it' )
222+ self .run_ceph_cmd ('fs' , 'flag' , 'set' , 'enable_multiple' , 'true' ,
223+ '--yes-i-really-mean-it' )
188224 self .backup_fs = self .mds_cluster .newfs (name = "backup_fs" )
189225 self .backup_fs .wait_for_daemons ()
190226
@@ -220,9 +256,8 @@ def auth_list(self):
220256 """
221257 Convenience wrapper on "ceph auth ls"
222258 """
223- return json .loads (self .mds_cluster .mon_manager .raw_cluster_cmd (
224- "auth" , "ls" , "--format=json-pretty"
225- ))['auth_dump' ]
259+ return json .loads (self .get_ceph_cmd_stdout ("auth" , "ls" ,
260+ "--format=json-pretty" ))['auth_dump' ]
226261
227262 def assert_session_count (self , expected , ls_data = None , mds_id = None ):
228263 if ls_data is None :
@@ -405,16 +440,6 @@ def _wait_random_subtrees(self, count, status=None, rank=None, path=None):
405440 except contextutil .MaxWhileTries as e :
406441 raise RuntimeError ("rank {0} failed to reach desired subtree state" .format (rank )) from e
407442
408- def run_cluster_cmd (self , cmd ):
409- if isinstance (cmd , str ):
410- cmd = shlex_split (cmd )
411- return self .fs .mon_manager .raw_cluster_cmd (* cmd )
412-
413- def run_cluster_cmd_result (self , cmd ):
414- if isinstance (cmd , str ):
415- cmd = shlex_split (cmd )
416- return self .fs .mon_manager .raw_cluster_cmd_result (* cmd )
417-
418443 def create_client (self , client_id , moncap = None , osdcap = None , mdscap = None ):
419444 if not (moncap or osdcap or mdscap ):
420445 if self .fs :
@@ -432,5 +457,5 @@ def create_client(self, client_id, moncap=None, osdcap=None, mdscap=None):
432457 if mdscap :
433458 cmd += ['mds' , mdscap ]
434459
435- self .run_cluster_cmd ( cmd )
436- return self .run_cluster_cmd (f'auth get { self .client_name } ' )
460+ self .run_ceph_cmd ( * cmd )
461+ return self .run_ceph_cmd (f'auth get { self .client_name } ' )
0 commit comments