Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ protected void doStop() throws Exception {
}
}

public SubscriptionHelper getSubscriptionHelper() throws Exception {
public SubscriptionHelper getSubscriptionHelper() {
if (subscriptionHelper == null) {
// lazily create subscription helper
subscriptionHelper = new SubscriptionHelper(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@

public class SubscriptionHelper extends ServiceSupport {

static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
private final ReplayExtension replayExtension = new ReplayExtension();

private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);

Expand Down Expand Up @@ -231,7 +231,7 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess
= firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
LOG.warn(error);
LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
replayExtension.setReplayId(channelName, fallBackReplayId);
for (var consumer : consumers) {
subscribe(consumer);
}
Expand Down Expand Up @@ -408,7 +408,7 @@ protected HttpCookieStore getHttpCookieStore() {
BayeuxClient client = new BayeuxClient(getEndpointUrl(component), transport);

// added eagerly to check for support during handshake
client.addExtension(REPLAY_EXTENSION);
client.addExtension(component.getSubscriptionHelper().getReplayExtension());

return client;
}
Expand All @@ -434,6 +434,10 @@ public synchronized void subscribe(StreamingApiConsumer consumer) {
clientChannel.subscribe(messageListener);
}

ReplayExtension getReplayExtension() {
return replayExtension;
}

private static boolean isTemporaryError(Message message) {
String failureReason = getFailureReason(message);
return failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR);
Expand All @@ -460,7 +464,7 @@ private void setReplayIdIfAbsent(final SalesforceEndpoint endpoint) {

final Long replayIdValue = replayId.get();

REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, replayIdValue);
replayExtension.setReplayIdIfAbsent(channelName, replayIdValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;

import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION;
import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION;
Expand Down Expand Up @@ -144,6 +143,8 @@ public void shouldNotLoginWhenAccessTokenIsNullAndLazyLoginIsTrue() throws Sales
when(component.getLoginConfig()).thenReturn(loginConfig);
when(component.getConfig()).thenReturn(endpointConfig);
when(component.getSession()).thenReturn(session);
final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);

BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session);

Expand All @@ -167,6 +168,8 @@ public void shouldLoginWhenAccessTokenIsNullAndLazyLoginIsFalse() throws Salesfo
when(component.getLoginConfig()).thenReturn(loginConfig);
when(component.getConfig()).thenReturn(endpointConfig);
when(component.getSession()).thenReturn(session);
final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);

BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session);

Expand All @@ -183,6 +186,8 @@ public void defaultLongPollingTimeoutShouldBeGreaterThanSalesforceTimeout() thro
when(component.getLoginConfig()).thenReturn(new SalesforceLoginConfig());
when(component.getConfig()).thenReturn(endpointConfig);
when(component.getSession()).thenReturn(session);
final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
var bayeuxClient = SubscriptionHelper.createClient(component, session);

var longPollingTimeout = bayeuxClient.getTransport("long-polling").getOption(MAX_NETWORK_DELAY_OPTION);
Expand All @@ -207,20 +212,23 @@ public void fallbackReplyId() throws Exception {
when(endpoint.getReplayId()).thenReturn(null);
when(endpoint.getComponent()).thenReturn(component);
when(endpoint.getConfiguration()).thenReturn(endpointConfig);
final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);

assertEquals(Optional.of(2L), determineReplayIdFor(endpoint, "my-topic-1"),
"Expecting replayId for `my-topic-1` to be 2, from initial reply id map");

REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L);
REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L);
ReplayExtension replayExtension = component.getSubscriptionHelper().getReplayExtension();
replayExtension.setReplayIdIfAbsent("my-topic-1", 3L);
replayExtension.setReplayIdIfAbsent("my-topic-1", 4L);

// should still be 3L
Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap");
Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION);
Field f = replayExtension.getClass().getDeclaredField("dataMap");
Map m = (Map) ReflectionHelper.getField(f, replayExtension);
assertEquals(3L, m.get("my-topic-1"));

// there is some subscription error due to INVALID_REPLAY_ID_PATTERN so we force setting another reply id
REPLAY_EXTENSION.setReplayId("my-topic-1", -2L);
replayExtension.setReplayId("my-topic-1", -2L);
assertEquals(-2L, m.get("my-topic-1"));
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
<camel-ref.tesb.version>4.8.1.20250320</camel-ref.tesb.version>
<camel-rest-openapi.tesb.version>4.8.1.20250320</camel-rest-openapi.tesb.version>
<camel-salesforce-codegen.tesb.version>4.8.1.20250320</camel-salesforce-codegen.tesb.version>
<camel-salesforce.tesb.version>4.8.1.20251127</camel-salesforce.tesb.version>
<camel-salesforce.tesb.version>4.8.1.20251213</camel-salesforce.tesb.version>
<camel-smb.tesb.version>4.8.1.20250320</camel-smb.tesb.version>
<camel-sql.tesb.version>4.8.1.20250320</camel-sql.tesb.version>
<camel-tracing.tesb.version>4.8.1.20250320</camel-tracing.tesb.version>
Expand Down