Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit eee7e20

Browse files
committed
Added ChannelPool to SendOptions with one implementation - LazyChannelPool
1 parent 8033d81 commit eee7e20

File tree

7 files changed

+441
-0
lines changed

7 files changed

+441
-0
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Channel;
20+
import reactor.core.publisher.Mono;
21+
import reactor.core.publisher.SignalType;
22+
23+
import java.util.function.BiConsumer;
24+
25+
public interface ChannelPool {
26+
27+
Mono<? extends Channel> getChannelMono();
28+
29+
BiConsumer<SignalType, Channel> getChannelCloseHandler();
30+
31+
void close();
32+
33+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Connection;
20+
import reactor.core.publisher.Mono;
21+
22+
public class ChannelPoolFactory {
23+
24+
public static ChannelPool createChannelPool(Mono<? extends Connection> connectionMono) {
25+
return createChannelPool(connectionMono, new ChannelPoolOptions());
26+
}
27+
28+
public static ChannelPool createChannelPool(Mono<? extends Connection> connectionMono, ChannelPoolOptions channelPoolOptions) {
29+
return new LazyChannelPool(connectionMono, channelPoolOptions);
30+
}
31+
32+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import reactor.core.scheduler.Scheduler;
20+
import reactor.core.scheduler.Schedulers;
21+
22+
public class ChannelPoolOptions {
23+
24+
private int maxSize = 5;
25+
26+
private Scheduler subscriptionScheduler = Schedulers.newElastic("sender-channel-pool");
27+
28+
public ChannelPoolOptions maxSize(int maxSize) {
29+
this.maxSize = maxSize;
30+
return this;
31+
}
32+
33+
public int getMaxSize() {
34+
return maxSize;
35+
}
36+
37+
public ChannelPoolOptions subscriptionScheduler(Scheduler subscriptionScheduler) {
38+
this.subscriptionScheduler = subscriptionScheduler;
39+
return this;
40+
}
41+
42+
public Scheduler getSubscriptionScheduler() {
43+
return subscriptionScheduler;
44+
}
45+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
import com.rabbitmq.client.Channel;
20+
import com.rabbitmq.client.Connection;
21+
import reactor.core.publisher.Mono;
22+
import reactor.core.publisher.SignalType;
23+
import reactor.core.scheduler.Scheduler;
24+
25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.function.BiConsumer;
31+
32+
import static reactor.rabbitmq.ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE;
33+
34+
/**
35+
* This channel pool is lazy initialized. It might even not reach its maximum size {@link ChannelPoolOptions#getMaxSize()} in low-traffic environments.
36+
* It always tries to obtain channel from the pool. However, in case of high-traffic number of channels might exceeds channel pool maximum size.
37+
*
38+
* Channels are added to the pool after their use {@link ChannelPool#getChannelCloseHandler()} and obtained from the pool when channel is requested {@link ChannelPool#getChannelMono()}.
39+
*
40+
* If pool is empty, new channel is created.
41+
* If channel is no longer needed and the channel pool is full (high-traffic), then channel is being closed.
42+
* If channel is no longer needed and the channel pool has not reached its capacity, then channel is added to the pool.
43+
*
44+
* It uses {@link BlockingQueue} internally in a non-blocking way.
45+
*
46+
*/
47+
class LazyChannelPool implements ChannelPool {
48+
49+
private final Mono<? extends Connection> connectionMono;
50+
private final BlockingQueue<Channel> channelsQueue;
51+
private final Scheduler subscriptionScheduler;
52+
53+
LazyChannelPool(Mono<? extends Connection> connectionMono, ChannelPoolOptions channelPoolOptions) {
54+
this.channelsQueue = new LinkedBlockingQueue<>(channelPoolOptions.getMaxSize());
55+
this.connectionMono = connectionMono;
56+
this.subscriptionScheduler = channelPoolOptions.getSubscriptionScheduler();
57+
}
58+
59+
public Mono<? extends Channel> getChannelMono() {
60+
return connectionMono.map(connection -> {
61+
Channel channel = channelsQueue.poll();
62+
if (channel == null) {
63+
channel = createChannel(connection);
64+
}
65+
return channel;
66+
}).subscribeOn(subscriptionScheduler);
67+
}
68+
69+
@Override
70+
public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
71+
return (signalType, channel) -> {
72+
if (!channel.isOpen()) {
73+
return;
74+
}
75+
// maybe also close channel if signalType == SignalType.ON_ERROR ?
76+
boolean offer = channelsQueue.offer(channel);
77+
if (!offer) {
78+
SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(signalType, channel);
79+
}
80+
};
81+
}
82+
83+
@Override
84+
public void close() {
85+
List<Channel> channels = new ArrayList<>();
86+
channelsQueue.drainTo(channels);
87+
channels.forEach(channel -> {
88+
SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(SignalType.ON_COMPLETE, channel);
89+
});
90+
}
91+
92+
private Channel createChannel(Connection connection) {
93+
try {
94+
return connection.createChannel();
95+
} catch (IOException e) {
96+
throw new RabbitFluxException("Error while creating channel", e);
97+
}
98+
}
99+
100+
}

src/main/java/reactor/rabbitmq/SendOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,17 @@ public SendOptions channelCloseHandler(BiConsumer<SignalType, Channel> channelCl
9898
this.channelCloseHandler = channelCloseHandler;
9999
return this;
100100
}
101+
102+
/**
103+
* Set the channel pool to use to send messages.
104+
*
105+
* @param channelPool
106+
* @return this {@link SendOptions} instance
107+
* @since 1.1.0
108+
*/
109+
public SendOptions channelPool(ChannelPool channelPool) {
110+
this.channelMono = channelPool.getChannelMono();
111+
this.channelCloseHandler = channelPool.getChannelCloseHandler();
112+
return this;
113+
}
101114
}

0 commit comments

Comments
 (0)