Skip to content

Commit e9ce31f

Browse files
authored
Use keepalive for ETCD Lease Renewals (#1511)
1 parent 159ca7e commit e9ce31f

File tree

4 files changed

+95
-98
lines changed

4 files changed

+95
-98
lines changed

astra/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,11 @@
465465
<version>${jetcd.version}</version>
466466
<scope>test</scope>
467467
</dependency>
468+
<!-- testcontainers version uses version from <dependencyManagement> -->
469+
<dependency>
470+
<groupId>org.testcontainers</groupId>
471+
<artifactId>testcontainers</artifactId>
472+
</dependency>
468473
<!-- Note: next time we upgrade this, check if mockito-core depends on 1.14.8+ version of bytebuddy -->
469474
<!-- if it does then we don't need to define bytebuddy explicitly anymore -->
470475
<!-- Run mvn dependency:tree to verify. armeria also has a runtime dependency on bytebuddy so we need to check that as well -->
@@ -586,6 +591,17 @@
586591

587592
</dependencies>
588593

594+
<dependencyManagement>
595+
<dependencies>
596+
<dependency>
597+
<groupId>org.testcontainers</groupId>
598+
<artifactId>testcontainers</artifactId>
599+
<version>2.0.2</version>
600+
<scope>test</scope>
601+
</dependency>
602+
</dependencies>
603+
</dependencyManagement>
604+
589605
<build>
590606
<defaultGoal>clean install</defaultGoal>
591607
<extensions>

astra/src/main/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,16 @@ public DiskOrMessageCountBasedRolloverStrategy(
8888
if (dirSize > 0) {
8989
approximateDirectoryBytes.set(dirSize);
9090
}
91-
if (!maxTimePerChunksMinsReached.get()
92-
&& Instant.now()
93-
.isAfter(rolloverStartTime.plus(maxTimePerChunksSeconds, ChronoUnit.SECONDS))) {
94-
LOG.info(
95-
"Max time per chunk reached. chunkStartTime: {} currentTime: {}",
96-
rolloverStartTime,
97-
Instant.now());
98-
maxTimePerChunksMinsReached.set(true);
91+
// Only check max time if it's not Long.MAX_VALUE (which would cause overflow)
92+
if (!maxTimePerChunksMinsReached.get() && maxTimePerChunksSeconds < Long.MAX_VALUE) {
93+
if (Instant.now()
94+
.isAfter(rolloverStartTime.plus(maxTimePerChunksSeconds, ChronoUnit.SECONDS))) {
95+
LOG.info(
96+
"Max time per chunk reached. chunkStartTime: {} currentTime: {}",
97+
rolloverStartTime,
98+
Instant.now());
99+
maxTimePerChunksMinsReached.set(true);
100+
}
99101
}
100102
} catch (Exception e) {
101103
LOG.error("Error calculating directory size", e);

astra/src/main/java/com/slack/astra/metadata/core/EtcdMetadataStore.java

Lines changed: 66 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
import io.etcd.jetcd.KeyValue;
99
import io.etcd.jetcd.Watch.Watcher;
1010
import io.etcd.jetcd.kv.GetResponse;
11+
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
1112
import io.etcd.jetcd.options.GetOption;
1213
import io.etcd.jetcd.options.PutOption;
1314
import io.etcd.jetcd.options.WatchOption;
1415
import io.etcd.jetcd.watch.WatchEvent;
16+
import io.grpc.stub.StreamObserver;
1517
import io.micrometer.core.instrument.Counter;
1618
import io.micrometer.core.instrument.MeterRegistry;
1719
import java.io.Closeable;
@@ -56,8 +58,8 @@ public class EtcdMetadataStore<T extends AstraMetadata> implements Closeable {
5658
/** Shared lease ID for all ephemeral nodes. Only valid if createMode is EPHEMERAL. */
5759
private volatile long sharedLeaseId = -1;
5860

59-
/** Used for refreshing ephemeral node leases. */
60-
private final ScheduledExecutorService leaseRefreshExecutor;
61+
/** Flag to track if the store is being closed to prevent keepalive restarts during shutdown. */
62+
private volatile boolean isClosing = false;
6163

6264
/** Used for watch retry operations with delays. */
6365
private final ScheduledExecutorService watchRetryExecutor;
@@ -68,8 +70,6 @@ public class EtcdMetadataStore<T extends AstraMetadata> implements Closeable {
6870
/** TTL in milliseconds for ephemeral nodes. */
6971
private final long ephemeralTtlMs;
7072

71-
private final int ephemeralMaxRetries;
72-
7373
private static final Logger LOG = LoggerFactory.getLogger(EtcdMetadataStore.class);
7474

7575
protected final String storeFolder;
@@ -140,7 +140,7 @@ public EtcdMetadataStore(
140140
MeterRegistry meterRegistry,
141141
MetadataSerializer<T> serializer,
142142
EtcdCreateMode createMode,
143-
Client etcClient) {
143+
Client etcdClient) {
144144
this.storeFolder = storeFolder;
145145
this.namespace = config.getNamespace();
146146
this.meterRegistry = meterRegistry;
@@ -149,7 +149,6 @@ public EtcdMetadataStore(
149149
this.watchers = new ConcurrentHashMap<>();
150150
this.createMode = createMode;
151151
this.ephemeralTtlMs = config.getEphemeralNodeTtlMs();
152-
this.ephemeralMaxRetries = config.getEphemeralNodeMaxRetries();
153152
this.etcdOperationTimeoutMs = config.getOperationsTimeoutMs();
154153

155154
// Store retry configuration for watch operations
@@ -170,15 +169,15 @@ public EtcdMetadataStore(
170169
this.leaseRefreshHandlerFired =
171170
this.meterRegistry.counter(ASTRA_ETCD_LEASE_REFRESH_HANDLER_FIRED, "store", store);
172171

173-
if (etcClient == null) {
172+
if (etcdClient == null) {
174173
throw new IllegalArgumentException("External etcd client must be provided");
175174
}
176175

177176
LOG.info(
178177
"Using provided external etcd client for store folder: {} with mode: {}",
179178
storeFolder,
180179
createMode);
181-
this.etcdClient = etcClient;
180+
this.etcdClient = etcdClient;
182181

183182
// Initialize watch retry executor - scheduled cached thread pool that scales to 0
184183
this.watchRetryExecutor =
@@ -191,28 +190,16 @@ public EtcdMetadataStore(
191190
return t;
192191
});
193192

194-
// Initialize lease refresh executor if we're creating ephemeral nodes
195193
if (createMode == EtcdCreateMode.EPHEMERAL) {
196-
this.leaseRefreshExecutor =
197-
Executors.newSingleThreadScheduledExecutor(
198-
r -> {
199-
Thread t = new Thread(r);
200-
t.setDaemon(true);
201-
t.setName("etcd-lease-refresh-" + storeFolder);
202-
return t;
203-
});
204-
205-
// Calculate the refresh interval - currently 1/3 of the TTL
206-
long refreshIntervalMs = ephemeralTtlMs / 3;
207-
208194
// Create a single shared lease for all ephemeral nodes synchronously
209195
try {
210196
sharedLeaseId =
211-
etcdClient
197+
this.etcdClient
212198
.getLeaseClient()
213199
.grant(ephemeralTtlMs / 1000) // grant ttl is in seconds
214200
.get(ephemeralTtlMs, TimeUnit.MILLISECONDS)
215201
.getID();
202+
startKeepAlive();
216203
} catch (InterruptedException | ExecutionException | TimeoutException e) {
217204
throw new RuntimeException(e);
218205
}
@@ -222,14 +209,6 @@ public EtcdMetadataStore(
222209
sharedLeaseId,
223210
Long.toHexString(sharedLeaseId),
224211
ephemeralTtlMs);
225-
226-
LOG.info("Starting lease refresh thread with interval: {} ms", refreshIntervalMs);
227-
228-
// Start the refresh task
229-
leaseRefreshExecutor.scheduleWithFixedDelay(
230-
this::refreshAllLeases, refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
231-
} else {
232-
leaseRefreshExecutor = null;
233212
}
234213

235214
// Initialize cache if needed
@@ -247,6 +226,52 @@ public EtcdMetadataStore(
247226
}
248227
}
249228

229+
/** Creates KeepAlive GRPC connection and handles error and completed cases */
230+
private void startKeepAlive() {
231+
this.etcdClient
232+
.getLeaseClient()
233+
.keepAlive(
234+
sharedLeaseId,
235+
new StreamObserver<LeaseKeepAliveResponse>() {
236+
@Override
237+
public void onNext(LeaseKeepAliveResponse response) {
238+
LOG.debug(
239+
"Received keepAlive response for lease {}, TTL: {}",
240+
response.getID(),
241+
response.getTTL());
242+
leaseRefreshHandlerFired.increment();
243+
}
244+
245+
@Override
246+
public void onError(Throwable t) {
247+
if (isClosing) {
248+
LOG.debug(
249+
"KeepAlive stream error during shutdown for lease {}, not restarting",
250+
sharedLeaseId);
251+
return;
252+
}
253+
LOG.error(
254+
"Error in keepAlive stream for shared lease {}: {}",
255+
sharedLeaseId,
256+
t.getMessage());
257+
startKeepAlive();
258+
}
259+
260+
@Override
261+
public void
262+
onCompleted() { // not the same as a shutdown, we want to restart the connection
263+
if (isClosing) {
264+
LOG.debug(
265+
"KeepAlive stream completed during shutdown for lease {}, not restarting",
266+
sharedLeaseId);
267+
return;
268+
}
269+
LOG.warn("KeepAlive stream completed for shared lease {}", sharedLeaseId);
270+
startKeepAlive();
271+
}
272+
});
273+
}
274+
250275
/**
251276
* Converts a path string to an etcd ByteSequence key.
252277
*
@@ -1091,49 +1116,12 @@ private void populateInitialCache() {
10911116
// as initialized on success or interruption, not on errors.
10921117
}
10931118

1094-
/** Refreshes the shared lease for all ephemeral nodes to prevent them from expiring. */
1095-
private void refreshAllLeases() {
1096-
// Skip if lease hasn't been initialized (though with synchronous initialization this shouldn't
1097-
// happen)
1098-
if (sharedLeaseId == -1) {
1099-
LOG.debug("Skipping lease refresh - lease not yet initialized");
1100-
return;
1101-
}
1102-
1103-
LOG.debug("Refreshing shared lease {} for store {}", sharedLeaseId, storeFolder);
1104-
1105-
long retryTimeoutMs = ephemeralTtlMs / ephemeralMaxRetries;
1106-
int retryCounter = 0;
1107-
while (retryCounter <= ephemeralMaxRetries) {
1108-
try {
1109-
etcdClient
1110-
.getLeaseClient()
1111-
.keepAliveOnce(sharedLeaseId)
1112-
.get(retryTimeoutMs, TimeUnit.MILLISECONDS);
1113-
LOG.trace("Successfully refreshed shared lease {}", sharedLeaseId);
1114-
this.leaseRefreshHandlerFired.increment();
1115-
break;
1116-
} catch (InterruptedException e) {
1117-
LOG.warn("Interrupted while refreshing shared lease {}", sharedLeaseId, e);
1118-
Thread.currentThread().interrupt(); // Preserve interrupt status
1119-
} catch (Exception e) {
1120-
retryCounter++;
1121-
if (retryCounter >= ephemeralMaxRetries) {
1122-
LOG.error(
1123-
"Failed to refresh shared lease max times, fataling {}: {}",
1124-
sharedLeaseId,
1125-
e.getMessage());
1126-
// This is a critical error since it affects all ephemeral nodes
1127-
new RuntimeHalterImpl().handleFatal(e);
1128-
}
1129-
LOG.error("Failed to refresh shared lease, retrying {}: {}", sharedLeaseId, e.getMessage());
1130-
}
1131-
}
1132-
}
1133-
11341119
@Override
11351120
public void close() {
11361121
LOG.info("Closing etcd clients and watchers");
1122+
// Set closing flag to prevent keepalive restarts during shutdown
1123+
isClosing = true;
1124+
11371125
// Close all active watchers
11381126
watchers.values().forEach(Watcher::close);
11391127
watchers.clear();
@@ -1166,24 +1154,15 @@ public void close() {
11661154
}
11671155
}
11681156

1169-
// Shut down lease refresh executor if it exists
1170-
if (leaseRefreshExecutor != null) {
1171-
leaseRefreshExecutor.shutdownNow();
1157+
// Revoke the shared lease if we have one
1158+
if (sharedLeaseId != -1) {
11721159
try {
1173-
leaseRefreshExecutor.awaitTermination(5, TimeUnit.SECONDS);
1174-
} catch (InterruptedException ignored) {
1175-
}
1176-
1177-
// Revoke the shared lease if we have one
1178-
if (sharedLeaseId != -1) {
1179-
try {
1180-
LOG.info("Revoking shared lease {}", sharedLeaseId);
1181-
etcdClient.getLeaseClient().revoke(sharedLeaseId).get(5, TimeUnit.SECONDS);
1182-
} catch (Exception e) {
1183-
LOG.warn("Failed to revoke shared lease {}: {}", sharedLeaseId, e.getMessage());
1184-
} finally {
1185-
sharedLeaseId = -1;
1186-
}
1160+
LOG.info("Revoking shared lease {}", sharedLeaseId);
1161+
etcdClient.getLeaseClient().revoke(sharedLeaseId).get(5, TimeUnit.SECONDS);
1162+
} catch (Exception e) {
1163+
LOG.warn("Failed to revoke shared lease {}: {}", sharedLeaseId, e.getMessage());
1164+
} finally {
1165+
sharedLeaseId = -1;
11871166
}
11881167
}
11891168

astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,15 +227,15 @@ public void shutdownOpenSearchAPI() throws Exception {
227227
bulkIngestKafkaProducer.awaitTerminated(DEFAULT_START_STOP_DURATION);
228228
}
229229
kafkaServer.close();
230-
if (etcdClient != null) {
231-
etcdClient.close();
232-
}
233230

234231
datasetMetadataStore.close();
235232
preprocessorMetadataStore.close();
236233
curatorFramework.unwrap().close();
237234
zkServer.close();
238235
meterRegistry.close();
236+
if (etcdClient != null) {
237+
etcdClient.close();
238+
}
239239
}
240240

241241
public KafkaConsumer getTestKafkaConsumer() {

0 commit comments

Comments
 (0)