From e6361b95bebc9f5e31a0b6255b14d8d40243ef52 Mon Sep 17 00:00:00 2001 From: Philipp Dolif Date: Mon, 22 Dec 2025 18:20:40 +0100 Subject: [PATCH 1/2] Wait for function to subscribe to input topic --- .../integration/functions/k8s/PulsarFunctionsK8STest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java index 4671c82e4eae1..bcfc16d24307f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java @@ -119,6 +119,14 @@ public void testCreateFunctionInK8sWithSecrets() }); log.info("Function created successfully"); + log.info("Waiting for function to subscribe to input topic"); + Awaitility.await().ignoreExceptions().atMost(Duration.ofSeconds(30)) + .until(() -> { + admin.topics().getSubscriptions(inputTopicName); + return true; + }); + log.info("Function subscribed to input topic"); + // Validate that k8s secrets were provided as environment variables to the function pod String podName = "pf-%s-%s-%s-0".formatted(fnTenant, fnNamespace, fnName); Exec exec = new Exec(getApiClient()); From e028e3d4c2195c86c244a13257f06426f66364ae Mon Sep 17 00:00:00 2001 From: Philipp Dolif Date: Mon, 22 Dec 2025 18:31:06 +0100 Subject: [PATCH 2/2] Increase wait time for function to start a second time --- .../integration/functions/k8s/PulsarFunctionsK8STest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java index bcfc16d24307f..307e503f0c0b0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java @@ -165,12 +165,8 @@ public void testCreateFunctionInK8sWithSecrets() }); log.info("Starting function"); - // this seems to be flaky if the stopping of the function hasn't fully completed when it's started again. - // one way to reproduce is to remove the delay before starting the function and also removing the pollDelay - // from the await after stopFunction - Thread.sleep(2000); admin.functions().startFunction(fnTenant, fnNamespace, fnName); - Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30)) + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(40)) .untilAsserted(() -> { FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); assertThat(functionStatus.getNumInstances()).isEqualTo(1);