Skip to content

Commit c68d881

Browse files
committed
upd
1 parent c13bbca commit c68d881

File tree

11 files changed

+256
-16
lines changed

11 files changed

+256
-16
lines changed

coordination/src/main/java/tech/ydb/coordination/recipes/election/LeaderElection.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,12 @@
1111
import java.util.concurrent.Executors;
1212
import java.util.concurrent.Future;
1313
import java.util.concurrent.ScheduledExecutorService;
14-
import java.util.concurrent.ThreadFactory;
1514
import java.util.concurrent.atomic.AtomicReference;
1615
import java.util.function.Supplier;
1716
import java.util.stream.Collectors;
1817
import java.util.stream.Stream;
1918

2019
import com.google.common.base.Preconditions;
21-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2220
import org.slf4j.Logger;
2321
import org.slf4j.LoggerFactory;
2422

@@ -30,8 +28,8 @@
3028
import tech.ydb.coordination.recipes.util.Listenable;
3129
import tech.ydb.coordination.recipes.util.ListenableContainer;
3230
import tech.ydb.coordination.recipes.util.RetryableTask;
33-
import tech.ydb.coordination.recipes.util.SessionListenableProvider;
3431
import tech.ydb.coordination.recipes.util.SemaphoreObserver;
32+
import tech.ydb.coordination.recipes.util.SessionListenableProvider;
3533
import tech.ydb.coordination.settings.DescribeSemaphoreMode;
3634
import tech.ydb.coordination.settings.WatchSemaphoreMode;
3735
import tech.ydb.core.Status;
@@ -54,10 +52,6 @@
5452
*/
5553
public class LeaderElection implements Closeable, SessionListenableProvider {
5654
private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class);
57-
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder()
58-
.setNameFormat("ydb-leader-election-%d")
59-
.setDaemon(true)
60-
.build();
6155
private static final long MAX_LEASE = 1L;
6256

6357
private final LeaderElectionListener leaderElectionListener;
@@ -148,7 +142,7 @@ public LeaderElection(
148142
this.data = data;
149143
this.leaderElectionListener = leaderElectionListener;
150144
this.scheduledExecutor = settings.getScheduledExecutor();
151-
this.blockingExecutor = Executors.newSingleThreadExecutor(threadFactory);
145+
this.blockingExecutor = Executors.newSingleThreadExecutor();
152146
this.retryPolicy = settings.getRetryPolicy();
153147

154148
this.coordinationSession = client.createSession(coordinationNodePath);

coordination/src/main/java/tech/ydb/coordination/recipes/group/GroupMembership.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.checkerframework.checker.nullness.qual.Nullable;
1717
import org.slf4j.Logger;
1818
import org.slf4j.LoggerFactory;
19+
1920
import tech.ydb.common.retry.RetryPolicy;
2021
import tech.ydb.coordination.CoordinationClient;
2122
import tech.ydb.coordination.CoordinationSession;
@@ -24,6 +25,7 @@
2425
import tech.ydb.coordination.recipes.util.ListenableContainer;
2526
import tech.ydb.coordination.recipes.util.RetryableTask;
2627
import tech.ydb.coordination.recipes.util.SemaphoreObserver;
28+
import tech.ydb.coordination.recipes.util.SessionListenableProvider;
2729
import tech.ydb.coordination.settings.CoordinationSessionSettings;
2830
import tech.ydb.coordination.settings.DescribeSemaphoreMode;
2931
import tech.ydb.coordination.settings.WatchSemaphoreMode;
@@ -44,12 +46,13 @@
4446
*
4547
* <p>The implementation uses a semaphore with watch capabilities to track membership.
4648
*/
47-
public class GroupMembership implements Closeable {
49+
public class GroupMembership implements Closeable, SessionListenableProvider {
4850
private static final Logger logger = LoggerFactory.getLogger(GroupMembership.class);
4951
private static final long MAX_GROUP_SIZE = Long.MAX_VALUE;
5052
private static final Duration ACQUIRE_TIMEOUT = Duration.ofSeconds(30);
5153

5254
private final String groupName;
55+
private final byte[] data;
5356
private final RetryPolicy retryPolicy;
5457
private final ScheduledExecutorService scheduledExecutor;
5558

@@ -89,12 +92,14 @@ private enum State {
8992
public GroupMembership(
9093
CoordinationClient coordinationClient,
9194
String coordinationNodePath,
92-
String groupName
95+
String groupName,
96+
byte[] data
9397
) {
9498
this(
9599
coordinationClient,
96100
coordinationNodePath,
97101
groupName,
102+
data,
98103
GroupMembershipSettings.newBuilder()
99104
.build()
100105
);
@@ -114,11 +119,13 @@ public GroupMembership(
114119
CoordinationClient coordinationClient,
115120
String coordinationNodePath,
116121
String groupName,
122+
byte[] data,
117123
GroupMembershipSettings settings
118124
) {
119125
validateConstructorArgs(coordinationClient, coordinationNodePath, groupName, settings);
120126

121127
this.groupName = groupName;
128+
this.data = data;
122129
this.retryPolicy = settings.getRetryPolicy();
123130
this.scheduledExecutor = settings.getScheduledExecutor();
124131

@@ -271,7 +278,7 @@ private synchronized boolean tryEnqueueAcquire() {
271278
logger.debug("Enqueuing new acquire task for group '{}'", groupName);
272279
CompletableFuture<Status> acquireRetryableTask = new RetryableTask(
273280
"groupMembership-acquireSemaphoreTask-" + groupName,
274-
() -> coordinationSession.acquireSemaphore(groupName, 1, ACQUIRE_TIMEOUT)
281+
() -> coordinationSession.acquireSemaphore(groupName, 1, data, ACQUIRE_TIMEOUT)
275282
.thenApply(Result::getStatus),
276283
scheduledExecutor,
277284
retryPolicy
@@ -330,6 +337,7 @@ private static GroupMember mapSessionToGroupMember(SemaphoreDescription.Session
330337
*
331338
* @return observable for coordination session state changes
332339
*/
340+
@Override
333341
public Listenable<CoordinationSession.State> getSessionListenable() {
334342
return sessionListenable;
335343
}

coordination/src/main/java/tech/ydb/coordination/recipes/locks/InterProcessMutex.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
13+
1314
import tech.ydb.coordination.CoordinationClient;
1415
import tech.ydb.coordination.CoordinationSession;
1516
import tech.ydb.coordination.recipes.locks.exception.LockException;
@@ -96,6 +97,7 @@ public InterProcessMutex(
9697
sessionState);
9798
state.set(State.FAILED);
9899
}
100+
logger.info("New State: " + sessionState);
99101
sessionListenable.notifyListeners(sessionState);
100102
});
101103

@@ -191,6 +193,7 @@ public void close() {
191193
state.set(State.CLOSED);
192194
try {
193195
lockInternals.close();
196+
coordinationSession.close();
194197
} catch (Exception e) {
195198
logger.warn("Error while closing lock internals '{}'", lockName, e);
196199
}

coordination/src/main/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private enum State {
149149
CLOSED
150150
}
151151

152-
public InternalLock(
152+
InternalLock(
153153
CoordinationClient client,
154154
String coordinationNodePath,
155155
String lockName,

coordination/src/main/java/tech/ydb/coordination/recipes/util/RetryableTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
10+
1011
import tech.ydb.common.retry.RetryPolicy;
1112
import tech.ydb.core.Status;
1213

coordination/src/test/java/tech/ydb/coordination/CoordinationSessionBaseMockedTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.mockito.stubbing.OngoingStubbing;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
17+
import tech.ydb.coordination.description.SemaphoreDescription;
18+
import tech.ydb.coordination.description.SemaphoreWatcher;
1719
import tech.ydb.core.Result;
1820
import tech.ydb.core.Status;
1921
import tech.ydb.core.StatusCode;
@@ -211,6 +213,14 @@ public OngoingStubbing<CompletableFuture<Result<SemaphoreLease>>> acquireEphemer
211213
return when(coordinationSession.acquireEphemeralSemaphore(anyString(), anyBoolean(), any(), any()));
212214
}
213215

216+
public OngoingStubbing<CompletableFuture<Result<SemaphoreDescription>>> describeSemaphore() {
217+
return when(coordinationSession.describeSemaphore(anyString(), any()));
218+
}
219+
220+
public OngoingStubbing<CompletableFuture<Result<SemaphoreWatcher>>> watchSemaphore() {
221+
return when(coordinationSession.watchSemaphore(anyString(), any(), any()));
222+
}
223+
214224
public void connecting() {
215225
changeState(CoordinationSession.State.CONNECTING);
216226
}

coordination/src/test/java/tech/ydb/coordination/recipes/election/LeaderElectionIntegrationTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,39 @@ public void takeLeadership() throws Exception {
8484
elector.close();
8585
}
8686

87+
@Test
88+
public void interruptLeadership_ThenStops() throws Exception {
89+
AtomicBoolean leadershipTaken = new AtomicBoolean(false);
90+
AtomicBoolean interrupted = new AtomicBoolean(false);
91+
92+
String testName = "interruptLeadership_ThenStops";
93+
LeaderElection elector = getLeaderElector(testName, new LeaderElectionListener() {
94+
@Override
95+
public void takeLeadership() throws Exception {
96+
try {
97+
logger.debug("Leadership is taken");
98+
leadershipTaken.set(true);
99+
Thread.sleep(10000);
100+
} catch (InterruptedException e) {
101+
interrupted.set(true);
102+
logger.debug("Leadership is interrupted");
103+
}
104+
}
105+
});
106+
elector.start();
107+
elector.requeue();
108+
109+
AwaitAssert.await().until(leadershipTaken::get);
110+
Assert.assertTrue(leadershipTaken.get());
111+
112+
elector.interruptLeadership();
113+
AwaitAssert.await().until(interrupted::get);
114+
Assert.assertFalse(elector.isLeader());
115+
116+
elector.close();
117+
}
118+
119+
87120
@Test
88121
public void shouldCallTakeLeadershipAgainAfterRequeue() throws Exception {
89122
AtomicInteger leadershipCount = new AtomicInteger(0);
@@ -133,6 +166,8 @@ public void takeLeadership() throws Exception {
133166

134167
Assert.assertEquals(1, participants1.size());
135168
Assert.assertTrue(leader1.isPresent());
169+
Assert.assertTrue(leader1.get().isLeader());
170+
Assert.assertArrayEquals(leader1.get().getData(), testName.getBytes(StandardCharsets.UTF_8));
136171
Assert.assertEquals(participants1.get(0).getSessionId(), leader1.get().getSessionId());
137172

138173
// Add second leader
@@ -146,8 +181,8 @@ public void takeLeadership() throws Exception {
146181
logger.info("Leadership 2 ended");
147182
}
148183
});
184+
elector2.autoRequeue();
149185
elector2.start();
150-
elector2.requeue();
151186

152187
AwaitAssert.await().until(leader2Taken::get);
153188
// Check participants and leader
@@ -161,6 +196,9 @@ public void takeLeadership() throws Exception {
161196
logger.info("current leader 2 {}", leader2);
162197
logger.info("current participants 2 {}", participants2);
163198

199+
Assert.assertEquals(participants1, participants2);
200+
Assert.assertEquals(leader1.hashCode(), leader2.hashCode());
201+
164202
Assert.assertTrue(leader2Taken.get());
165203
Assert.assertTrue(elector2.isLeader());
166204
Assert.assertEquals(elector2.getCurrentLeader().get().getSessionId(),

coordination/src/test/java/tech/ydb/coordination/recipes/group/GroupMembershipIntegrationTest.java

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package tech.ydb.coordination.recipes.group;
22

3+
import java.util.List;
4+
import java.util.concurrent.Executors;
5+
36
import org.junit.AfterClass;
7+
import org.junit.Assert;
48
import org.junit.BeforeClass;
59
import org.junit.ClassRule;
610
import org.junit.Test;
711
import org.slf4j.Logger;
812
import org.slf4j.LoggerFactory;
13+
import tech.ydb.common.retry.RetryForever;
914
import tech.ydb.coordination.AwaitAssert;
1015
import tech.ydb.coordination.CoordinationClient;
1116
import tech.ydb.test.junit4.GrpcTransportRule;
@@ -29,20 +34,40 @@ public static void clean() {
2934
}
3035

3136
private GroupMembership getGroupMembership(String testName) {
32-
return getGroupMembership(testName, testName);
37+
return getGroupMembership(testName, testName, testName.getBytes());
3338
}
3439

3540
private GroupMembership getGroupMembership(
3641
String coordinationNodePath,
37-
String groupName
42+
String groupName,
43+
byte[] data
3844
) {
3945
client.createNode(coordinationNodePath).join().expectSuccess(
4046
"cannot create coordination node on path: " + coordinationNodePath
4147
);
4248
return new GroupMembership(
4349
client,
4450
coordinationNodePath,
45-
groupName
51+
groupName,
52+
data
53+
);
54+
}
55+
56+
private GroupMembership getGroupMembershipCustom(
57+
String coordinationNodePath,
58+
String groupName,
59+
byte[] data,
60+
GroupMembershipSettings settings
61+
) {
62+
client.createNode(coordinationNodePath).join().expectSuccess(
63+
"cannot create coordination node on path: " + coordinationNodePath
64+
);
65+
return new GroupMembership(
66+
client,
67+
coordinationNodePath,
68+
groupName,
69+
data,
70+
settings
4671
);
4772
}
4873

@@ -51,6 +76,39 @@ public void successTest() throws Exception {
5176
String testName = "successTest";
5277

5378
GroupMembership groupMembership = getGroupMembership(testName);
79+
80+
groupMembership.getSessionListenable().addListener(
81+
state -> logger.info("State change: " + state)
82+
);
83+
groupMembership.getMembersListenable().addListener(
84+
groupMembers -> logger.info("Members change: " + groupMembers)
85+
);
86+
87+
groupMembership.start();
88+
89+
AwaitAssert.await().until(() -> {
90+
if (groupMembership.getCurrentMembers() == null) {
91+
return false;
92+
}
93+
return groupMembership.getCurrentMembers().size() == 1;
94+
});
95+
96+
groupMembership.close();
97+
}
98+
99+
@Test
100+
public void everyTest() throws Exception {
101+
String testName = "everyTest";
102+
103+
GroupMembership groupMembership = getGroupMembershipCustom(
104+
testName,
105+
testName,
106+
testName.getBytes(),
107+
GroupMembershipSettings.newBuilder()
108+
.withRetryPolicy(new RetryForever(100))
109+
.withScheduledExecutor(Executors.newSingleThreadScheduledExecutor())
110+
.build()
111+
);
54112
groupMembership.start();
55113

56114

@@ -61,6 +119,16 @@ public void successTest() throws Exception {
61119
return groupMembership.getCurrentMembers().size() == 1;
62120
});
63121

122+
List<GroupMember> currentMembers = groupMembership.getCurrentMembers();
123+
GroupMember groupMember1 = currentMembers.get(0);
124+
logger.info(groupMember1.toString());
125+
126+
Assert.assertEquals(1L, groupMember1.getSessionId());
127+
Assert.assertArrayEquals(groupMember1.getData(), testName.getBytes());
128+
GroupMember groupMember2 = new GroupMember(1L, testName.getBytes());
129+
Assert.assertEquals(groupMember1, groupMember2);
130+
Assert.assertEquals(groupMember1.hashCode(), groupMember2.hashCode());
131+
64132
groupMembership.close();
65133
}
66134

0 commit comments

Comments
 (0)