diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java index 4671c82e4eae1..307e503f0c0b0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java @@ -119,6 +119,14 @@ public void testCreateFunctionInK8sWithSecrets() }); log.info("Function created successfully"); + log.info("Waiting for function to subscribe to input topic"); + Awaitility.await().ignoreExceptions().atMost(Duration.ofSeconds(30)) + .until(() -> { + admin.topics().getSubscriptions(inputTopicName); + return true; + }); + log.info("Function subscribed to input topic"); + // Validate that k8s secrets were provided as environment variables to the function pod String podName = "pf-%s-%s-%s-0".formatted(fnTenant, fnNamespace, fnName); Exec exec = new Exec(getApiClient()); @@ -157,12 +165,8 @@ public void testCreateFunctionInK8sWithSecrets() }); log.info("Starting function"); - // this seems to be flaky if the stopping of the function hasn't fully completed when it's started again. - // one way to reproduce is to remove the delay before starting the function and also removing the pollDelay - // from the await after stopFunction - Thread.sleep(2000); admin.functions().startFunction(fnTenant, fnNamespace, fnName); - Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30)) + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(40)) .untilAsserted(() -> { FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); assertThat(functionStatus.getNumInstances()).isEqualTo(1);