Skip to content

Commit 53bacfa

Browse files
committed
feat: added group membership
1 parent 9278dc5 commit 53bacfa

36 files changed

+2726
-1030
lines changed

coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private CompletableFuture<Result<Long>> connectToSession(Stream stream, long ses
162162
}
163163
}, executor);
164164

165-
// and send session start message with id of previos session (or zero if it's first connect)
165+
// and send session start message with id of previous session (or zero if it's first connect)
166166
return stream.sendSessionStart(sessionID, nodePath, connectTimeout, protectionKey);
167167
}
168168

coordination/src/main/java/tech/ydb/coordination/recipes/watch/Participant.java renamed to coordination/src/main/java/tech/ydb/coordination/recipes/election/ElectionParticipant.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,27 @@
1-
package tech.ydb.coordination.recipes.watch;
1+
package tech.ydb.coordination.recipes.election;
22

33
import java.util.Arrays;
44
import java.util.Objects;
55

6-
public class Participant {
7-
private final long id;
6+
public class ElectionParticipant {
7+
private final long sessionId;
88
private final byte[] data;
9-
private final long count;
109
private final boolean isLeader;
1110

12-
public Participant(long id, byte[] data, long count, boolean isLeader) {
13-
this.id = id;
11+
public ElectionParticipant(long id, byte[] data, boolean isLeader) {
12+
this.sessionId = id;
1413
this.data = data;
15-
this.count = count;
1614
this.isLeader = isLeader;
1715
}
1816

19-
public long getId() {
20-
return id;
17+
public long getSessionId() {
18+
return sessionId;
2119
}
2220

2321
public byte[] getData() {
2422
return data;
2523
}
2624

27-
public long getCount() {
28-
return count;
29-
}
30-
3125
public boolean isLeader() {
3226
return isLeader;
3327
}
@@ -40,21 +34,21 @@ public boolean equals(Object o) {
4034
if (o == null || getClass() != o.getClass()) {
4135
return false;
4236
}
43-
Participant that = (Participant) o;
44-
return id == that.id && count == that.count && isLeader == that.isLeader && Objects.deepEquals(data, that.data);
37+
ElectionParticipant that = (ElectionParticipant) o;
38+
return sessionId == that.sessionId && isLeader == that.isLeader &&
39+
Objects.deepEquals(data, that.data);
4540
}
4641

4742
@Override
4843
public int hashCode() {
49-
return Objects.hash(id, Arrays.hashCode(data), count, isLeader);
44+
return Objects.hash(sessionId, Arrays.hashCode(data), isLeader);
5045
}
5146

5247
@Override
5348
public String toString() {
54-
return "Participant{" +
55-
"id=" + id +
49+
return "ElectionParticipant{" +
50+
"sessionId=" + sessionId +
5651
", data=" + Arrays.toString(data) +
57-
", count=" + count +
5852
", isLeader=" + isLeader +
5953
'}';
6054
}
Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
package tech.ydb.coordination.recipes.election;
2+
3+
import java.io.Closeable;
4+
import java.util.Collections;
5+
import java.util.List;
6+
import java.util.Optional;
7+
import java.util.concurrent.Callable;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.Future;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
import java.util.stream.Collectors;
13+
import java.util.stream.Stream;
14+
15+
import com.google.common.base.Preconditions;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import tech.ydb.common.retry.RetryForever;
20+
import tech.ydb.coordination.CoordinationClient;
21+
import tech.ydb.coordination.CoordinationSession;
22+
import tech.ydb.coordination.description.SemaphoreDescription;
23+
import tech.ydb.coordination.recipes.locks.LockInternals;
24+
import tech.ydb.coordination.recipes.util.Listenable;
25+
import tech.ydb.coordination.recipes.util.SessionListenableProvider;
26+
import tech.ydb.coordination.recipes.util.SemaphoreObserver;
27+
import tech.ydb.coordination.settings.DescribeSemaphoreMode;
28+
import tech.ydb.coordination.settings.WatchSemaphoreMode;
29+
import tech.ydb.core.StatusCode;
30+
31+
// TODO: настройки + переименовать переменные + рекомендации по коду + логгирование + backoff политика
32+
public class LeaderElection implements Closeable, SessionListenableProvider {
33+
private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class);
34+
private static final long MAX_LEASE = 1L;
35+
36+
private final CoordinationClient client;
37+
private final LeaderElectionListener leaderElectionListener;
38+
private final String coordinationNodePath;
39+
private final String electionName;
40+
private final byte[] data;
41+
private final ExecutorService electionExecutor;
42+
private final LockInternals lock;
43+
private final SemaphoreObserver semaphoreObserver;
44+
45+
private AtomicReference<State> state = new AtomicReference<>(State.CREATED);
46+
private volatile boolean autoRequeue = false;
47+
private volatile boolean isLeader = false;
48+
private Future<Void> electionTask = null;
49+
50+
private enum State {
51+
CREATED,
52+
STARTED,
53+
CLOSED
54+
}
55+
56+
public LeaderElection(
57+
CoordinationClient client,
58+
LeaderElectionListener leaderElectionListener,
59+
String coordinationNodePath,
60+
String electionName,
61+
byte[] data
62+
) {
63+
this(
64+
client,
65+
leaderElectionListener,
66+
coordinationNodePath,
67+
electionName,
68+
data,
69+
Executors.newSingleThreadExecutor()
70+
);
71+
}
72+
73+
public LeaderElection(
74+
CoordinationClient client,
75+
LeaderElectionListener leaderElectionListener,
76+
String coordinationNodePath,
77+
String electionName,
78+
byte[] data,
79+
ExecutorService executorService
80+
) {
81+
this.client = client;
82+
this.leaderElectionListener = leaderElectionListener;
83+
this.coordinationNodePath = coordinationNodePath;
84+
this.electionName = electionName;
85+
this.data = data;
86+
this.electionExecutor = executorService;
87+
this.lock = new LockInternals(
88+
MAX_LEASE,
89+
client,
90+
coordinationNodePath,
91+
electionName
92+
);
93+
this.semaphoreObserver = new SemaphoreObserver(
94+
lock.getCoordinationSession(),
95+
electionName,
96+
WatchSemaphoreMode.WATCH_OWNERS,
97+
DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS,
98+
new RetryForever(100) // TODO: передавать снаружи
99+
);
100+
}
101+
102+
public void start() {
103+
Preconditions.checkState(state.compareAndSet(State.CREATED, State.STARTED), "Already started or closed");
104+
// TODO: create session?
105+
CoordinationSession coordinationSession = lock.getCoordinationSession();
106+
// TODO: retry on create? Non idempotent - will not be retried automatically
107+
lock.start();
108+
coordinationSession.createSemaphore(electionName, MAX_LEASE).thenAccept(status -> {
109+
if (status.isSuccess() || status.getCode() == StatusCode.ALREADY_EXISTS) {
110+
semaphoreObserver.start();
111+
}
112+
status.expectSuccess("Unable to create semaphore");
113+
// TODO: set status == error
114+
});
115+
116+
if (autoRequeue) {
117+
enqueueElection();
118+
}
119+
}
120+
121+
public void autoRequeue() {
122+
autoRequeue = true;
123+
}
124+
125+
public boolean isLeader() {
126+
return isLeader;
127+
}
128+
129+
/**
130+
* Re-queue an attempt for leadership. If this instance is already queued, nothing
131+
* happens and false is returned. If the instance was not queued, it is re-queued and true
132+
* is returned
133+
*
134+
* @return true if re-enqueue was successful
135+
*/
136+
public boolean requeue() {
137+
Preconditions.checkState(state.get() == State.STARTED, "Already closed or not yet started");
138+
139+
// TODO: корректно обрабатывать если старт еще не кончился
140+
return enqueueElection();
141+
}
142+
143+
public synchronized boolean interruptLeadership() {
144+
Future<?> localTask = electionTask;
145+
if (localTask != null) {
146+
localTask.cancel(true);
147+
electionTask = null;
148+
return true;
149+
}
150+
return false;
151+
}
152+
153+
private synchronized boolean enqueueElection() {
154+
if (!isQueued() && state.get() == State.STARTED) {
155+
electionTask = electionExecutor.submit(new Callable<Void>() {
156+
@Override
157+
public Void call() throws Exception {
158+
try {
159+
doWork();
160+
} finally {
161+
finishTask();
162+
}
163+
return null;
164+
}
165+
});
166+
return true;
167+
}
168+
169+
return false;
170+
}
171+
172+
private void doWork() throws Exception {
173+
isLeader = false;
174+
175+
try {
176+
lock.getConnectedCoordinationSession(); // asserts that session is connected or throws exception
177+
lock.tryAcquire(
178+
null,
179+
true,
180+
data
181+
);
182+
isLeader = true;
183+
try {
184+
leaderElectionListener.takeLeadership();
185+
} catch (InterruptedException e) {
186+
Thread.currentThread().interrupt();
187+
throw e;
188+
} catch (Throwable e) {
189+
logger.debug("takeLeadership exception", e);
190+
}
191+
} catch (InterruptedException e) {
192+
Thread.currentThread().interrupt();
193+
throw e;
194+
} finally {
195+
if (isLeader) {
196+
isLeader = false;
197+
boolean wasInterrupted = Thread.interrupted();
198+
try {
199+
lock.release();
200+
} catch (Exception e) {
201+
logger.error("Lock release exception for: " + coordinationNodePath);
202+
} finally {
203+
if (wasInterrupted) {
204+
Thread.currentThread().interrupt();
205+
}
206+
}
207+
}
208+
}
209+
}
210+
211+
private synchronized void finishTask() {
212+
electionTask = null;
213+
if (autoRequeue) {
214+
enqueueElection();
215+
}
216+
}
217+
218+
private boolean isQueued() {
219+
return electionTask != null;
220+
}
221+
222+
/**
223+
* Не гарантированы все, кроме лидера
224+
* @return
225+
*/
226+
public List<ElectionParticipant> getParticipants() {
227+
SemaphoreDescription semaphoreDescription = semaphoreObserver.getCachedData();
228+
if (semaphoreDescription == null) {
229+
return Collections.emptyList();
230+
}
231+
232+
return Stream.concat(
233+
semaphoreDescription.getOwnersList().stream()
234+
.map(session -> mapParticipant(session, true)),
235+
semaphoreDescription.getWaitersList().stream()
236+
.map(session -> mapParticipant(session, false))
237+
).collect(Collectors.toList());
238+
}
239+
240+
public Optional<ElectionParticipant> getCurrentLeader() {
241+
SemaphoreDescription semaphoreDescription = semaphoreObserver.getCachedData();
242+
if (semaphoreDescription == null) {
243+
return Optional.empty();
244+
}
245+
246+
return semaphoreDescription.getOwnersList().stream().findFirst()
247+
.map(session -> mapParticipant(session, true));
248+
}
249+
250+
private static ElectionParticipant mapParticipant(SemaphoreDescription.Session session, boolean owner) {
251+
return new ElectionParticipant(
252+
session.getId(),
253+
session.getData(),
254+
owner
255+
);
256+
}
257+
258+
@Override
259+
public Listenable<CoordinationSession.State> getSessionListenable() {
260+
return lock.getSessionListenable();
261+
}
262+
263+
@Override
264+
public synchronized void close() {
265+
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed");
266+
267+
Future<Void> localTask = electionTask;
268+
if (localTask != null) {
269+
localTask.cancel(true);
270+
electionTask = null;
271+
}
272+
273+
electionExecutor.shutdown();
274+
semaphoreObserver.close();
275+
}
276+
}

0 commit comments

Comments
 (0)