diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index 09491f03cf78f..d4c9c8f5bcab5 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -500,7 +500,7 @@ protected void doStop() throws Exception { } } - public SubscriptionHelper getSubscriptionHelper() throws Exception { + public SubscriptionHelper getSubscriptionHelper() { if (subscriptionHelper == null) { // lazily create subscription helper subscriptionHelper = new SubscriptionHelper(this); diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 92cebfce8ea6c..5265b8564e9c8 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -65,7 +65,7 @@ public class SubscriptionHelper extends ServiceSupport { - static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension(); + private final ReplayExtension replayExtension = new ReplayExtension(); private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class); @@ -231,7 +231,7 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess = firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId(); LOG.warn(error); LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName); - REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId); + replayExtension.setReplayId(channelName, fallBackReplayId); for (var consumer : consumers) { subscribe(consumer); } @@ -408,7 +408,7 @@ protected HttpCookieStore getHttpCookieStore() { BayeuxClient client = new BayeuxClient(getEndpointUrl(component), transport); // added eagerly to check for support during handshake - client.addExtension(REPLAY_EXTENSION); + client.addExtension(component.getSubscriptionHelper().getReplayExtension()); return client; } @@ -434,6 +434,10 @@ public synchronized void subscribe(StreamingApiConsumer consumer) { clientChannel.subscribe(messageListener); } + ReplayExtension getReplayExtension() { + return replayExtension; + } + private static boolean isTemporaryError(Message message) { String failureReason = getFailureReason(message); return failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR); @@ -460,7 +464,7 @@ private void setReplayIdIfAbsent(final SalesforceEndpoint endpoint) { final Long replayIdValue = replayId.get(); - REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, replayIdValue); + replayExtension.setReplayIdIfAbsent(channelName, replayIdValue); } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java index fd72d7345c86a..963e9d8fbe6bb 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java @@ -34,7 +34,6 @@ import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.Test; -import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION; import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor; import static org.assertj.core.api.Assertions.assertThat; import static org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION; @@ -144,6 +143,8 @@ public void shouldNotLoginWhenAccessTokenIsNullAndLazyLoginIsTrue() throws Sales when(component.getLoginConfig()).thenReturn(loginConfig); when(component.getConfig()).thenReturn(endpointConfig); when(component.getSession()).thenReturn(session); + final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component); + when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper); BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session); @@ -167,6 +168,8 @@ public void shouldLoginWhenAccessTokenIsNullAndLazyLoginIsFalse() throws Salesfo when(component.getLoginConfig()).thenReturn(loginConfig); when(component.getConfig()).thenReturn(endpointConfig); when(component.getSession()).thenReturn(session); + final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component); + when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper); BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session); @@ -183,6 +186,8 @@ public void defaultLongPollingTimeoutShouldBeGreaterThanSalesforceTimeout() thro when(component.getLoginConfig()).thenReturn(new SalesforceLoginConfig()); when(component.getConfig()).thenReturn(endpointConfig); when(component.getSession()).thenReturn(session); + final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component); + when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper); var bayeuxClient = SubscriptionHelper.createClient(component, session); var longPollingTimeout = bayeuxClient.getTransport("long-polling").getOption(MAX_NETWORK_DELAY_OPTION); @@ -207,20 +212,23 @@ public void fallbackReplyId() throws Exception { when(endpoint.getReplayId()).thenReturn(null); when(endpoint.getComponent()).thenReturn(component); when(endpoint.getConfiguration()).thenReturn(endpointConfig); + final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component); + when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper); assertEquals(Optional.of(2L), determineReplayIdFor(endpoint, "my-topic-1"), "Expecting replayId for `my-topic-1` to be 2, from initial reply id map"); - REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L); - REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L); + ReplayExtension replayExtension = component.getSubscriptionHelper().getReplayExtension(); + replayExtension.setReplayIdIfAbsent("my-topic-1", 3L); + replayExtension.setReplayIdIfAbsent("my-topic-1", 4L); // should still be 3L - Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap"); - Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION); + Field f = replayExtension.getClass().getDeclaredField("dataMap"); + Map m = (Map) ReflectionHelper.getField(f, replayExtension); assertEquals(3L, m.get("my-topic-1")); // there is some subscription error due to INVALID_REPLAY_ID_PATTERN so we force setting another reply id - REPLAY_EXTENSION.setReplayId("my-topic-1", -2L); + replayExtension.setReplayId("my-topic-1", -2L); assertEquals(-2L, m.get("my-topic-1")); } diff --git a/pom.xml b/pom.xml index e230011d12be6..a0bdf2bfaea7d 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,7 @@ 4.8.1.20250320 4.8.1.20250320 4.8.1.20250320 - 4.8.1.20251127 + 4.8.1.20251213 4.8.1.20250320 4.8.1.20250320 4.8.1.20250320