|
9 | 9 | import json |
10 | 10 |
|
11 | 11 | from .cluster import Cluster |
| 12 | +from .scrub_tests import ScrubbingTestThreadBackground, ScrubRecoveryThreadBackground |
12 | 13 |
|
13 | 14 |
|
14 | 15 | logger = logging.getLogger("cbt") |
@@ -1292,216 +1293,3 @@ def run(self): |
1292 | 1293 | self.states[self.state]() |
1293 | 1294 | common.pdsh(settings.getnodes('head'), self.logcmd('Exiting recovery test thread. Last state was: %s' % self.state)).communicate() |
1294 | 1295 |
|
1295 | | - |
1296 | | -class ScrubbingTestThreadBackground(threading.Thread): |
1297 | | - def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): |
1298 | | - threading.Thread.__init__(self) |
1299 | | - self.config = config |
1300 | | - self.cluster = cluster |
1301 | | - self.callback = callback |
1302 | | - self.state = 'pre' |
1303 | | - self.states = {'pre': self.pre, 'osdout': self.osdout, 'osdin':self.osdin, |
1304 | | - 'post': self.post, 'done': self.done} |
1305 | | - self.startiorequest = startiorequest |
1306 | | - self.stoprequest = stoprequest |
1307 | | - self.haltrequest = haltrequest |
1308 | | - self.outhealthtries = 0 |
1309 | | - self.inhealthtries = 0 |
1310 | | - self.maxhealthtries = 60 |
1311 | | - self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] |
1312 | | - self.ceph_cmd = self.cluster.ceph_cmd |
1313 | | - self.lasttime = time.time() |
1314 | | - |
1315 | | - def logcmd(self, message): |
1316 | | - return 'echo "[`date`] %s" >> %s/scrubbing.log' % (message, self.config.get('run_dir')) |
1317 | | - |
1318 | | - def pre(self): |
1319 | | - pre_time = self.config.get("pre_time", 60) |
1320 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrubbing Test Thread, waiting %s seconds.' % pre_time)).communicate() |
1321 | | - time.sleep(pre_time) |
1322 | | - self.state = 'osdout' |
1323 | | - |
1324 | | - def osdout(self): |
1325 | | - scrub_log = "%s/scrub.log" % self.config.get('run_dir') |
1326 | | - scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') |
1327 | | - ret = self.cluster.check_health(self.health_checklist, None, None) |
1328 | | - |
1329 | | - common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() |
1330 | | - |
1331 | | - self.cluster.maybe_populate_scrubbing_pool() |
1332 | | - common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrubbing pool.")).communicate() |
1333 | | - time.sleep(10) |
1334 | | - self.lasttime = time.time() |
1335 | | - self.state = "osdin" |
1336 | | - |
1337 | | - def osdin(self): |
1338 | | - scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') |
1339 | | - self.startiorequest.set() |
1340 | | - self.cluster.initiate_scrubbing() |
1341 | | - ret = self.cluster.check_scrub(scrub_stats_log) |
1342 | | - if ret == 1: |
1343 | | - self.state = "post" |
1344 | | - |
1345 | | - def post(self): |
1346 | | - if self.stoprequest.isSet(): |
1347 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() |
1348 | | - self.haltrequest.set() |
1349 | | - return |
1350 | | - |
1351 | | - if self.config.get("repeat", False): |
1352 | | - # reset counters |
1353 | | - self.outhealthtries = 0 |
1354 | | - self.inhealthtries = 0 |
1355 | | - |
1356 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "osdout" state.')).communicate() |
1357 | | - self.state = "osdout" |
1358 | | - return |
1359 | | - |
1360 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() |
1361 | | - self.state = "done" |
1362 | | - |
1363 | | - def done(self): |
1364 | | - common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() |
1365 | | - self.callback() |
1366 | | - self.haltrequest.set() |
1367 | | - |
1368 | | - def join(self, timeout=None): |
1369 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() |
1370 | | - super(ScrubbingTestThreadBackground, self).join(timeout) |
1371 | | - |
1372 | | - def run(self): |
1373 | | - self.haltrequest.clear() |
1374 | | - self.stoprequest.clear() |
1375 | | - self.startiorequest.clear() |
1376 | | - while not self.haltrequest.isSet(): |
1377 | | - self.states[self.state]() |
1378 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrubbing test thread. Last state was: %s' % self.state)).communicate() |
1379 | | - |
1380 | | - |
1381 | | -class ScrubRecoveryThreadBackground(threading.Thread): |
1382 | | - def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): |
1383 | | - threading.Thread.__init__(self) |
1384 | | - self.config = config |
1385 | | - self.cluster = cluster |
1386 | | - self.callback = callback |
1387 | | - self.state = 'pre' |
1388 | | - self.states = {'pre': self.pre, 'markdown': self.markdown, 'osdout': self.osdout, 'osdin':self.osdin, |
1389 | | - 'post': self.post, 'done': self.done} |
1390 | | - self.startiorequest = startiorequest |
1391 | | - self.stoprequest = stoprequest |
1392 | | - self.haltrequest = haltrequest |
1393 | | - self.outhealthtries = 0 |
1394 | | - self.inhealthtries = 0 |
1395 | | - self.maxhealthtries = 60 |
1396 | | - self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] |
1397 | | - self.ceph_cmd = self.cluster.ceph_cmd |
1398 | | - self.lasttime = time.time() |
1399 | | - |
1400 | | - def logcmd(self, message): |
1401 | | - return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir')) |
1402 | | - |
1403 | | - def pre(self): |
1404 | | - pre_time = self.config.get("pre_time", 60) |
1405 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' % pre_time)).communicate() |
1406 | | - time.sleep(pre_time) |
1407 | | - lcmd = self.logcmd("Setting the ceph osd noup flag") |
1408 | | - common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() |
1409 | | - self.state = 'markdown' |
1410 | | - |
1411 | | - def markdown(self): |
1412 | | - for osdnum in self.config.get('osds'): |
1413 | | - lcmd = self.logcmd("Marking OSD %s down." % osdnum) |
1414 | | - common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() |
1415 | | - lcmd = self.logcmd("Marking OSD %s out." % osdnum) |
1416 | | - common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() |
1417 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate() |
1418 | | - self.lasttime = time.time() |
1419 | | - self.state = 'osdout' |
1420 | | - |
1421 | | - |
1422 | | - def osdout(self): |
1423 | | - reclog = "%s/recovery.log" % self.config.get('run_dir') |
1424 | | - recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir') |
1425 | | - ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog) |
1426 | | - |
1427 | | - common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() |
1428 | | - |
1429 | | - if ret == 0: |
1430 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate() |
1431 | | - else: |
1432 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate() |
1433 | | - rectime = str(time.time() - self.lasttime) |
1434 | | - common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate() |
1435 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate() |
1436 | | - |
1437 | | - # Populate the recovery pool |
1438 | | - self.cluster.maybe_populate_recovery_pool() |
1439 | | - |
1440 | | - common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate() |
1441 | | - time.sleep(10) |
1442 | | - lcmd = self.logcmd("Unsetting the ceph osd noup flag") |
1443 | | - self.cluster.disable_recovery() |
1444 | | - common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() |
1445 | | - for osdnum in self.config.get('osds'): |
1446 | | - lcmd = self.logcmd("Marking OSD %s up." % osdnum) |
1447 | | - common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() |
1448 | | - lcmd = self.logcmd("Marking OSD %s in." % osdnum) |
1449 | | - common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() |
1450 | | - self.lasttime = time.time() |
1451 | | - # Populate the scrub pool |
1452 | | - logger.info("Sleep before scrub populate") |
1453 | | - time.sleep(10) |
1454 | | - self.cluster.maybe_populate_scrubbing_pool() |
1455 | | - self.state = "osdin" |
1456 | | - |
1457 | | - |
1458 | | - def osdin(self): |
1459 | | - #Start scrub |
1460 | | - self.startiorequest.set() |
1461 | | - self.cluster.initiate_scrubbing() |
1462 | | - self.cluster.enable_recovery() |
1463 | | - recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir') |
1464 | | - scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') |
1465 | | - backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,)) |
1466 | | - scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,)) |
1467 | | - backfill.start() |
1468 | | - scrub_check.start() |
1469 | | - backfill.join() |
1470 | | - scrub_check.join() |
1471 | | - self.state = "post" |
1472 | | - |
1473 | | - |
1474 | | - def post(self): |
1475 | | - if self.stoprequest.isSet(): |
1476 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() |
1477 | | - self.haltrequest.set() |
1478 | | - return |
1479 | | - |
1480 | | - if self.config.get("repeat", False): |
1481 | | - # reset counters |
1482 | | - self.outhealthtries = 0 |
1483 | | - self.inhealthtries = 0 |
1484 | | - |
1485 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate() |
1486 | | - self.state = "markdown" |
1487 | | - return |
1488 | | - |
1489 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() |
1490 | | - self.state = "done" |
1491 | | - |
1492 | | - def done(self): |
1493 | | - common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() |
1494 | | - self.callback() |
1495 | | - self.haltrequest.set() |
1496 | | - |
1497 | | - def join(self, timeout=None): |
1498 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() |
1499 | | - super(ScrubRecoveryThreadBackground, self).join(timeout) |
1500 | | - |
1501 | | - def run(self): |
1502 | | - self.haltrequest.clear() |
1503 | | - self.stoprequest.clear() |
1504 | | - self.startiorequest.clear() |
1505 | | - while not self.haltrequest.isSet(): |
1506 | | - self.states[self.state]() |
1507 | | - common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate() |
0 commit comments