55import java .util .List ;
66import java .util .Optional ;
77import java .util .concurrent .Callable ;
8+ import java .util .concurrent .ExecutionException ;
89import java .util .concurrent .ExecutorService ;
9- import java .util .concurrent .Executors ;
1010import java .util .concurrent .Future ;
1111import java .util .concurrent .atomic .AtomicReference ;
1212import java .util .stream .Collectors ;
1616import org .slf4j .Logger ;
1717import org .slf4j .LoggerFactory ;
1818
19- import tech .ydb .common .retry .RetryForever ;
2019import tech .ydb .coordination .CoordinationClient ;
2120import tech .ydb .coordination .CoordinationSession ;
2221import tech .ydb .coordination .description .SemaphoreDescription ;
2322import tech .ydb .coordination .recipes .locks .LockInternals ;
2423import tech .ydb .coordination .recipes .util .Listenable ;
24+ import tech .ydb .coordination .recipes .util .ListenableContainer ;
2525import tech .ydb .coordination .recipes .util .SessionListenableProvider ;
2626import tech .ydb .coordination .recipes .util .SemaphoreObserver ;
2727import tech .ydb .coordination .settings .DescribeSemaphoreMode ;
2828import tech .ydb .coordination .settings .WatchSemaphoreMode ;
29+ import tech .ydb .core .Status ;
2930import tech .ydb .core .StatusCode ;
3031
31- // TODO: настройки + переименовать переменные + рекомендации по коду + логгирование + backoff политика
32+ // TODO: backoff политика + документцаия / логгирование / рекомендации по коду
3233public class LeaderElection implements Closeable , SessionListenableProvider {
3334 private static final Logger logger = LoggerFactory .getLogger (LeaderElection .class );
3435 private static final long MAX_LEASE = 1L ;
3536
36- private final CoordinationClient client ;
3737 private final LeaderElectionListener leaderElectionListener ;
3838 private final String coordinationNodePath ;
3939 private final String electionName ;
4040 private final byte [] data ;
41+
4142 private final ExecutorService electionExecutor ;
43+ private final CoordinationSession coordinationSession ;
44+ private final ListenableContainer <CoordinationSession .State > sessionListenable ;
4245 private final LockInternals lock ;
4346 private final SemaphoreObserver semaphoreObserver ;
4447
4548 private AtomicReference <State > state = new AtomicReference <>(State .CREATED );
49+ private Future <Status > sessionConnectionTask = null ;
50+ private Future <Void > electionTask = null ;
4651 private volatile boolean autoRequeue = false ;
4752 private volatile boolean isLeader = false ;
48- private Future <Void > electionTask = null ;
4953
5054 private enum State {
5155 CREATED ,
@@ -55,62 +59,77 @@ private enum State {
5559
5660 public LeaderElection (
5761 CoordinationClient client ,
58- LeaderElectionListener leaderElectionListener ,
5962 String coordinationNodePath ,
6063 String electionName ,
61- byte [] data
64+ byte [] data ,
65+ LeaderElectionListener leaderElectionListener
6266 ) {
6367 this (
6468 client ,
65- leaderElectionListener ,
6669 coordinationNodePath ,
6770 electionName ,
6871 data ,
69- Executors .newSingleThreadExecutor ()
72+ leaderElectionListener ,
73+ LeaderElectionSettings .newBuilder ()
74+ .build ()
7075 );
7176 }
7277
7378 public LeaderElection (
7479 CoordinationClient client ,
75- LeaderElectionListener leaderElectionListener ,
7680 String coordinationNodePath ,
7781 String electionName ,
7882 byte [] data ,
79- ExecutorService executorService
83+ LeaderElectionListener leaderElectionListener ,
84+ LeaderElectionSettings settings
8085 ) {
81- this .client = client ;
82- this .leaderElectionListener = leaderElectionListener ;
8386 this .coordinationNodePath = coordinationNodePath ;
8487 this .electionName = electionName ;
8588 this .data = data ;
86- this .electionExecutor = executorService ;
89+ this .leaderElectionListener = leaderElectionListener ;
90+ this .electionExecutor = settings .getExecutorService ();
91+
92+ this .coordinationSession = client .createSession (coordinationNodePath );
93+ this .sessionListenable = new ListenableContainer <>();
94+ coordinationSession .addStateListener (sessionListenable ::notifyListeners );
8795 this .lock = new LockInternals (
88- MAX_LEASE ,
89- client ,
90- coordinationNodePath ,
91- electionName
96+ coordinationSession ,
97+ electionName ,
98+ MAX_LEASE
9299 );
93100 this .semaphoreObserver = new SemaphoreObserver (
94- lock . getCoordinationSession () ,
101+ coordinationSession ,
95102 electionName ,
96103 WatchSemaphoreMode .WATCH_OWNERS ,
97104 DescribeSemaphoreMode .WITH_OWNERS_AND_WAITERS ,
98- new RetryForever ( 100 ) // TODO: передавать снаружи
105+ settings . getRetryPolicy ()
99106 );
100107 }
101108
109+ private CoordinationSession connectedSession () {
110+ if (sessionConnectionTask == null ) {
111+ throw new IllegalStateException ("Not started yet" );
112+ }
113+ try {
114+ sessionConnectionTask .get ().expectSuccess ("Unable to connect to session" );
115+ } catch (InterruptedException | ExecutionException e ) {
116+ throw new RuntimeException (e );
117+ }
118+ return coordinationSession ;
119+ }
120+
102121 public void start () {
103122 Preconditions .checkState (state .compareAndSet (State .CREATED , State .STARTED ), "Already started or closed" );
104- // TODO: create session ?
105- CoordinationSession coordinationSession = lock . getCoordinationSession ();
106- // TODO: retry on create? Non idempotent - will not be retried automatically
107- lock . start ();
108- coordinationSession . createSemaphore ( electionName , MAX_LEASE ). thenAccept ( status -> {
109- if ( status . isSuccess () || status . getCode () == StatusCode . ALREADY_EXISTS ) {
110- semaphoreObserver . start ();
111- }
112- status . expectSuccess ( "Unable to create semaphore" ) ;
113- // TODO: set status == error
123+ // TODO: handle errors retries and logging ?
124+ this . sessionConnectionTask = coordinationSession . connect (). thenCompose ( connectionStatus -> {
125+ connectionStatus . expectSuccess ( "Unable to establish session" );
126+ return coordinationSession . createSemaphore ( electionName , MAX_LEASE ). thenApply ( semaphoreStatus -> {
127+ if ( semaphoreStatus . isSuccess () || semaphoreStatus . getCode () == StatusCode . ALREADY_EXISTS ) {
128+ semaphoreObserver . start ();
129+ }
130+ semaphoreStatus . expectSuccess ( "Unable to create semaphore" );
131+ return semaphoreStatus ;
132+ });
114133 });
115134
116135 if (autoRequeue ) {
@@ -173,7 +192,6 @@ private void doWork() throws Exception {
173192 isLeader = false ;
174193
175194 try {
176- lock .getConnectedCoordinationSession (); // asserts that session is connected or throws exception
177195 lock .tryAcquire (
178196 null ,
179197 true ,
@@ -221,6 +239,7 @@ private boolean isQueued() {
221239
222240 /**
223241 * Не гарантированы все, кроме лидера
242+ *
224243 * @return
225244 */
226245 public List <ElectionParticipant > getParticipants () {
@@ -257,7 +276,7 @@ private static ElectionParticipant mapParticipant(SemaphoreDescription.Session s
257276
258277 @ Override
259278 public Listenable <CoordinationSession .State > getSessionListenable () {
260- return lock . getSessionListenable () ;
279+ return sessionListenable ;
261280 }
262281
263282 @ Override
0 commit comments