Skip to content

Commit 9a46a0d

Browse files
authored
Stateful skipper rc bridge (ydb-platform#23455) (ydb-platform#24299)
2 parents 8f82577 + 0a17ba1 commit 9a46a0d

File tree

7 files changed

+904
-50
lines changed

7 files changed

+904
-50
lines changed

ydb/apps/bridge_skipper_demo/README_ru.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pip3 install textual requests yaml
2525

2626
Пример использования:
2727
```
28-
./skipper.py -e <EXAMPLE.COM> --cluster ydb-test-bridge --tui
28+
./skipper.py -e <EXAMPLE.COM> -s ~/ydb-test-bridge-state.json --cluster ydb-test-bridge --tui
2929
```
3030

3131
![Import](img/skipper_demo.gif)
@@ -35,6 +35,7 @@ pip3 install textual requests yaml
3535
| Параметр | По умолчанию | Описание |
3636
|---|---|---|
3737
| `--endpoint` || Хост YDB для получения информации о кластере. |
38+
| `--state` || Путь к файлу, где skipper хранит свой стейт. |
3839
| `--ydb` | из PATH | Путь к исполняемому файлу YDB CLI. |
3940
| `--disable-auto-failover` | false | Отключить автоматический failover. |
4041
| `--log-level` | INFO | Уровень логирования: `TRACE`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`. |

ydb/apps/bridge_skipper_demo/bridge.py

Lines changed: 114 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
import health
2+
import json
3+
import pickle
4+
import os
5+
from cli_wrapper import *
6+
17
import collections
28
import copy
39
import logging
@@ -8,15 +14,11 @@
814

915
from typing import Dict, List, Optional, Tuple
1016

11-
from cli_wrapper import *
12-
from health import AsyncHealthcheckRunner, PileWorldView, HEALTH_REPLY_TTL_SECONDS
13-
14-
1517
logger = logging.getLogger(__name__)
1618

1719
# Currently when pile goes from 'NOT SYNCHRONIZED -> SYNCHRONIZED' there might be
1820
# healthchecks reporting it bad, we should ignore them for some time.
19-
TO_SYNC_TRANSITION_GRACE_PERIOD = HEALTH_REPLY_TTL_SECONDS * 2
21+
TO_SYNC_TRANSITION_GRACE_PERIOD = health.HEALTH_REPLY_TTL_SECONDS * 2
2022

2123
# Emit an info-level "All is good" / "There are bad piles" at most once per this period (seconds)
2224
STATUS_INFO_PERIOD_SECONDS = 60.0
@@ -116,40 +118,55 @@ def __eq__(self, other):
116118

117119

118120
class GlobalState:
119-
def __init__(self, pile_world_views: Dict[str, PileWorldView], prev_state: "GlobalState"):
121+
def __init__(self, pile_world_views: Dict[str, health.PileWorldView], prev_state: "GlobalState"):
120122
self.wall_ts = time.time()
121123
self.monotonic_ts = time.monotonic()
122124

123125
self.piles = {} # pile name -> PileState
124126
self.generation = None # latest known generation
127+
self.wait_for_generation = None # don't do anything until we see this generation or newer
125128
self.primary_name = None # primary according latest generation
126129

127130
# pile_name -> number of piles reporting it bad including itself
128131
# Not part of state, just aggregation over self.piles.
129132
self._bad_piles = collections.defaultdict(int)
130133

131-
pile_with_latest_generation = None
132-
133134
# step 1: find the latest admin state. Here we assume that no configuration
134135
# mistakes done and latest generation is good and the latest one.
135-
for pile_name, view in pile_world_views.items():
136-
if view.admin_states and view.admin_states.generation:
137-
if not self.generation or view.admin_states.generation > self.generation:
138-
self.generation = view.admin_states.generation
139-
pile_with_latest_generation = pile_name
140136

141-
if not self.generation:
137+
latest_generation, pile_with_latest_generation = health.get_latest_generation_pile(pile_world_views)
138+
139+
# step 2: get pile state from the latest generation (either new one or previously known)
140+
141+
has_previous_generation = prev_state and prev_state.generation
142+
143+
# there should be either just obtained generation or previous one
144+
if not (latest_generation or has_previous_generation):
142145
logger.error(f"healthcheck failed, no admin config found: {pile_world_views}")
143146
return
144147

145-
# step 2: get pile state from the latest generation
146-
147-
admin_items = pile_world_views[pile_with_latest_generation].admin_states.piles
148-
for pile_name, admin_item in admin_items.items():
149-
self.piles[pile_name] = PileState()
150-
self.piles[pile_name].admin_reported_state = admin_item.admin_reported_state
151-
if admin_item.admin_reported_state == "PRIMARY":
152-
self.primary_name = pile_name
148+
if latest_generation and (not has_previous_generation or latest_generation >= prev_state.generation):
149+
# new generation available
150+
logger.debug(f"New config generation: {latest_generation}")
151+
admin_items = pile_world_views[pile_with_latest_generation].admin_states.piles
152+
self.generation = latest_generation
153+
for pile_name, admin_item in admin_items.items():
154+
self.piles[pile_name] = PileState()
155+
self.piles[pile_name].admin_reported_state = admin_item.admin_reported_state
156+
if admin_item.admin_reported_state == "PRIMARY":
157+
self.primary_name = pile_name
158+
else:
159+
# we have seen more recent generation, fall back to it
160+
logger.debug(f"Fall back to previous known generation: {prev_state.generation}")
161+
self.generation = prev_state.generation
162+
for pile_name, pile in prev_state.piles.items():
163+
self.piles[pile_name] = PileState()
164+
self.piles[pile_name].admin_reported_state = pile.admin_reported_state
165+
if pile.admin_reported_state == "PRIMARY":
166+
self.primary_name = pile_name
167+
168+
if prev_state and prev_state.wait_for_generation and prev_state.wait_for_generation > self.generation:
169+
self.wait_for_generation = prev_state.wait_for_generation
153170

154171
# step 3: process pile health part
155172

@@ -212,6 +229,9 @@ def __eq__(self, other):
212229
return NotImplemented
213230
if self.generation != other.generation:
214231
return False
232+
233+
# we intentionally ignore wait_for_generation
234+
215235
if self.primary_name != other.primary_name:
216236
return False
217237
if set(self.piles.keys()) != set(other.piles.keys()):
@@ -233,8 +253,13 @@ def __str__(self):
233253
details_parts.append(f"{name}={pile}")
234254
piles_str = ", ".join(piles_parts)
235255
details_str = ", ".join(details_parts)
236-
return (f"GlobalState(ts={self.wall_ts}, gen={self.generation}, primary={self.primary_name}, "
237-
f"piles=[{piles_str}], details=[{details_str}])")
256+
257+
s = f"GlobalState(ts={self.wall_ts}, gen={self.generation}"
258+
if self.wait_for_generation:
259+
s += f", wait_generation={self.wait_for_generation}"
260+
261+
s += f", primary={self.primary_name}, piles=[{piles_str}], details=[{details_str}])"
262+
return s
238263

239264
def has_responsive_piles(self):
240265
for pile in self.piles.values():
@@ -414,21 +439,28 @@ def add_state(self, state):
414439

415440

416441
class BridgeSkipper:
417-
def __init__(self, path_to_cli: str, initial_piles: Dict[str, List[str]], use_https: bool = False, auto_failover: bool = True):
442+
def __init__(self, path_to_cli: str, initial_piles: Dict[str, List[str]], use_https: bool = False, auto_failover: bool = True, state_path: Optional[str] = None):
418443
self.path_to_cli = path_to_cli
419444
self.initial_piles = initial_piles
420445
self.use_https = use_https
421446
self.auto_failover = auto_failover
447+
self.state_path = state_path
422448

423-
self.async_checker = AsyncHealthcheckRunner(path_to_cli, initial_piles, use_https=use_https)
449+
self.async_checker = health.AsyncHealthcheckRunner(path_to_cli, initial_piles, use_https=use_https)
424450
self.async_checker.start()
425451

426452
# TODO: avoid hack, sleep to give async_checker time to get data
427-
time.sleep(HEALTH_REPLY_TTL_SECONDS)
453+
time.sleep(health.HEALTH_REPLY_TTL_SECONDS)
428454

429455
self.current_state = None
430456
self.state_history = TransitionHistory()
431457

458+
# Attempt to load previous state if provided
459+
try:
460+
self._load_state()
461+
except Exception as e:
462+
logger.debug(f"Failed to load saved state: {e}")
463+
432464
self._state_lock = threading.Lock()
433465
self._run_thread = None
434466
self._last_status_info_ts = 0.0
@@ -454,6 +486,10 @@ def _do_healthcheck(self):
454486
with self._state_lock:
455487
self.current_state = new_state
456488
self.state_history.add_state(self.current_state)
489+
try:
490+
self._save_state()
491+
except Exception as e:
492+
logger.error(f"Failed to save state: {e}")
457493

458494
def _decide(self) -> Optional[Tuple[str, List[List[str]]]]:
459495
# decide is called within the same thread as do_healthcheck,
@@ -468,6 +504,11 @@ def _decide(self) -> Optional[Tuple[str, List[List[str]]]]:
468504
logger.critical(f"All piles are unresponsive: {self.current_state}")
469505
return None
470506

507+
if (self.current_state.wait_for_generation and
508+
self.current_state.generation < self.current_state.wait_for_generation):
509+
logger.debug(f"Current generation {self.current_state}, waiting >= {self.current_state.wait_for_generation}")
510+
return None
511+
471512
bad_piles = [pile for pile, state in self.current_state.piles.items() if not state.is_good()]
472513

473514
monotonicNow = time.monotonic()
@@ -531,6 +572,7 @@ def _apply_decision(self, primary: str, commands: List[List[str]]):
531572

532573
if commands and len(commands) > 0:
533574
some_failed = False
575+
succeeded_count = 0
534576
for command in commands:
535577
command_str = f"{self.path_to_cli} -e grpc://{endpoints[0]}:2135 " + " ".join(command)
536578
if self.auto_failover:
@@ -539,13 +581,23 @@ def _apply_decision(self, primary: str, commands: List[List[str]]):
539581
if result is None:
540582
some_failed = True
541583
logger.error(f"Failed to apply command {command}")
584+
else:
585+
succeeded_count += 1
542586
else:
543587
logger.warning(f"Autofailover disabled, please execute the command: {command_str}")
544588
if some_failed:
545589
logger.critical("Failover failed: cluster might be down!")
546590
else:
547591
if self.auto_failover:
548-
logger.info("Failover commands executed successfully")
592+
commands_s = "command" if succeeded_count == 1 else "commands"
593+
new_min_gen = self.current_state.generation + succeeded_count
594+
logger.info(f"{succeeded_count} failover {commands_s} executed successfully, expecting gen >= {new_min_gen}")
595+
with self._state_lock:
596+
self.current_state.wait_for_generation = new_min_gen
597+
try:
598+
self._save_state()
599+
except Exception as e:
600+
logger.error(f"Failed to save state: {e}")
549601

550602
def _maintain_once(self):
551603
self._do_healthcheck()
@@ -598,6 +650,40 @@ def get_state_and_history(self) -> Tuple[Optional[GlobalState], List[str]]:
598650
transitions_copy = list(self.state_history.get_transitions())
599651
return (state_copy, transitions_copy)
600652

653+
def _save_state(self):
654+
if not self.state_path:
655+
return
656+
state_path = self.state_path
657+
# Truncate and write binary pickle, fsync for durability
658+
fd = None
659+
try:
660+
fd = os.open(state_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644)
661+
with os.fdopen(fd, "wb") as f:
662+
pickle.dump((self.current_state, self.state_history), f, protocol=pickle.HIGHEST_PROTOCOL)
663+
f.flush()
664+
os.fsync(f.fileno())
665+
fd = None
666+
finally:
667+
try:
668+
if fd is not None:
669+
os.close(fd)
670+
except Exception:
671+
pass
672+
673+
def _load_state(self):
674+
if not self.state_path:
675+
return
676+
if not os.path.exists(self.state_path):
677+
return
678+
try:
679+
with open(self.state_path, "rb") as f:
680+
state, history = pickle.load(f)
681+
# Assign loaded objects as-is
682+
self.current_state = state
683+
self.state_history = history
684+
except Exception as e:
685+
logger.debug(f"Failed to read state file: {e}")
686+
601687

602688
def get_max_status_length():
603689
max_length = 0

ydb/apps/bridge_skipper_demo/health.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from cli_wrapper import *
2+
13
import collections
24
import copy
35
import json
@@ -10,8 +12,6 @@
1012
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
1113
from typing import Dict, List, Optional, Tuple, Any
1214

13-
from cli_wrapper import *
14-
1515
# TODO: lower HEALTH_REPLY_TTL_SECONDS and YDB_HEALTHCHECK_TIMEOUT_MS with new
1616
# version of ydbd healthcheck
1717

@@ -208,6 +208,17 @@ def __repr__(self):
208208
return self.__str__()
209209

210210

211+
def get_latest_generation_pile(pile_world_views: Dict[str, PileWorldView]) -> Tuple[Optional[int], Optional[str]]:
212+
latest_generation = None
213+
pile_with_latest_generation = None
214+
for pile_name, view in pile_world_views.items():
215+
if view.admin_states and view.admin_states.generation:
216+
if not latest_generation or view.admin_states.generation > latest_generation:
217+
latest_generation = view.admin_states.generation
218+
pile_with_latest_generation = pile_name
219+
return (latest_generation, pile_with_latest_generation,)
220+
221+
211222
# TODO: use public API instead
212223
def _async_fetch_pile_list(path_to_cli: str, endpoints: List[str], executor: ThreadPoolExecutor) -> Future:
213224
def worker() -> Optional[PileAdminStates]:

ydb/apps/bridge_skipper_demo/skipper.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
# |_/___\__|
1313
#
1414

15+
import bridge
16+
import health
17+
import skipper_tui
18+
1519
import argparse
1620
import atexit
1721
import logging
@@ -22,10 +26,6 @@
2226
import time
2327
import threading
2428

25-
import bridge
26-
import health
27-
import skipper_tui
28-
2929

3030
logger = logging.getLogger(__name__)
3131

@@ -203,6 +203,8 @@ def _parse_args():
203203
parser.add_argument("--endpoint", "-e", required=True,
204204
help="Single endpoint used to resolve piles and their endpoints")
205205

206+
parser.add_argument("--state", "-s", required=True, help="Path to keeper state file (pickle format)")
207+
206208
parser.add_argument("--ydb", required=False, help="Path to ydb cli")
207209
parser.add_argument("--disable-auto-failover", action="store_true", help="Disable automatical failover")
208210

@@ -216,14 +218,14 @@ def _parse_args():
216218
return parser.parse_args()
217219

218220

219-
def _run_no_tui(path_to_cli, piles, use_https, auto_failover):
220-
keeper = bridge.BridgeSkipper(path_to_cli, piles, use_https=use_https, auto_failover=auto_failover)
221+
def _run_no_tui(path_to_cli, piles, use_https, auto_failover, state_path):
222+
keeper = bridge.BridgeSkipper(path_to_cli, piles, use_https=use_https, auto_failover=auto_failover, state_path=state_path)
221223
keeper.run()
222224

223225

224226
def _run_tui(args, path_to_cli, piles):
225227
auto_failover = not args.disable_auto_failover
226-
keeper = bridge.BridgeSkipper(path_to_cli, piles, use_https=args.https, auto_failover=auto_failover)
228+
keeper = bridge.BridgeSkipper(path_to_cli, piles, use_https=args.https, auto_failover=auto_failover, state_path=args.state)
227229
app = skipper_tui.KeeperApp(
228230
keeper=keeper,
229231
cluster_name=args.cluster,
@@ -264,6 +266,28 @@ def main():
264266
path_to_cli = found
265267
logger.debug(f"Found ydb CLI: {path_to_cli}")
266268

269+
# check state path
270+
try:
271+
state_path = args.state
272+
# Last component might not exist; parent dir must exist and be writable/executable
273+
state_dir = os.path.dirname(state_path) or "."
274+
if not os.path.isdir(state_dir):
275+
logger.error(f"State directory does not exist: {state_dir}")
276+
sys.exit(2)
277+
if not (os.access(state_dir, os.W_OK) and os.access(state_dir, os.X_OK)):
278+
logger.error(f"Insufficient permissions on state directory: {state_dir}")
279+
sys.exit(2)
280+
if os.path.exists(state_path):
281+
if not os.path.isfile(state_path):
282+
logger.error(f"State path exists but is not a regular file: {state_path}")
283+
sys.exit(2)
284+
if not (os.access(state_path, os.R_OK) and os.access(state_path, os.W_OK)):
285+
logger.error(f"Insufficient permissions on state file: {state_path}")
286+
sys.exit(2)
287+
except Exception as e:
288+
logger.error(f"Failed to validate state path '{args.state}': {e}")
289+
sys.exit(2)
290+
267291
piles = None
268292
try:
269293
piles = bridge.resolve(args.endpoint, path_to_cli)
@@ -300,7 +324,7 @@ def main():
300324
_run_tui(args, path_to_cli, piles)
301325
else:
302326
auto_failover = not args.disable_auto_failover
303-
_run_no_tui(path_to_cli, piles, args.https, auto_failover)
327+
_run_no_tui(path_to_cli, piles, args.https, auto_failover, args.state)
304328

305329

306330
if __name__ == "__main__":

0 commit comments

Comments
 (0)