11package tech .ydb .lock .provider ;
22
3+ import java .nio .charset .StandardCharsets ;
34import java .sql .SQLException ;
5+ import java .time .Instant ;
46import java .util .Optional ;
57import javax .annotation .PreDestroy ;
68import net .javacrumbs .shedlock .core .LockConfiguration ;
79import net .javacrumbs .shedlock .core .LockProvider ;
810import net .javacrumbs .shedlock .core .SimpleLock ;
11+ import net .javacrumbs .shedlock .support .Utils ;
912import org .slf4j .Logger ;
1013import org .slf4j .LoggerFactory ;
14+ import tech .ydb .common .retry .RetryForever ;
1115import tech .ydb .coordination .CoordinationClient ;
16+ import tech .ydb .coordination .CoordinationSession ;
1217import tech .ydb .coordination .SemaphoreLease ;
18+ import tech .ydb .coordination .settings .CoordinationSessionSettings ;
19+ import tech .ydb .core .Result ;
1320import tech .ydb .jdbc .YdbConnection ;
1421
1522/**
@@ -32,10 +39,6 @@ public void init() {
3239 for (int i = 0 ; i < ATTEMPT_CREATE_NODE ; i ++) {
3340 var status = coordinationClient .createNode (YDB_LOCK_NODE_NAME ).join ();
3441
35- if (status .isSuccess ()) {
36- return ;
37- }
38-
3942 if (i == ATTEMPT_CREATE_NODE - 1 ) {
4043 status .expectSuccess ("Failed created coordination service node: " + YDB_LOCK_NODE_NAME );
4144 }
@@ -44,30 +47,60 @@ public void init() {
4447
4548 @ Override
4649 public Optional <SimpleLock > lock (LockConfiguration lockConfiguration ) {
47- var coordinationSession = coordinationClient .createSession (YDB_LOCK_NODE_NAME );
50+ var now = Instant .now ();
51+
52+ String instanceInfo = "Hostname=" + Utils .getHostname () + ", " +
53+ "Current PID=" + ProcessHandle .current ().pid () + ", " +
54+ "CreatedAt=" + now ;
55+
56+ logger .info ("Instance[{}] is trying to become a leader..." , instanceInfo );
4857
49- coordinationSession .connect ().join ()
50- .expectSuccess ("Failed creating coordination node session" );
58+ var coordinationSession = coordinationClient .createSession (
59+ YDB_LOCK_NODE_NAME , CoordinationSessionSettings .newBuilder ()
60+ .withRetryPolicy (new RetryForever (500 ))
61+ .build ()
62+ );
5163
52- logger . debug ( "Created coordination node session" );
64+ var statusCS = coordinationSession . connect (). join ( );
5365
54- var semaphoreLease = coordinationSession .acquireEphemeralSemaphore (lockConfiguration .getName (), true ,
55- lockConfiguration .getLockAtMostFor ()).join ();
66+ if (!statusCS .isSuccess ()) {
67+ logger .info ("Failed creating coordination session [{}]" , coordinationSession );
68+
69+ return Optional .empty ();
70+ }
71+
72+ logger .info ("Created coordination node session [{}]" , coordinationSession );
73+
74+ Result <SemaphoreLease > semaphoreLease = coordinationSession .acquireEphemeralSemaphore (
75+ lockConfiguration .getName (),
76+ true ,
77+ instanceInfo .getBytes (StandardCharsets .UTF_8 ),
78+ lockConfiguration .getLockAtMostFor ()
79+ ).join ();
80+
81+ logger .debug (coordinationSession .toString ());
5682
5783 if (semaphoreLease .isSuccess ()) {
58- logger .debug ("Semaphore acquired" );
84+ logger .info ("Instance[{}] acquired semaphore[SemaphoreName={}]" , instanceInfo ,
85+ semaphoreLease .getValue ().getSemaphoreName ());
5986
60- return Optional .of (new YdbSimpleLock (semaphoreLease .getValue ()));
87+ return Optional .of (new YdbSimpleLock (semaphoreLease .getValue (), instanceInfo , coordinationSession ));
6188 } else {
62- logger .debug ("Semaphore is not acquired" );
89+ logger .info ("Instance[{}] did not acquire semaphore" , instanceInfo );
90+
6391 return Optional .empty ();
6492 }
6593 }
6694
67- private record YdbSimpleLock (SemaphoreLease semaphoreLease ) implements SimpleLock {
95+ private record YdbSimpleLock (SemaphoreLease semaphoreLease , String metaInfo ,
96+ CoordinationSession coordinationSession ) implements SimpleLock {
6897 @ Override
6998 public void unlock () {
99+ logger .info ("Instance[{}] released semaphore[SemaphoreName={}]" , metaInfo , semaphoreLease .getSemaphoreName ());
100+
70101 semaphoreLease .release ().join ();
102+
103+ coordinationSession .close ();
71104 }
72105 }
73106
0 commit comments