22
33import java .nio .charset .StandardCharsets ;
44import java .sql .SQLException ;
5+ import java .time .Instant ;
56import java .util .Optional ;
67import javax .annotation .PostConstruct ;
78import javax .annotation .PreDestroy ;
1415import tech .ydb .coordination .CoordinationClient ;
1516import tech .ydb .coordination .CoordinationSession ;
1617import tech .ydb .coordination .SemaphoreLease ;
18+ import tech .ydb .coordination .settings .DescribeSemaphoreMode ;
1719import tech .ydb .core .Result ;
1820import tech .ydb .jdbc .YdbConnection ;
1921
@@ -25,7 +27,7 @@ public class YdbCoordinationServiceLockProvider implements LockProvider {
2527 private static final String YDB_LOCK_NODE_NAME = "shared-lock-ydb" ;
2628 private static final int ATTEMPT_CREATE_NODE = 10 ;
2729 private static final String INSTANCE_INFO =
28- "{ Hostname=" + Utils .getHostname () + ", " + "Current PID=" + ProcessHandle .current ().pid () + "}" ;
30+ "Hostname=" + Utils .getHostname () + ", " + "Current PID=" + ProcessHandle .current ().pid ();
2931 private static final byte [] INSTANCE_INFO_BYTES = INSTANCE_INFO .getBytes (StandardCharsets .UTF_8 );
3032
3133 private final YdbConnection ydbConnection ;
@@ -66,7 +68,36 @@ public void init() {
6668
6769 @ Override
6870 public Optional <SimpleLock > lock (LockConfiguration lockConfiguration ) {
69- logger .info ("Instance[{}] is trying to become a leader..." , INSTANCE_INFO );
71+ var now = Instant .now ();
72+
73+ String instanceInfo = "Hostname=" + Utils .getHostname () + ", " +
74+ "Current PID=" + ProcessHandle .current ().pid () + ", " +
75+ "CreatedAt=" + now ;
76+
77+ logger .info ("Instance[{}] is trying to become a leader..." , instanceInfo );
78+
79+ var describeResult = coordinationSession .describeSemaphore (
80+ lockConfiguration .getName (),
81+ DescribeSemaphoreMode .WITH_OWNERS
82+ ).join ();
83+
84+ if (describeResult .isSuccess ()) {
85+ var describe = describeResult .getValue ();
86+ var describePayload = new String (describe .getData (), StandardCharsets .UTF_8 );
87+
88+ logger .debug ("Received DescribeSemaphore[Name={}, Data={}]" , describe .getName (), describePayload );
89+
90+ Instant createdLeaderTimestampUTC = Instant .parse (describePayload .split ("," )[2 ].split ("=" )[1 ]);
91+
92+ if (now .isAfter (createdLeaderTimestampUTC .plus (lockConfiguration .getLockAtMostFor ()))) {
93+ var deleteResult = coordinationSession .deleteSemaphore (describe .getName (), true ).join ();
94+ logger .debug ("Delete semaphore[Name={}] result: {}" , describe .getName (), deleteResult );
95+ }
96+ } else {
97+ // no success, ephemeral semaphore is not created
98+
99+ logger .debug ("Semaphore[Name={}] not found" , lockConfiguration .getName ());
100+ }
70101
71102 Result <SemaphoreLease > semaphoreLease = coordinationSession .acquireEphemeralSemaphore (
72103 lockConfiguration .getName (),
0 commit comments