Skip to content

Commit 17674ca

Browse files
committed
feat: receiver verticle now has a grace period before closing
Signed-off-by: Calum Murray <[email protected]>
1 parent 865fea6 commit 17674ca

File tree

7 files changed

+38
-8
lines changed

7 files changed

+38
-8
lines changed

data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,17 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler<HttpSe
9797
private IngressProducerReconcilableStore ingressProducerStore;
9898
private FileWatcher secretWatcher;
9999

100+
private final long terminationGracePeriodMs;
101+
100102
public ReceiverVerticle(
101103
final ReceiverEnv env,
102104
final HttpServerOptions httpServerOptions,
103105
final HttpServerOptions httpsServerOptions,
104106
final Function<Vertx, IngressProducerReconcilableStore> ingressProducerStoreFactory,
105107
final IngressRequestHandler ingressRequestHandler,
106108
final String secretVolumePath,
107-
final OIDCDiscoveryConfig oidcDiscoveryConfig) {
109+
final OIDCDiscoveryConfig oidcDiscoveryConfig,
110+
final long terminationGracePeriodMs) {
108111

109112
Objects.requireNonNull(env);
110113
Objects.requireNonNull(httpServerOptions);
@@ -122,6 +125,7 @@ public ReceiverVerticle(
122125
this.tlsKeyFile = new File(secretVolumePath + "/tls.key");
123126
this.tlsCrtFile = new File(secretVolumePath + "/tls.crt");
124127
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
128+
this.terminationGracePeriodMs = terminationGracePeriodMs;
125129
}
126130

127131
public HttpServerOptions getHttpsServerOptions() {
@@ -193,6 +197,20 @@ private void setupSecretWatcher() {
193197

194198
@Override
195199
public void stop(Promise<Void> stopPromise) throws Exception {
200+
if (this.terminationGracePeriodMs < 1) {
201+
this.handleStop(stopPromise);
202+
return;
203+
}
204+
this.vertx.setTimer(this.terminationGracePeriodMs, (ignored) -> {
205+
try {
206+
this.handleStop(stopPromise);
207+
} catch (Exception e) {
208+
stopPromise.fail(e);
209+
}
210+
});
211+
}
212+
213+
private void handleStop(Promise<Void> stopPromise) throws Exception {
196214
CompositeFuture.all(
197215
(this.httpServer != null ? this.httpServer.close().mapEmpty() : Future.succeededFuture()),
198216
(this.httpsServer != null ? this.httpsServer.close().mapEmpty() : Future.succeededFuture()),

data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public class Main {
6868

6969
private static final Logger logger = LoggerFactory.getLogger(Main.class);
7070

71+
private static final long receiverGracePeriodMs = 20_000; // 20 seconds, in milliseconds
72+
7173
/**
7274
* Start receiver.
7375
*
@@ -165,7 +167,8 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
165167
eventTypeClient,
166168
eventTypeLister,
167169
vertx,
168-
oidcDiscoveryConfig);
170+
oidcDiscoveryConfig,
171+
receiverGracePeriodMs);
169172
DeploymentOptions deploymentOptions =
170173
new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors());
171174
// Deploy the receiver verticles

data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class ReceiverVerticleFactory implements Supplier<Verticle> {
5252

5353
private ReactiveProducerFactory<String, CloudEvent> kafkaProducerFactory;
5454

55+
private final long terminationGracePeriodMs;
56+
5557
ReceiverVerticleFactory(
5658
final ReceiverEnv env,
5759
final Properties producerConfigs,
@@ -62,7 +64,8 @@ class ReceiverVerticleFactory implements Supplier<Verticle> {
6264
final MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient,
6365
final Lister<EventType> eventTypeLister,
6466
Vertx vertx,
65-
final OIDCDiscoveryConfig oidcDiscoveryConfig)
67+
final OIDCDiscoveryConfig oidcDiscoveryConfig,
68+
final long terminationGracePeriodMs)
6669
throws NoSuchAlgorithmException {
6770
{
6871
this.env = env;
@@ -75,6 +78,7 @@ class ReceiverVerticleFactory implements Supplier<Verticle> {
7578
new EventTypeCreatorImpl(eventTypeClient, eventTypeLister, vertx));
7679
this.kafkaProducerFactory = kafkaProducerFactory;
7780
this.oidcDiscoveryConfig = oidcDiscoveryConfig;
81+
this.terminationGracePeriodMs = terminationGracePeriodMs;
7882
}
7983
}
8084

@@ -90,6 +94,7 @@ public Verticle get() {
9094
properties -> kafkaProducerFactory.create(v, properties)),
9195
this.ingressRequestHandler,
9296
secretVolumePath,
93-
oidcDiscoveryConfig);
97+
oidcDiscoveryConfig,
98+
terminationGracePeriodMs);
9499
}
95100
}

data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ public void setUpHTTP(final Vertx vertx, final VertxTestContext testContext) {
151151
new IngressRequestHandlerImpl(
152152
StrictRequestToRecordMapper.getInstance(), registry, ((event, reference) -> null)),
153153
SECRET_VOLUME_PATH,
154-
null);
154+
null,
155+
0);
155156
vertx.deployVerticle(verticle, testContext.succeeding(ar -> testContext.completeNow()));
156157

157158
// Connect to the logger in ReceiverVerticle

data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ public void setup() throws ExecutionException, InterruptedException {
133133
new IngressRequestHandlerImpl(
134134
StrictRequestToRecordMapper.getInstance(), Metrics.getRegistry(), ((event, reference) -> null)),
135135
SECRET_VOLUME_PATH,
136-
null);
136+
null,
137+
0);
137138

138139
vertx.deployVerticle(verticle).toCompletionStage().toCompletableFuture().get();
139140
}

data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactoryTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public void shouldCreateMultipleReceiverVerticleInstances(Vertx vertx) throws No
5656
mockClient.resources(EventType.class),
5757
mock(Lister.class),
5858
vertx,
59-
mock(OIDCDiscoveryConfig.class));
59+
mock(OIDCDiscoveryConfig.class),
60+
0);
6061

6162
assertThat(supplier.get()).isNotSameAs(supplier.get());
6263
}

data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ private ReceiverVerticle setUpReceiver(final Vertx vertx, final VertxTestContext
396396
Metrics.getRegistry(),
397397
(((event, reference) -> null))),
398398
SECRET_VOLUME_PATH,
399-
null);
399+
null,
400+
0);
400401

401402
final CountDownLatch latch = new CountDownLatch(1);
402403
vertx.deployVerticle(verticle, context.succeeding(h -> latch.countDown()));

0 commit comments

Comments
 (0)