55import java .util .List ;
66import java .util .Optional ;
77import java .util .concurrent .Callable ;
8- import java .util .concurrent .ExecutionException ;
8+ import java .util .concurrent .CompletableFuture ;
9+ import java .util .concurrent .CountDownLatch ;
910import java .util .concurrent .ExecutorService ;
11+ import java .util .concurrent .Executors ;
1012import java .util .concurrent .Future ;
13+ import java .util .concurrent .ScheduledExecutorService ;
1114import java .util .concurrent .atomic .AtomicReference ;
15+ import java .util .function .Supplier ;
1216import java .util .stream .Collectors ;
1317import java .util .stream .Stream ;
1418
1519import com .google .common .base .Preconditions ;
1620import org .slf4j .Logger ;
1721import org .slf4j .LoggerFactory ;
1822
23+ import tech .ydb .common .retry .RetryPolicy ;
1924import tech .ydb .coordination .CoordinationClient ;
2025import tech .ydb .coordination .CoordinationSession ;
2126import tech .ydb .coordination .description .SemaphoreDescription ;
2227import tech .ydb .coordination .recipes .locks .LockInternals ;
2328import tech .ydb .coordination .recipes .util .Listenable ;
2429import tech .ydb .coordination .recipes .util .ListenableContainer ;
30+ import tech .ydb .coordination .recipes .util .RetryableTask ;
2531import tech .ydb .coordination .recipes .util .SessionListenableProvider ;
2632import tech .ydb .coordination .recipes .util .SemaphoreObserver ;
2733import tech .ydb .coordination .settings .DescribeSemaphoreMode ;
2834import tech .ydb .coordination .settings .WatchSemaphoreMode ;
2935import tech .ydb .core .Status ;
3036import tech .ydb .core .StatusCode ;
3137
32- // TODO: backoff политика + документцаия / логгирование / рекомендации по коду
38+ // TODO: documentation / logging / code recommendations
3339public class LeaderElection implements Closeable , SessionListenableProvider {
3440 private static final Logger logger = LoggerFactory .getLogger (LeaderElection .class );
3541 private static final long MAX_LEASE = 1L ;
@@ -38,22 +44,27 @@ public class LeaderElection implements Closeable, SessionListenableProvider {
3844 private final String coordinationNodePath ;
3945 private final String electionName ;
4046 private final byte [] data ;
47+ private final RetryPolicy retryPolicy ;
4148
42- private final ExecutorService electionExecutor ;
49+ private final ScheduledExecutorService scheduledExecutor ;
50+ private final ExecutorService blockingExecutor ;
4351 private final CoordinationSession coordinationSession ;
4452 private final ListenableContainer <CoordinationSession .State > sessionListenable ;
4553 private final LockInternals lock ;
4654 private final SemaphoreObserver semaphoreObserver ;
4755
48- private AtomicReference <State > state = new AtomicReference <>(State .CREATED );
49- private Future <Status > sessionConnectionTask = null ;
56+ private final CountDownLatch startingLatch = new CountDownLatch (1 );
57+ private AtomicReference <State > state = new AtomicReference <>(State .INITIAL );
58+ private AtomicReference <Future <Status >> initializingTask = new AtomicReference <>(null );
5059 private Future <Void > electionTask = null ;
5160 private volatile boolean autoRequeue = false ;
5261 private volatile boolean isLeader = false ;
5362
5463 private enum State {
55- CREATED ,
64+ INITIAL ,
65+ STARTING ,
5666 STARTED ,
67+ FAILED ,
5768 CLOSED
5869 }
5970
@@ -87,11 +98,20 @@ public LeaderElection(
8798 this .electionName = electionName ;
8899 this .data = data ;
89100 this .leaderElectionListener = leaderElectionListener ;
90- this .electionExecutor = settings .getExecutorService ();
101+ this .scheduledExecutor = settings .getScheduledExecutor ();
102+ this .blockingExecutor = Executors .newSingleThreadExecutor (); // TODO: thread factory
103+ this .retryPolicy = settings .getRetryPolicy ();
91104
92105 this .coordinationSession = client .createSession (coordinationNodePath );
93106 this .sessionListenable = new ListenableContainer <>();
94- coordinationSession .addStateListener (sessionListenable ::notifyListeners );
107+ coordinationSession .addStateListener (sessionState -> {
108+ if (sessionState == CoordinationSession .State .LOST || sessionState == CoordinationSession .State .CLOSED ) {
109+ logger .error ("Coordination session unexpectedly changed to {} state, marking election as FAILED" ,
110+ sessionState );
111+ state .set (State .FAILED );
112+ }
113+ sessionListenable .notifyListeners (sessionState );
114+ });
95115 this .lock = new LockInternals (
96116 coordinationSession ,
97117 electionName ,
@@ -102,41 +122,60 @@ public LeaderElection(
102122 electionName ,
103123 WatchSemaphoreMode .WATCH_OWNERS ,
104124 DescribeSemaphoreMode .WITH_OWNERS_AND_WAITERS ,
105- settings .getRetryPolicy ()
125+ settings .getRetryPolicy (),
126+ settings .getScheduledExecutor ()
106127 );
107128 }
108129
109- private CoordinationSession connectedSession () {
110- if (sessionConnectionTask == null ) {
111- throw new IllegalStateException ("Not started yet" );
112- }
113- try {
114- sessionConnectionTask .get ().expectSuccess ("Unable to connect to session" );
115- } catch (InterruptedException | ExecutionException e ) {
116- throw new RuntimeException (e );
117- }
118- return coordinationSession ;
119- }
120-
121130 public void start () {
122- Preconditions .checkState (state .compareAndSet (State .CREATED , State .STARTED ), "Already started or closed" );
123- // TODO: handle errors retries and logging?
124- this .sessionConnectionTask = coordinationSession .connect ().thenCompose (connectionStatus -> {
125- connectionStatus .expectSuccess ("Unable to establish session" );
126- return coordinationSession .createSemaphore (electionName , MAX_LEASE ).thenApply (semaphoreStatus -> {
127- if (semaphoreStatus .isSuccess () || semaphoreStatus .getCode () == StatusCode .ALREADY_EXISTS ) {
128- semaphoreObserver .start ();
129- }
130- semaphoreStatus .expectSuccess ("Unable to create semaphore" );
131- return semaphoreStatus ;
132- });
133- });
131+ Preconditions .checkState (
132+ state .compareAndSet (State .INITIAL , State .STARTING ),
133+ "Leader election may be started only once"
134+ );
135+
136+ CompletableFuture <Status > connectionTask = executeWithRetry (coordinationSession ::connect );
137+ CompletableFuture <Status > semaphoreCreateTask = executeWithRetry (
138+ () -> coordinationSession .createSemaphore (electionName , MAX_LEASE )
139+ .thenCompose (status -> {
140+ if (status .getCode () == StatusCode .ALREADY_EXISTS ) {
141+ return CompletableFuture .completedFuture (Status .SUCCESS );
142+ }
143+ return CompletableFuture .completedFuture (status );
144+ })
145+ );
146+
147+ CompletableFuture <Status > initializingRetriedTask = connectionTask
148+ .thenCompose (connectionStatus -> {
149+ connectionStatus .expectSuccess ("Unable to establish session" );
150+ return semaphoreCreateTask ;
151+ })
152+ .thenApply (semaphoreStatus -> {
153+ if (semaphoreStatus .isSuccess ()) {
154+ state .set (State .STARTED );
155+ semaphoreObserver .start ();
156+ startingLatch .countDown ();
157+ }
158+ semaphoreStatus .expectSuccess ("Unable to create semaphore" );
159+ return semaphoreStatus ;
160+ }).exceptionally (ex -> {
161+ logger .error ("Leader election initializing task failed" , ex );
162+ state .set (State .FAILED );
163+ semaphoreObserver .close ();
164+ startingLatch .countDown ();
165+ return Status .of (StatusCode .CLIENT_INTERNAL_ERROR );
166+ });
167+
168+ initializingTask .set (initializingRetriedTask );
134169
135170 if (autoRequeue ) {
136171 enqueueElection ();
137172 }
138173 }
139174
175+ private CompletableFuture <Status > executeWithRetry (Supplier <CompletableFuture <Status >> taskSupplier ) {
176+ return new RetryableTask ("leaderElectionInitialize" , taskSupplier , scheduledExecutor , retryPolicy ).execute ();
177+ }
178+
140179 public void autoRequeue () {
141180 autoRequeue = true ;
142181 }
@@ -153,9 +192,12 @@ public boolean isLeader() {
153192 * @return true if re-enqueue was successful
154193 */
155194 public boolean requeue () {
156- Preconditions .checkState (state .get () == State .STARTED , "Already closed or not yet started" );
195+ State localState = state .get ();
196+ Preconditions .checkState (
197+ localState == State .STARTED || localState == State .STARTING ,
198+ "Unexpected state: " + localState .name ()
199+ );
157200
158- // TODO: корректно обрабатывать если старт еще не кончился
159201 return enqueueElection ();
160202 }
161203
@@ -170,8 +212,9 @@ public synchronized boolean interruptLeadership() {
170212 }
171213
172214 private synchronized boolean enqueueElection () {
173- if (!isQueued () && state .get () == State .STARTED ) {
174- electionTask = electionExecutor .submit (new Callable <Void >() {
215+ State localState = state .get ();
216+ if (!isQueued () && (localState == State .STARTED || localState == State .STARTING )) {
217+ electionTask = blockingExecutor .submit (new Callable <Void >() {
175218 @ Override
176219 public Void call () throws Exception {
177220 try {
@@ -192,6 +235,7 @@ private void doWork() throws Exception {
192235 isLeader = false ;
193236
194237 try {
238+ waitStartedState ();
195239 lock .tryAcquire (
196240 null ,
197241 true ,
@@ -226,9 +270,22 @@ private void doWork() throws Exception {
226270 }
227271 }
228272
273+ private void waitStartedState () throws InterruptedException {
274+ State localState = state .get ();
275+ if (localState == State .STARTING ) {
276+ startingLatch .await ();
277+ localState = state .get ();
278+ }
279+
280+ if (localState == State .INITIAL || localState == State .CLOSED || localState == State .FAILED ) {
281+ throw new IllegalStateException ("Unexpected state: " + localState .name ());
282+ }
283+ }
284+
229285 private synchronized void finishTask () {
230286 electionTask = null ;
231- if (autoRequeue ) {
287+ State localState = state .get ();
288+ if (autoRequeue && localState != State .CLOSED && localState != State .FAILED ) {
232289 enqueueElection ();
233290 }
234291 }
@@ -281,6 +338,7 @@ public Listenable<CoordinationSession.State> getSessionListenable() {
281338
282339 @ Override
283340 public synchronized void close () {
341+ // TODO: Учесть все стейты
284342 Preconditions .checkState (state .compareAndSet (State .STARTED , State .CLOSED ), "Already closed" );
285343
286344 Future <Void > localTask = electionTask ;
@@ -289,7 +347,7 @@ public synchronized void close() {
289347 electionTask = null ;
290348 }
291349
292- electionExecutor .shutdown ();
350+ blockingExecutor .shutdown ();
293351 semaphoreObserver .close ();
294352 }
295353}
0 commit comments