Skip to content

Commit bf7f722

Browse files
committed
init
1 parent c650f95 commit bf7f722

22 files changed

+1482
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package tech.ydb.coordination.recipes.election;
2+
3+
public interface LeaderElectionListener {
4+
void takeLeadership() throws Exception;
5+
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package tech.ydb.coordination.recipes.election;
2+
3+
import com.google.common.base.Preconditions;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import tech.ydb.coordination.CoordinationClient;
7+
import tech.ydb.coordination.CoordinationSession;
8+
import tech.ydb.coordination.recipes.locks.LockInternals;
9+
import tech.ydb.coordination.recipes.util.Listenable;
10+
import tech.ydb.coordination.recipes.util.ListenableProvider;
11+
import tech.ydb.coordination.recipes.watch.Participant;
12+
import tech.ydb.coordination.recipes.watch.SemaphoreWatchListener;
13+
14+
import java.io.Closeable;
15+
import java.util.List;
16+
import java.util.Optional;
17+
import java.util.concurrent.Callable;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
23+
public class LeaderElector implements Closeable, ListenableProvider<CoordinationSession.State> {
24+
private static final Logger logger = LoggerFactory.getLogger(LeaderElector.class);
25+
26+
private final CoordinationClient client;
27+
private final LeaderElectionListener leaderElectionListener;
28+
private final String coordinationNodePath;
29+
private final String semaphoreName;
30+
private final ExecutorService electionExecutor;
31+
private final LockInternals lock;
32+
private final SemaphoreWatchListener semaphoreWatchAdapter;
33+
34+
private AtomicReference<State> state = new AtomicReference<>(State.STARTED);
35+
private volatile boolean autoRequeue = false;
36+
private volatile boolean isLeader = false;
37+
private Future<Void> electionTask = null;
38+
39+
40+
private enum State { // TODO: needs third state (CREATED)?
41+
STARTED,
42+
CLOSED
43+
}
44+
45+
public LeaderElector(
46+
CoordinationClient client,
47+
LeaderElectionListener leaderElectionListener,
48+
String coordinationNodePath,
49+
String semaphoreName
50+
) {
51+
this(client, leaderElectionListener, coordinationNodePath, semaphoreName, Executors.newSingleThreadExecutor());
52+
}
53+
54+
public LeaderElector(
55+
CoordinationClient client,
56+
LeaderElectionListener leaderElectionListener,
57+
String coordinationNodePath,
58+
String semaphoreName,
59+
ExecutorService executorService
60+
) {
61+
this.client = client;
62+
this.leaderElectionListener = leaderElectionListener;
63+
this.coordinationNodePath = coordinationNodePath;
64+
this.semaphoreName = semaphoreName;
65+
this.electionExecutor = executorService;
66+
this.lock = new LockInternals(
67+
client,
68+
coordinationNodePath,
69+
semaphoreName
70+
);
71+
this.lock.start();
72+
this.semaphoreWatchAdapter = new SemaphoreWatchListener(lock.getCoordinationSession(), semaphoreName);
73+
semaphoreWatchAdapter.start();
74+
}
75+
76+
public boolean isLeader() {
77+
return isLeader;
78+
}
79+
80+
public synchronized void interruptLeadership() {
81+
Future<?> task = electionTask;
82+
if (task != null) {
83+
task.cancel(true);
84+
}
85+
}
86+
87+
/**
88+
* Re-queue an attempt for leadership. If this instance is already queued, nothing
89+
* happens and false is returned. If the instance was not queued, it is re-queued and true
90+
* is returned
91+
*
92+
* @return true if re-enqueue was successful
93+
*/
94+
public boolean requeue() {
95+
Preconditions.checkState(state.get() == State.STARTED, "Already closed or not yet started");
96+
97+
return enqueueElection();
98+
}
99+
100+
public void autoRequeue() {
101+
autoRequeue = true;
102+
}
103+
104+
private synchronized boolean enqueueElection() {
105+
if (!isQueued() && state.get() == State.STARTED) {
106+
electionTask = electionExecutor.submit(new Callable<Void>() {
107+
@Override
108+
public Void call() throws Exception {
109+
try {
110+
doWork();
111+
} finally {
112+
finishTask();
113+
}
114+
return null;
115+
}
116+
});
117+
return true;
118+
}
119+
120+
return false;
121+
}
122+
123+
private void doWork() throws Exception {
124+
isLeader = false;
125+
126+
try {
127+
lock.tryAcquire(
128+
null,
129+
true,
130+
null
131+
);
132+
isLeader = true;
133+
try {
134+
leaderElectionListener.takeLeadership();
135+
} catch (InterruptedException e) {
136+
Thread.currentThread().interrupt();
137+
throw e;
138+
} catch (Throwable e) {
139+
logger.debug("takeLeadership exception", e);
140+
}
141+
} catch (InterruptedException e) {
142+
Thread.currentThread().interrupt();
143+
throw e;
144+
} finally {
145+
if (isLeader) {
146+
isLeader = false;
147+
boolean wasInterrupted = Thread.interrupted();
148+
try {
149+
lock.release();
150+
} catch (Exception e) {
151+
logger.error("Lock release exception for: " + coordinationNodePath);
152+
} finally {
153+
if (wasInterrupted) {
154+
Thread.currentThread().interrupt();
155+
}
156+
}
157+
}
158+
}
159+
}
160+
161+
private synchronized void finishTask() {
162+
electionTask = null;
163+
if (autoRequeue) { // TODO: requeue if critical exception?
164+
enqueueElection();
165+
}
166+
}
167+
168+
private boolean isQueued() {
169+
return electionTask != null;
170+
}
171+
172+
public List<Participant> getParticipants() {
173+
return semaphoreWatchAdapter.getParticipants();
174+
}
175+
176+
public Optional<Participant> getLeader() {
177+
return semaphoreWatchAdapter.getOwners().stream().findFirst();
178+
}
179+
180+
@Override
181+
public synchronized void close() {
182+
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed");
183+
184+
Future<Void> task = electionTask;
185+
if (task != null) {
186+
task.cancel(true);
187+
}
188+
189+
electionTask = null;
190+
electionExecutor.shutdown();
191+
semaphoreWatchAdapter.close();
192+
getListenable().clearListeners();
193+
}
194+
195+
@Override
196+
public Listenable<CoordinationSession.State> getListenable() {
197+
return lock.getListenable();
198+
}
199+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package tech.ydb.coordination.recipes.locks;
2+
3+
import tech.ydb.coordination.CoordinationSession;
4+
import tech.ydb.coordination.recipes.util.Listenable;
5+
6+
import java.time.Duration;
7+
8+
public interface InterProcessLock extends Listenable<CoordinationSession.State> {
9+
void acquire() throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException;
10+
11+
/**
12+
* @return true - if successfully acquired lock, false - if lock waiting time expired
13+
*/
14+
boolean acquire(Duration waitDuration) throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException;
15+
16+
/**
17+
* @return false if nothing to release
18+
*/
19+
boolean release() throws Exception;
20+
21+
/**
22+
* @return true if the lock is acquired by a thread in this JVM
23+
*/
24+
boolean isAcquiredInThisProcess();
25+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package tech.ydb.coordination.recipes.locks;
2+
3+
import tech.ydb.coordination.CoordinationClient;
4+
import tech.ydb.coordination.CoordinationSession;
5+
import tech.ydb.coordination.recipes.util.Listenable;
6+
import tech.ydb.coordination.recipes.util.ListenableProvider;
7+
8+
import javax.annotation.concurrent.ThreadSafe;
9+
import java.io.Closeable;
10+
import java.time.Duration;
11+
import java.time.Instant;
12+
13+
@ThreadSafe
14+
public class InterProcessMutex implements InterProcessLock, ListenableProvider<CoordinationSession.State>, Closeable {
15+
private final LockInternals lockInternals;
16+
17+
public InterProcessMutex(
18+
CoordinationClient client,
19+
String coordinationNodePath,
20+
String lockName
21+
) {
22+
lockInternals = new LockInternals(client, coordinationNodePath, lockName);
23+
lockInternals.start();
24+
}
25+
26+
@Override
27+
public void acquire() throws Exception {
28+
lockInternals.tryAcquire(
29+
null,
30+
true,
31+
null
32+
);
33+
}
34+
35+
@Override
36+
public boolean acquire(Duration waitDuration) throws Exception {
37+
Instant deadline = Instant.now().plus(waitDuration);
38+
return lockInternals.tryAcquire(
39+
deadline,
40+
true,
41+
null
42+
) != null;
43+
}
44+
45+
@Override
46+
public boolean release() throws Exception {
47+
return lockInternals.release();
48+
}
49+
50+
@Override
51+
public boolean isAcquiredInThisProcess() {
52+
return lockInternals.isAcquired();
53+
}
54+
55+
@Override
56+
public Listenable<CoordinationSession.State> getListenable() {
57+
return null;
58+
}
59+
60+
@Override
61+
public void close() {
62+
lockInternals.close();
63+
}
64+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package tech.ydb.coordination.recipes.locks;
2+
3+
public class LockAcquireFailedException extends RuntimeException {
4+
private final String coordinationNodePath;
5+
private final String semaphoreName;
6+
7+
public LockAcquireFailedException(String message, String coordinationNodePath, String semaphoreName) {
8+
super("Failed to acquire semaphore=" + semaphoreName +
9+
", on coordination node=" + coordinationNodePath +
10+
": '" + message + "'");
11+
this.coordinationNodePath = coordinationNodePath;
12+
this.semaphoreName = semaphoreName;
13+
}
14+
15+
public LockAcquireFailedException(String coordinationNodePath, String semaphoreName) {
16+
super("Failed to acquire semaphore=" + semaphoreName + ", on coordination node=" + coordinationNodePath);
17+
this.coordinationNodePath = coordinationNodePath;
18+
this.semaphoreName = semaphoreName;
19+
}
20+
21+
public String getCoordinationNodePath() {
22+
return coordinationNodePath;
23+
}
24+
25+
public String getSemaphoreName() {
26+
return semaphoreName;
27+
}
28+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package tech.ydb.coordination.recipes.locks;
2+
3+
public class LockAlreadyAcquiredException extends LockAcquireFailedException {
4+
public LockAlreadyAcquiredException(String coordinationNodePath, String semaphoreName) {
5+
super(
6+
"Lock=" + semaphoreName + " on path=" + coordinationNodePath + " is already acquired",
7+
coordinationNodePath,
8+
semaphoreName
9+
);
10+
}
11+
}

0 commit comments

Comments
 (0)