11package tech .ydb .lock .provider ;
22
3+ import java .nio .charset .StandardCharsets ;
34import java .sql .SQLException ;
45import java .util .Optional ;
6+ import javax .annotation .PostConstruct ;
57import javax .annotation .PreDestroy ;
68import net .javacrumbs .shedlock .core .LockConfiguration ;
79import net .javacrumbs .shedlock .core .LockProvider ;
810import net .javacrumbs .shedlock .core .SimpleLock ;
11+ import net .javacrumbs .shedlock .support .Utils ;
912import org .slf4j .Logger ;
1013import org .slf4j .LoggerFactory ;
1114import tech .ydb .coordination .CoordinationClient ;
15+ import tech .ydb .coordination .CoordinationSession ;
1216import tech .ydb .coordination .SemaphoreLease ;
17+ import tech .ydb .core .Result ;
1318import tech .ydb .jdbc .YdbConnection ;
1419
1520/**
@@ -19,21 +24,38 @@ public class YdbCoordinationServiceLockProvider implements LockProvider {
1924 private static final Logger logger = LoggerFactory .getLogger (YdbCoordinationServiceLockProvider .class );
2025 private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb" ;
2126 private static final int ATTEMPT_CREATE_NODE = 10 ;
27+ private static final String INSTANCE_INFO =
28+ "{Hostname=" + Utils .getHostname () + ", " + "Current PID=" + ProcessHandle .current ().pid () + "}" ;
29+ private static final byte [] INSTANCE_INFO_BYTES = INSTANCE_INFO .getBytes (StandardCharsets .UTF_8 );
2230
2331 private final YdbConnection ydbConnection ;
2432 private final CoordinationClient coordinationClient ;
2533
34+ private volatile CoordinationSession coordinationSession ;
35+
2636 public YdbCoordinationServiceLockProvider (YdbConnection ydbConnection ) {
2737 this .ydbConnection = ydbConnection ;
2838 this .coordinationClient = CoordinationClient .newClient (ydbConnection .getCtx ().getGrpcTransport ());
2939 }
3040
41+ @ PostConstruct
3142 public void init () {
3243 for (int i = 0 ; i < ATTEMPT_CREATE_NODE ; i ++) {
3344 var status = coordinationClient .createNode (YDB_LOCK_NODE_NAME ).join ();
3445
3546 if (status .isSuccess ()) {
36- return ;
47+ coordinationSession = coordinationClient .createSession (YDB_LOCK_NODE_NAME );
48+
49+ var statusCS = coordinationSession .connect ().join ();
50+
51+ if (statusCS .isSuccess ()) {
52+ logger .info ("Created coordination node session [{}]" , coordinationSession );
53+
54+ return ;
55+ }
56+ if (i == ATTEMPT_CREATE_NODE - 1 ) {
57+ statusCS .expectSuccess ("Failed creating coordination node session" );
58+ }
3759 }
3860
3961 if (i == ATTEMPT_CREATE_NODE - 1 ) {
@@ -44,35 +66,41 @@ public void init() {
4466
4567 @ Override
4668 public Optional <SimpleLock > lock (LockConfiguration lockConfiguration ) {
47- var coordinationSession = coordinationClient .createSession (YDB_LOCK_NODE_NAME );
48-
49- coordinationSession .connect ().join ()
50- .expectSuccess ("Failed creating coordination node session" );
69+ logger .info ("Instance[{}] is trying to become a leader..." , INSTANCE_INFO );
5170
52- logger .debug ("Created coordination node session" );
53-
54- var semaphoreLease = coordinationSession .acquireEphemeralSemaphore (lockConfiguration .getName (), true ,
55- lockConfiguration .getLockAtMostFor ()).join ();
71+ Result <SemaphoreLease > semaphoreLease = coordinationSession .acquireEphemeralSemaphore (
72+ lockConfiguration .getName (),
73+ true ,
74+ INSTANCE_INFO_BYTES ,
75+ lockConfiguration .getLockAtMostFor ()
76+ ).join ();
5677
5778 if (semaphoreLease .isSuccess ()) {
58- logger .debug ("Semaphore acquired" );
79+ logger .info ("Instance[{}] acquired semaphore[SemaphoreName={}]" , INSTANCE_INFO ,
80+ semaphoreLease .getValue ().getSemaphoreName ());
5981
6082 return Optional .of (new YdbSimpleLock (semaphoreLease .getValue ()));
6183 } else {
62- logger .debug ("Semaphore is not acquired" );
84+ logger .info ("Instance[{}] did not acquire semaphore" , INSTANCE_INFO );
85+
6386 return Optional .empty ();
6487 }
6588 }
6689
6790 private record YdbSimpleLock (SemaphoreLease semaphoreLease ) implements SimpleLock {
6891 @ Override
6992 public void unlock () {
93+ logger .info ("Instance[{}] released semaphore[SemaphoreName={}]" , INSTANCE_INFO , semaphoreLease .getSemaphoreName ());
94+
7095 semaphoreLease .release ().join ();
7196 }
7297 }
7398
7499 @ PreDestroy
75100 private void close () throws SQLException {
101+ // closing coordination session
102+ coordinationSession .close ();
103+
76104 ydbConnection .close ();
77105 }
78106}
0 commit comments