Skip to content

Commit 36eba4c

Browse files
authored
Messaging: handle proxy connection close header (Azure#45698)
* use unreleased core-amqp version * adding support for respond to proxy challenge in new connection * synching the qpid, core-amqp versions * update the changelog * updating qpid-extensions version in bannedDependencies * sync qpid-extensions, core-amqp versions in more places * changing the tests to accomodate the changes * fix checkstyle
1 parent f862b55 commit 36eba4c

File tree

13 files changed

+60
-18
lines changed

13 files changed

+60
-18
lines changed

eng/versioning/external_dependencies.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ com.microsoft.azure:azure-client-authentication;1.7.14
3232
com.microsoft.azure:azure-client-runtime;1.7.14
3333
com.microsoft.azure:azure-core;0.9.8
3434
com.microsoft.azure:azure-keyvault-cryptography;1.2.2
35-
com.microsoft.azure:qpid-proton-j-extensions;1.2.5
35+
com.microsoft.azure:qpid-proton-j-extensions;1.2.6
3636
com.microsoft.sqlserver:mssql-jdbc;10.2.3.jre8
3737
com.microsoft.azure:azure-functions-maven-plugin;1.30.0
3838
com.microsoft.azure.functions:azure-functions-java-library;2.2.0

eng/versioning/version_client.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ io.clientcore:annotation-processor-test;1.0.0-beta.1;1.0.0-beta.1
530530
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
531531
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
532532

533+
unreleased_com.azure:azure-core-amqp;2.11.0-beta.1
533534
unreleased_com.azure.v2:azure-core;2.0.0-beta.1
534535
unreleased_com.azure.v2:azure-identity;2.0.0-beta.1
535536
unreleased_io.clientcore:annotation-processor;1.0.0-beta.4

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,18 @@
44

55
### Features Added
66

7+
- Added functionality to resume proxy handshake on a new connection if the challenge response closes the existing connection. ([45698](https://github.com/Azure/azure-sdk-for-java/pull/45698)) ([69](https://github.com/Azure/qpid-proton-j-extensions/pull/69))
8+
79
### Breaking Changes
810

911
### Bugs Fixed
1012

1113
### Other Changes
1214

15+
#### Dependency Updates
16+
17+
- Upgraded `qpid-proton-j-extensions` from `1.2.5` to `1.2.6`.
18+
1319
## 2.10.1 (2025-06-26)
1420

1521
### Other Changes

sdk/core/azure-core-amqp/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
<dependency>
8383
<groupId>com.microsoft.azure</groupId>
8484
<artifactId>qpid-proton-j-extensions</artifactId>
85-
<version>1.2.5</version> <!-- {x-version-update;com.microsoft.azure:qpid-proton-j-extensions;external_dependency} -->
85+
<version>1.2.6</version> <!-- {x-version-update;com.microsoft.azure:qpid-proton-j-extensions;external_dependency} -->
8686
</dependency>
8787
<dependency>
8888
<groupId>org.apache.qpid</groupId>
@@ -128,7 +128,7 @@
128128
<rules>
129129
<bannedDependencies>
130130
<includes>
131-
<include>com.microsoft.azure:qpid-proton-j-extensions:[1.2.5]</include> <!-- {x-include-update;com.microsoft.azure:qpid-proton-j-extensions;external_dependency} -->
131+
<include>com.microsoft.azure:qpid-proton-j-extensions:[1.2.6]</include> <!-- {x-include-update;com.microsoft.azure:qpid-proton-j-extensions;external_dependency} -->
132132
<include>org.apache.qpid:proton-j:[0.34.1]</include> <!-- {x-include-update;org.apache.qpid:proton-j;external_dependency} -->
133133
</includes>
134134
</bannedDependencies>

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,13 @@ public Mono<ReactorConnection> connectAndAwaitToActive() {
215215
}).thenReturn(this);
216216
}
217217

218+
public void transferState(ReactorConnection fromConnection) {
219+
if (fromConnection == null) {
220+
return;
221+
}
222+
this.handler.transferState(fromConnection.handler);
223+
}
224+
218225
/**
219226
* {@inheritDoc}
220227
*/

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Objects;
1919
import java.util.concurrent.RejectedExecutionException;
2020
import java.util.concurrent.TimeoutException;
21+
import java.util.concurrent.atomic.AtomicReference;
2122
import java.util.function.Supplier;
2223

2324
import static com.azure.core.amqp.implementation.ClientConstants.CONNECTION_ID_KEY;
@@ -47,6 +48,7 @@ public final class ReactorConnectionCache<T extends ReactorConnection> implement
4748
// any dependent type; instead, the dependent type must acquire Connection only through the cache route,
4849
// i.e., by subscribing to 'createOrGetCachedConnection' via 'get()' getter.
4950
private volatile T currentConnection;
51+
private final State state = new State();
5052

5153
/**
5254
* Create a ReactorConnectionCache that is responsible for obtaining a connection, waiting for it to active,
@@ -86,6 +88,7 @@ public ReactorConnectionCache(Supplier<T> connectionSupplier, String fullyQualif
8688
});
8789

8890
this.createOrGetCachedConnection = newConnection.flatMap(c -> {
91+
state.transfer(c);
8992
withConnectionId(logger, c.getId()).log("Waiting to connect and active.");
9093

9194
return c.connectAndAwaitToActive().doOnCancel(() -> {
@@ -289,4 +292,15 @@ private static void closeConnection(ReactorConnection c, ClientLogger logger, St
289292
private static LoggingEventBuilder withConnectionId(ClientLogger logger, String id) {
290293
return logger.atInfo().addKeyValue(CONNECTION_ID_KEY, id);
291294
}
295+
296+
private static final class State {
297+
private final AtomicReference<ReactorConnection> s = new AtomicReference<>(null);
298+
299+
void transfer(ReactorConnection c) {
300+
final ReactorConnection from = s.getAndSet(c);
301+
if (from != null) {
302+
c.transferState(from);
303+
}
304+
}
305+
}
292306
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ public ConnectionHandler(final String connectionId, final ConnectionOptions conn
131131
this.enableSsl = enableSsl;
132132
}
133133

134+
public void transferState(ConnectionHandler fromHandler) {
135+
// No state to transfer in ConnectionHandler
136+
}
137+
134138
/**
135139
* Gets properties to add when creating AMQP connection.
136140
*

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandler.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class WebSocketsProxyConnectionHandler extends WebSocketsConnectionHandle
5656
* or Service Bus.
5757
*/
5858
private final String connectHostnameAndPort;
59+
private final ProxyImpl proxy;
5960

6061
/**
6162
* Creates a handler that handles proton-j's connection through a proxy using web sockets.
@@ -97,6 +98,12 @@ public WebSocketsProxyConnectionHandler(String connectionId, ConnectionOptions c
9798
final Proxy proxy = proxies.get(0);
9899
this.proxyHostAddress = (InetSocketAddress) proxy.address();
99100
}
101+
102+
if (proxyOptions == ProxyOptions.SYSTEM_DEFAULTS) {
103+
this.proxy = new ProxyImpl();
104+
} else {
105+
this.proxy = new ProxyImpl(getProtonConfiguration());
106+
}
100107
}
101108

102109
/**
@@ -120,6 +127,16 @@ public static boolean shouldUseProxy(final String hostname, final int port) {
120127
return isProxyAddressLegal(proxies);
121128
}
122129

130+
@Override
131+
public void transferState(ConnectionHandler fromHandler) {
132+
if (fromHandler instanceof WebSocketsProxyConnectionHandler) {
133+
final WebSocketsProxyConnectionHandler wsHandler = (WebSocketsProxyConnectionHandler) fromHandler;
134+
final ProxyImpl fromProxy = wsHandler.proxy;
135+
this.proxy.transferState(fromProxy);
136+
}
137+
super.transferState(fromHandler);
138+
}
139+
123140
/**
124141
* Gets the hostname for the proxy.
125142
*
@@ -205,11 +222,6 @@ public void onTransportError(Event event) {
205222
protected void addTransportLayers(final Event event, final TransportInternal transport) {
206223
super.addTransportLayers(event, transport);
207224

208-
// Checking that the proxy configuration is not null and not equal to the system defaults option.
209-
final ProxyImpl proxy = proxyOptions != null && !(proxyOptions == ProxyOptions.SYSTEM_DEFAULTS)
210-
? new ProxyImpl(getProtonConfiguration())
211-
: new ProxyImpl();
212-
213225
final ProxyHandler proxyHandler = new ProxyHandlerImpl();
214226
proxy.configure(connectHostnameAndPort, null, proxyHandler, transport);
215227

sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/WebSocketsProxyConnectionHandlerTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,10 @@ public void proxyConfigureConnectHostnameAndPortDerivesFromFqdn() {
201201
final Proxy newProxy = new Proxy(Proxy.Type.HTTP, address);
202202
final ProxyOptions proxyOptions = new ProxyOptions(ProxyAuthenticationType.BASIC, newProxy, USERNAME, PASSWORD);
203203

204-
this.handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, connectionOptions, proxyOptions, peerDetails,
205-
AmqpMetricsProvider.noop());
206-
207204
// Act and Assert
208205
try (MockedConstruction<ProxyImpl> mockConstruction = mockConstruction(ProxyImpl.class)) {
206+
this.handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, connectionOptions, proxyOptions,
207+
peerDetails, AmqpMetricsProvider.noop());
209208
this.handler.addTransportLayers(mock(Event.class, Mockito.CALLS_REAL_METHODS),
210209
mock(TransportImpl.class, Mockito.CALLS_REAL_METHODS));
211210

@@ -236,11 +235,10 @@ public void proxyConfigureConnectHostnameAndPortDerivesFromCustomEndpoint() {
236235
AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), ProxyOptions.SYSTEM_DEFAULTS, scheduler,
237236
CLIENT_OPTIONS, VERIFY_MODE, PRODUCT, CLIENT_VERSION, customEndpointHostname, customEndpointPort, true);
238237

239-
this.handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, connectionOptionsWithCustomEndpoint,
240-
proxyOptions, peerDetails, AmqpMetricsProvider.noop());
241-
242238
// Act and Assert
243239
try (MockedConstruction<ProxyImpl> mockConstruction = mockConstruction(ProxyImpl.class)) {
240+
this.handler = new WebSocketsProxyConnectionHandler(CONNECTION_ID, connectionOptionsWithCustomEndpoint,
241+
proxyOptions, peerDetails, AmqpMetricsProvider.noop());
244242
this.handler.addTransportLayers(mock(Event.class, Mockito.CALLS_REAL_METHODS),
245243
mock(TransportImpl.class, Mockito.CALLS_REAL_METHODS));
246244

sdk/eventhubs/azure-messaging-eventhubs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
<dependency>
5454
<groupId>com.azure</groupId>
5555
<artifactId>azure-core-amqp</artifactId>
56-
<version>2.10.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
56+
<version>2.11.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
5757
</dependency>
5858

5959
<!-- Test dependencies -->

0 commit comments

Comments
 (0)