Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit bacc365

Browse files
committed
Add Javadoc for channel pooling
References #47
1 parent 8bb9e0b commit bacc365

File tree

10 files changed

+146
-17
lines changed

10 files changed

+146
-17
lines changed

src/jmh/java/reactor/rabbitmq/SenderBenchmark.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package reactor.rabbitmq;
218

319
import com.rabbitmq.client.Connection;

src/jmh/java/reactor/rabbitmq/SenderBenchmarkUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package reactor.rabbitmq;
218

319
import com.rabbitmq.client.Channel;

src/jmh/java/reactor/rabbitmq/SenderWithLazyChannelPoolBenchmark.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package reactor.rabbitmq;
218

319
import com.rabbitmq.client.Connection;

src/main/java/reactor/rabbitmq/ChannelPool.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,17 +17,44 @@
1717
package reactor.rabbitmq;
1818

1919
import com.rabbitmq.client.Channel;
20+
import org.reactivestreams.Publisher;
2021
import reactor.core.publisher.Mono;
2122
import reactor.core.publisher.SignalType;
2223

2324
import java.util.function.BiConsumer;
2425

25-
public interface ChannelPool {
26+
/**
27+
* Contract to obtain a {@link Channel} and close when sending messages.
28+
* <p>
29+
* Implementations would typically make it possible to re-use
30+
* {@link Channel}s across different calls to <code>Sender#send*</code> methods.
31+
*
32+
* @see SendOptions
33+
* @see Sender#send(Publisher)
34+
* @see Sender#send(Publisher, SendOptions)
35+
* @see Sender#sendWithPublishConfirms(Publisher)
36+
* @see Sender#send(Publisher, SendOptions)
37+
* @since 1.1.0
38+
*/
39+
public interface ChannelPool extends AutoCloseable {
2640

41+
/**
42+
* The {@link Channel} to use for sending a flux of messages.
43+
*
44+
* @return
45+
*/
2746
Mono<? extends Channel> getChannelMono();
2847

48+
/**
49+
* The closing logic when the {@link Channel} is disposed.
50+
*
51+
* @return
52+
*/
2953
BiConsumer<SignalType, Channel> getChannelCloseHandler();
3054

55+
/**
56+
* Close the pool when it is no longer necessary.
57+
*/
3158
void close();
3259

3360
}

src/main/java/reactor/rabbitmq/ChannelPoolFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,14 @@
1919
import com.rabbitmq.client.Connection;
2020
import reactor.core.publisher.Mono;
2121

22+
/**
23+
* Factory to create default {@link ChannelPool} instances.
24+
*
25+
* @see ChannelPool
26+
* @see LazyChannelPool
27+
* @see ChannelPoolOptions
28+
* @since 1.1.0
29+
*/
2230
public class ChannelPoolFactory {
2331

2432
public static ChannelPool createChannelPool(Mono<? extends Connection> connectionMono) {

src/main/java/reactor/rabbitmq/ChannelPoolOptions.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,12 +18,27 @@
1818

1919
import reactor.core.scheduler.Scheduler;
2020

21+
/**
22+
* Options to when creating a {@link ChannelPool}.
23+
*
24+
* @see ChannelPoolFactory
25+
* @see LazyChannelPool
26+
* @since 1.1.0
27+
*/
2128
public class ChannelPoolOptions {
2229

2330
private Integer maxCacheSize;
2431

2532
private Scheduler subscriptionScheduler;
2633

34+
/**
35+
* Set the maximum size of the pool.
36+
* <p>
37+
* Default is 5 channels.
38+
*
39+
* @param maxCacheSize
40+
* @return this {@link ChannelPoolOptions} instance
41+
*/
2742
public ChannelPoolOptions maxCacheSize(int maxCacheSize) {
2843
this.maxCacheSize = maxCacheSize;
2944
return this;
@@ -33,6 +48,12 @@ public Integer getMaxCacheSize() {
3348
return maxCacheSize;
3449
}
3550

51+
/**
52+
* Set the scheduler to use when opening {@link com.rabbitmq.client.Channel}s.
53+
*
54+
* @param subscriptionScheduler
55+
* @return this {@link ChannelPoolOptions} instance
56+
*/
3657
public ChannelPoolOptions subscriptionScheduler(Scheduler subscriptionScheduler) {
3758
this.subscriptionScheduler = subscriptionScheduler;
3859
return this;

src/main/java/reactor/rabbitmq/LazyChannelPool.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,17 +33,24 @@
3333
import static reactor.rabbitmq.ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE;
3434

3535
/**
36-
* This channel pool is lazy initialized. It might even not reach its maximum size {@link ChannelPoolOptions#getMaxCacheSize()} in low-concurrency environments.
37-
* It always tries to obtain channel from the pool. However, in case of high-concurrency environments, number of channels might exceeds channel pool maximum size.
38-
*
39-
* Channels are added to the pool after their use {@link ChannelPool#getChannelCloseHandler()} and obtained from the pool when channel is requested {@link ChannelPool#getChannelMono()}.
40-
*
41-
* If pool is empty, new channel is created.
42-
* If channel is no longer needed and the channel pool is full, then channel is being closed.
43-
* If channel is no longer needed and the channel pool has not reached its capacity, then channel is added to the pool.
44-
*
36+
* Default implementation of {@link ChannelPool}.
37+
* <p>
38+
* This channel pool is lazy initialized. It might even not reach its maximum size
39+
* {@link ChannelPoolOptions#getMaxCacheSize()} in low-concurrency environments.
40+
* It always tries to obtain a channel from the pool. However, in case of high-concurrency environments,
41+
* number of channels might exceed channel pool maximum size.
42+
* <p>
43+
* Channels are added to the pool after their use {@link ChannelPool#getChannelCloseHandler()}
44+
* and obtained from the pool when channel is requested {@link ChannelPool#getChannelMono()}.
45+
* <p>
46+
* If the pool is empty, a new channel is created.
47+
* If a channel is no longer needed and the channel pool is full, then the channel is being closed.
48+
* If a channel is no longer needed and the channel pool has not reached its
49+
* capacity, then the channel is added to the pool.
50+
* <p>
4551
* It uses {@link BlockingQueue} internally in a non-blocking way.
4652
*
53+
* @since 1.1.0
4754
*/
4855
class LazyChannelPool implements ChannelPool {
4956

@@ -70,7 +77,7 @@ public Mono<? extends Channel> getChannelMono() {
7077
}
7178
return channel;
7279
})
73-
.subscribeOn(subscriptionScheduler);
80+
.subscribeOn(subscriptionScheduler);
7481
}
7582

7683
@Override

src/main/java/reactor/rabbitmq/SendOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ public SendOptions channelCloseHandler(BiConsumer<SignalType, Channel> channelCl
101101

102102
/**
103103
* Set the channel pool to use to send messages.
104+
* <p>
105+
* It is developer's responsibility to close it if set.
104106
*
105107
* @param channelPool
106108
* @return this {@link SendOptions} instance

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMes
156156
return channel;
157157
})
158158
.flatMapMany(channel -> Flux.from(new PublishConfirmOperator(messages, channel, sendOptions)).doFinally(signalType -> {
159-
// channel close is no longer a responsibility of PublishConfirmOperator, not sure whether it is a correct approach
160-
// added to avoid creating threads inside PublishConfirmOperator, which make ChannelPool useless
159+
// channel closing is done here, to avoid creating threads inside PublishConfirmOperator,
160+
// which would make ChannelPool useless
161161
if (signalType == SignalType.ON_ERROR) {
162162
channelCloseHandler.accept(signalType, channel);
163163
} else {

src/test/java/reactor/rabbitmq/LazyChannelPoolTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (c) 2019 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package reactor.rabbitmq;
218

319
import com.rabbitmq.client.AMQP;

0 commit comments

Comments
 (0)