Skip to content

Commit 51f04cb

Browse files
author
Bernd Warmuth
committed
cleanup
Signed-off-by: Bernd Warmuth <[email protected]>
1 parent 38f8e89 commit 51f04cb

File tree

4 files changed

+3
-96
lines changed

4 files changed

+3
-96
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java

Lines changed: 0 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
import io.grpc.ConnectivityState;
55
import io.grpc.ManagedChannel;
66
import java.util.concurrent.CountDownLatch;
7-
import java.util.concurrent.ScheduledExecutorService;
8-
import java.util.concurrent.ScheduledFuture;
97
import java.util.concurrent.TimeUnit;
10-
import java.util.concurrent.atomic.AtomicReference;
118
import lombok.extern.slf4j.Slf4j;
129

1310
/**
@@ -98,78 +95,4 @@ private static void waitForDesiredState(
9895
"Deadline exceeded. Condition did not complete within the %d " + "deadline", timeout));
9996
}
10097
}
101-
102-
/**
103-
* Polls the state of a gRPC channel at regular intervals and triggers callbacks upon state changes.
104-
*
105-
* @param executor the ScheduledExecutorService used for polling.
106-
* @param channel the ManagedChannel to monitor.
107-
* @param onConnectionReady callback invoked when the channel transitions to a READY state.
108-
* @param onConnectionLost callback invoked when the channel transitions to a FAILURE or SHUTDOWN state.
109-
* @param pollIntervalMs the polling interval in milliseconds.
110-
*/
111-
public static void pollChannelState(
112-
ScheduledExecutorService executor,
113-
ManagedChannel channel,
114-
Runnable onConnectionReady,
115-
Runnable onConnectionLost,
116-
long pollIntervalMs) {
117-
118-
AtomicReference<ConnectivityState> lastState = new AtomicReference<>(ConnectivityState.READY);
119-
120-
Runnable pollTask = () -> {
121-
ConnectivityState currentState = channel.getState(true);
122-
if (currentState != lastState.get()) {
123-
if (currentState == ConnectivityState.READY) {
124-
log.debug("gRPC connection became READY");
125-
onConnectionReady.run();
126-
} else if (currentState == ConnectivityState.TRANSIENT_FAILURE
127-
|| currentState == ConnectivityState.SHUTDOWN) {
128-
log.debug("gRPC connection became TRANSIENT_FAILURE");
129-
onConnectionLost.run();
130-
}
131-
lastState.set(currentState);
132-
}
133-
};
134-
executor.scheduleAtFixedRate(pollTask, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
135-
}
136-
137-
/**
138-
* Polls the channel state at fixed intervals and waits for the channel to reach a desired state within a timeout
139-
* period.
140-
*
141-
* @param executor the ScheduledExecutorService used for polling.
142-
* @param channel the ManagedChannel to monitor.
143-
* @param desiredState the ConnectivityState to wait for.
144-
* @param connectCallback callback invoked when the desired state is reached.
145-
* @param timeout the maximum amount of time to wait.
146-
* @param unit the time unit of the timeout.
147-
* @return {@code true} if the desired state was reached within the timeout period, {@code false} otherwise.
148-
* @throws InterruptedException if the current thread is interrupted while waiting.
149-
*/
150-
public static boolean pollForDesiredState(
151-
ScheduledExecutorService executor,
152-
ManagedChannel channel,
153-
ConnectivityState desiredState,
154-
Runnable connectCallback,
155-
long timeout,
156-
TimeUnit unit)
157-
throws InterruptedException {
158-
CountDownLatch latch = new CountDownLatch(1);
159-
160-
Runnable waitForStateTask = () -> {
161-
ConnectivityState currentState = channel.getState(true);
162-
if (currentState == desiredState) {
163-
connectCallback.run();
164-
latch.countDown();
165-
}
166-
};
167-
168-
ScheduledFuture<?> scheduledFuture =
169-
executor.scheduleWithFixedDelay(waitForStateTask, 0, 100, TimeUnit.MILLISECONDS);
170-
171-
boolean success = latch.await(timeout, unit);
172-
scheduledFuture.cancel(true);
173-
return success;
174-
}
17598
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) {
115115
* Handles provider readiness events by clearing the cache (if enabled) and notifying listeners of readiness.
116116
*/
117117
private void handleProviderReadyEvent() {
118-
this.onConnectionEvent.accept(true, Collections.emptyList()); // TODO: check if this is needed
119118
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
120119
this.cache.clear();
121120
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunFlagdRpcReconnectCucumberTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,15 @@
66
import org.apache.logging.log4j.core.config.Order;
77
import org.junit.platform.suite.api.ConfigurationParameter;
88
import org.junit.platform.suite.api.IncludeEngines;
9-
import org.junit.platform.suite.api.SelectClasspathResource;
9+
import org.junit.platform.suite.api.SelectFile;
1010
import org.junit.platform.suite.api.Suite;
1111
import org.testcontainers.junit.jupiter.Testcontainers;
1212

13-
/**
14-
* Class for running the reconnection tests for the RPC provider
15-
*/
13+
/** Class for running the reconnection tests for the RPC provider */
1614
@Order(value = Integer.MAX_VALUE)
1715
@Suite
1816
@IncludeEngines("cucumber")
19-
@SelectClasspathResource("features/flagd-reconnect.feature")
20-
@SelectClasspathResource("features/events.feature")
17+
@SelectFile("test-harness/gherkin/flagd-reconnect.feature")
2118
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
2219
@ConfigurationParameter(
2320
key = GLUE_PROPERTY_NAME,

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,6 @@ public void change() {
5656
verify(cache, atLeast(1)).clear();
5757
}
5858

59-
@Test
60-
public void ready() {
61-
EventStreamResponse resp = mock(EventStreamResponse.class);
62-
when(resp.getType()).thenReturn("provider_ready");
63-
stream.onNext(resp);
64-
// we notify that we are ready
65-
assertEquals(1, states.size());
66-
assertTrue(states.get(0));
67-
// cache was cleaned
68-
verify(cache, atLeast(1)).clear();
69-
}
70-
7159
@Test
7260
public void cacheBustingForKnownKeys() {
7361
final String key1 = "myKey1";

0 commit comments

Comments
 (0)