Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit bcf3fd6

Browse files
committed
Re-use transform because it is no longer deprecated
Per discussion with @simonbasle
1 parent d24a24e commit bcf3fd6

File tree

3 files changed

+6
-6
lines changed

3 files changed

+6
-6
lines changed

src/main/java/reactor/rabbitmq/Receiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Receiver(ReceiverOptions options) {
8080
cm = options.getConnectionMonoConfigurator().apply(cm);
8181
cm = cm.doOnNext(conn -> connection.set(conn))
8282
.subscribeOn(this.connectionSubscriptionScheduler)
83-
.composeNow(this::cache);
83+
.transform(this::cache);
8484
} else {
8585
cm = options.getConnectionMono();
8686
}

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Sender(SenderOptions options) {
110110
cm = options.getConnectionMonoConfigurator().apply(cm);
111111
cm = cm.doOnNext(conn -> connection.set(conn))
112112
.subscribeOn(this.connectionSubscriptionScheduler)
113-
.composeNow(this::cache);
113+
.transform(this::cache);
114114
} else {
115115
cm = options.getConnectionMono();
116116
}
@@ -124,7 +124,7 @@ public Sender(SenderOptions options) {
124124
this.resourceManagementScheduler = options.getResourceManagementScheduler() == null ?
125125
createScheduler("rabbitmq-sender-resource-creation") : options.getResourceManagementScheduler();
126126
this.resourceManagementChannelMono = options.getResourceManagementChannelMono() == null ?
127-
connectionMono.map(CHANNEL_PROXY_CREATION_FUNCTION).composeNow(this::cache) : options.getResourceManagementChannelMono();
127+
connectionMono.map(CHANNEL_PROXY_CREATION_FUNCTION).transform(this::cache) : options.getResourceManagementChannelMono();
128128
if (options.getConnectionClosingTimeout() != null && !Duration.ZERO.equals(options.getConnectionClosingTimeout())) {
129129
this.connectionClosingTimeout = (int) options.getConnectionClosingTimeout().toMillis();
130130
} else {
@@ -250,11 +250,11 @@ private BiConsumer<SignalType, Channel> getChannelCloseHandler(SendOptions optio
250250
}
251251

252252
public RpcClient rpcClient(String exchange, String routingKey) {
253-
return new RpcClient(connectionMono.map(CHANNEL_CREATION_FUNCTION).composeNow(this::cache), exchange, routingKey);
253+
return new RpcClient(connectionMono.map(CHANNEL_CREATION_FUNCTION).transform(this::cache), exchange, routingKey);
254254
}
255255

256256
public RpcClient rpcClient(String exchange, String routingKey, Supplier<String> correlationIdProvider) {
257-
return new RpcClient(connectionMono.map(CHANNEL_CREATION_FUNCTION).composeNow(this::cache), exchange, routingKey, correlationIdProvider);
257+
return new RpcClient(connectionMono.map(CHANNEL_CREATION_FUNCTION).transform(this::cache), exchange, routingKey, correlationIdProvider);
258258
}
259259

260260
/**

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1107,7 +1107,7 @@ public void shovel() throws Exception {
11071107
sourceQueue,
11081108
new ConsumeOptions().stopConsumingBiFunction((emitter, msg) -> Integer.parseInt(new String(msg.getBody())) == nbMessages - 1)
11091109
).map(delivery -> new OutboundMessage("", destinationQueue, delivery.getBody()))
1110-
.composeNow(messages -> sender.send(messages)))
1110+
.transform(messages -> sender.send(messages)))
11111111
.thenMany(receiver.consumeNoAck(destinationQueue)).subscribe(msg -> {
11121112
counter.incrementAndGet();
11131113
latch.countDown();

0 commit comments

Comments
 (0)