1111import net .javacrumbs .shedlock .support .Utils ;
1212import org .slf4j .Logger ;
1313import org .slf4j .LoggerFactory ;
14+ import tech .ydb .common .retry .RetryForever ;
1415import tech .ydb .coordination .CoordinationClient ;
1516import tech .ydb .coordination .CoordinationSession ;
1617import tech .ydb .coordination .SemaphoreLease ;
18+ import tech .ydb .coordination .settings .CoordinationSessionSettings ;
1719import tech .ydb .core .Result ;
1820import tech .ydb .jdbc .YdbConnection ;
1921
@@ -28,8 +30,6 @@ public class YdbCoordinationServiceLockProvider implements LockProvider {
2830 private final YdbConnection ydbConnection ;
2931 private final CoordinationClient coordinationClient ;
3032
31- private volatile CoordinationSession coordinationSession ;
32-
3333 public YdbCoordinationServiceLockProvider (YdbConnection ydbConnection ) {
3434 this .ydbConnection = ydbConnection ;
3535 this .coordinationClient = CoordinationClient .newClient (ydbConnection .getCtx ().getGrpcTransport ());
@@ -39,21 +39,6 @@ public void init() {
3939 for (int i = 0 ; i < ATTEMPT_CREATE_NODE ; i ++) {
4040 var status = coordinationClient .createNode (YDB_LOCK_NODE_NAME ).join ();
4141
42- if (status .isSuccess ()) {
43- coordinationSession = coordinationClient .createSession (YDB_LOCK_NODE_NAME );
44-
45- var statusCS = coordinationSession .connect ().join ();
46-
47- if (statusCS .isSuccess ()) {
48- logger .info ("Created coordination node session [{}]" , coordinationSession );
49-
50- return ;
51- }
52- if (i == ATTEMPT_CREATE_NODE - 1 ) {
53- statusCS .expectSuccess ("Failed creating coordination node session" );
54- }
55- }
56-
5742 if (i == ATTEMPT_CREATE_NODE - 1 ) {
5843 status .expectSuccess ("Failed created coordination service node: " + YDB_LOCK_NODE_NAME );
5944 }
@@ -70,39 +55,57 @@ public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
7055
7156 logger .info ("Instance[{}] is trying to become a leader..." , instanceInfo );
7257
58+ var coordinationSession = coordinationClient .createSession (
59+ YDB_LOCK_NODE_NAME , CoordinationSessionSettings .newBuilder ()
60+ .withRetryPolicy (new RetryForever (500 ))
61+ .build ()
62+ );
63+
64+ var statusCS = coordinationSession .connect ().join ();
65+
66+ if (!statusCS .isSuccess ()) {
67+ logger .info ("Failed creating coordination session [{}]" , coordinationSession );
68+
69+ return Optional .empty ();
70+ }
71+
72+ logger .info ("Created coordination node session [{}]" , coordinationSession );
73+
7374 Result <SemaphoreLease > semaphoreLease = coordinationSession .acquireEphemeralSemaphore (
7475 lockConfiguration .getName (),
7576 true ,
7677 instanceInfo .getBytes (StandardCharsets .UTF_8 ),
7778 lockConfiguration .getLockAtMostFor ()
7879 ).join ();
7980
81+ logger .debug (coordinationSession .toString ());
82+
8083 if (semaphoreLease .isSuccess ()) {
8184 logger .info ("Instance[{}] acquired semaphore[SemaphoreName={}]" , instanceInfo ,
8285 semaphoreLease .getValue ().getSemaphoreName ());
8386
84- return Optional .of (new YdbSimpleLock (semaphoreLease .getValue (), instanceInfo ));
87+ return Optional .of (new YdbSimpleLock (semaphoreLease .getValue (), instanceInfo , coordinationSession ));
8588 } else {
8689 logger .info ("Instance[{}] did not acquire semaphore" , instanceInfo );
8790
8891 return Optional .empty ();
8992 }
9093 }
9194
92- private record YdbSimpleLock (SemaphoreLease semaphoreLease , String metaInfo ) implements SimpleLock {
95+ private record YdbSimpleLock (SemaphoreLease semaphoreLease , String metaInfo ,
96+ CoordinationSession coordinationSession ) implements SimpleLock {
9397 @ Override
9498 public void unlock () {
9599 logger .info ("Instance[{}] released semaphore[SemaphoreName={}]" , metaInfo , semaphoreLease .getSemaphoreName ());
96100
97101 semaphoreLease .release ().join ();
102+
103+ coordinationSession .close ();
98104 }
99105 }
100106
101107 @ PreDestroy
102108 private void close () throws SQLException {
103- // closing coordination session
104- coordinationSession .close ();
105-
106109 ydbConnection .close ();
107110 }
108111}
0 commit comments