Skip to content

Commit 4a13ebd

Browse files
committed
OutboundMessageQueue internal refactoring (extracting a method).
1 parent 9b7be94 commit 4a13ebd

File tree

5 files changed

+109
-67
lines changed

5 files changed

+109
-67
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import io.netty.buffer.Unpooled;
1616
import io.netty.channel.EventLoop;
1717
import io.netty.handler.codec.http2.EmptyHttp2Headers;
18-
import io.netty.handler.codec.http2.Http2Exception;
1918
import io.netty.handler.codec.http2.Http2Headers;
2019
import io.netty.handler.codec.http2.Http2Stream;
2120
import io.netty.util.concurrent.FutureListener;
@@ -156,7 +155,7 @@ void onData(Buffer data) {
156155
void onWritabilityChanged() {
157156
writable = !writable;
158157
if (writable) {
159-
outboundQueue.drain();
158+
outboundQueue.tryDrain();
160159
}
161160
}
162161

vertx-core/src/main/java/io/vertx/core/internal/concurrent/OutboundMessageChannel.java

Lines changed: 48 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class OutboundMessageChannel<M> implements Predicate<M> {
2020
private volatile boolean eventuallyClosed;
2121

2222
// State accessed exclusively by the event loop thread
23-
private boolean overflow;
23+
private boolean overflow; // Indicates channel ownership
2424
private boolean closed;
2525
private int reentrant = 0;
2626

@@ -29,19 +29,21 @@ public class OutboundMessageChannel<M> implements Predicate<M> {
2929
*
3030
* @param eventLoop the channel event-loop
3131
*/
32-
public OutboundMessageChannel(EventLoop eventLoop, Predicate<M> predicate) {
32+
public OutboundMessageChannel(EventLoop eventLoop) {
3333
this.eventLoop = eventLoop;
34-
this.messageChannel = new MessageChannel.MpSc<>(predicate);
34+
this.messageChannel = new MessageChannel.MpSc<>(this);
3535
}
3636

3737
/**
3838
* Create a channel.
3939
*
4040
* @param eventLoop the channel event-loop
41+
* @param lowWaterMark the low-water mark, must be positive
42+
* @param highWaterMark the high-water mark, must be greater than the low-water mark
4143
*/
42-
public OutboundMessageChannel(EventLoop eventLoop) {
44+
public OutboundMessageChannel(EventLoop eventLoop, int lowWaterMark, int highWaterMark) {
4345
this.eventLoop = eventLoop;
44-
this.messageChannel = new MessageChannel.MpSc<>(this);
46+
this.messageChannel = new MessageChannel.MpSc<>(this, lowWaterMark, highWaterMark);
4547
}
4648

4749
@Override
@@ -71,21 +73,9 @@ public final boolean write(M message) {
7173
disposeMessage(message);
7274
return true;
7375
}
74-
reentrant++;
75-
try {
76-
flags = messageChannel.add(message);
77-
if ((flags & MessageChannel.DRAIN_REQUIRED_MASK) != 0) {
78-
flags = messageChannel.drain();
79-
overflow |= (flags & MessageChannel.DRAIN_REQUIRED_MASK) != 0;
80-
if ((flags & MessageChannel.WRITABLE_MASK) != 0) {
81-
handleDrained(numberOfUnwritableSignals(flags));
82-
}
83-
}
84-
} finally {
85-
reentrant--;
86-
}
87-
if (reentrant == 0 && closed) {
88-
releaseMessages();
76+
flags = messageChannel.add(message);
77+
if ((flags & MessageChannel.DRAIN_REQUIRED_MASK) != 0) {
78+
flags = drainMessageQueue();
8979
}
9080
} else {
9181
if (eventuallyClosed) {
@@ -94,42 +84,60 @@ public final boolean write(M message) {
9484
}
9585
flags = messageChannel.add(message);
9686
if ((flags & MessageChannel.DRAIN_REQUIRED_MASK) != 0) {
97-
eventLoop.execute(this::drainMessageChannel);
87+
eventLoop.execute(this::drain);
9888
}
9989
}
90+
int val;
10091
if ((flags & MessageChannel.UNWRITABLE_MASK) != 0) {
101-
int val = numberOfUnwritableSignals.incrementAndGet();
102-
return val <= 0;
92+
val = numberOfUnwritableSignals.incrementAndGet();
10393
} else {
104-
return numberOfUnwritableSignals.get() <= 0;
94+
val = numberOfUnwritableSignals.get();
10595
}
96+
return val <= 0;
10697
}
10798

10899
/**
109-
* Attempt to drain the queue Drain the queue.
100+
* Synchronous message queue drain.
110101
*/
111-
public void drain() {
112-
assert(eventLoop.inEventLoop());
113-
if (overflow) {
114-
startDraining();
115-
reentrant++;
116-
int flags;
117-
try {
118-
flags = messageChannel.drain();
119-
overflow = (flags & MessageChannel.DRAIN_REQUIRED_MASK) != 0;
120-
if ((flags & MessageChannel.WRITABLE_MASK) != 0) {
121-
handleDrained(numberOfUnwritableSignals(flags));
122-
}
123-
} finally {
124-
reentrant--;
102+
private int drainMessageQueue() {
103+
reentrant++;
104+
try {
105+
int flags = messageChannel.drain();
106+
overflow |= (flags & MessageChannel.DRAIN_REQUIRED_MASK) != 0;
107+
if ((flags & MessageChannel.WRITABLE_MASK) != 0) {
108+
handleDrained(numberOfUnwritableSignals(flags));
125109
}
126-
stopDraining();
110+
return flags;
111+
} finally {
112+
reentrant--;
127113
if (reentrant == 0 && closed) {
128114
releaseMessages();
129115
}
130116
}
131117
}
132118

119+
private void drain() {
120+
if (closed) {
121+
return;
122+
}
123+
startDraining();
124+
drainMessageQueue();
125+
stopDraining();
126+
}
127+
128+
/**
129+
* Attempts to drain the queue.
130+
*/
131+
public final boolean tryDrain() {
132+
assert(eventLoop.inEventLoop());
133+
if (overflow) {
134+
drain();
135+
return true;
136+
} else {
137+
return false;
138+
}
139+
}
140+
133141
/**
134142
* Close the queue.
135143
*/
@@ -146,28 +154,6 @@ public final void close() {
146154
releaseMessages();
147155
}
148156

149-
private void drainMessageChannel() {
150-
if (closed) {
151-
return;
152-
}
153-
startDraining();
154-
reentrant++;
155-
int flags;
156-
try {
157-
flags = messageChannel.drain();
158-
overflow = (flags & MessageChannel.DRAIN_REQUIRED_MASK) != 0;
159-
if ((flags & MessageChannel.WRITABLE_MASK) != 0) {
160-
handleDrained(numberOfUnwritableSignals(flags));
161-
}
162-
} finally {
163-
reentrant--;
164-
}
165-
stopDraining();
166-
if (reentrant == 0 && closed) {
167-
releaseMessages();
168-
}
169-
}
170-
171157
private void handleDrained(int numberOfSignals) {
172158
int val = numberOfUnwritableSignals.addAndGet(-numberOfSignals);
173159
if ((val + numberOfSignals) > 0 && val <= 0) {

vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ protected void handleMessage(Object msg) {
210210
void channelWritabilityChanged() {
211211
channelWritable = chctx.channel().isWritable();
212212
if (channelWritable) {
213-
messageQueue.drain();
213+
messageQueue.tryDrain();
214214
}
215215
}
216216

vertx-core/src/main/java/io/vertx/core/streams/impl/MessageChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ public static class MpSc<E> extends MessageChannel<E> {
435435
public MpSc(Predicate<E> consumer) {
436436
this(consumer, DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK);
437437
}
438+
438439
public MpSc(Predicate<E> consumer, int lowWaterMark, int highWaterMark) {
439440
super(PlatformDependent.newMpscQueue(), consumer, lowWaterMark, highWaterMark);
440441
}

vertx-core/src/test/java/io/vertx/tests/concurrent/OutboundMessageChannelTest.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import java.util.ArrayList;
2020
import java.util.Collections;
2121
import java.util.List;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.stream.Collectors;
25+
import java.util.stream.IntStream;
2326

2427
/**
2528
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
@@ -91,7 +94,7 @@ public boolean test(Integer msg) {
9194
}
9295

9396
@Test
94-
public void testReentrantOverflowThenDrain() {
97+
public void testReentrantOverflowThenDrain1() {
9598
AtomicInteger drains = new AtomicInteger();
9699
queue = new OutboundMessageChannel<>(eventLoop) {
97100
int reentrant = 0;
@@ -124,6 +127,58 @@ protected void afterDrain() {
124127
eventLoop.execute(() -> {
125128
queue.write(0);
126129
assertEquals(1, drains.get());
130+
List<Integer> expected = IntStream.range(0, 16).boxed().collect(Collectors.toList());
131+
assertEquals(expected, output);
132+
testComplete();
133+
});
134+
await();
135+
}
136+
137+
@Test
138+
public void testReentrantOverflowThenDrain2() {
139+
AtomicInteger drains = new AtomicInteger();
140+
queue = new OutboundMessageChannel<>(eventLoop) {
141+
int reentrant = 0;
142+
@Override
143+
public boolean test(Integer msg) {
144+
assertEquals(0, reentrant++);
145+
try {
146+
output.add(msg);
147+
switch (msg) {
148+
case 0: {
149+
int count = 1;
150+
while (write(count++)) {
151+
}
152+
assertEquals(16, count);
153+
assertEquals(0, drains.get());
154+
break;
155+
}
156+
case 15: {
157+
int count = 16;
158+
while (write(count++)) {
159+
}
160+
assertEquals(17, count);
161+
assertEquals(0, drains.get());
162+
break;
163+
}
164+
default:
165+
break;
166+
}
167+
return true;
168+
} finally {
169+
reentrant--;
170+
}
171+
}
172+
@Override
173+
protected void afterDrain() {
174+
drains.incrementAndGet();
175+
}
176+
};
177+
eventLoop.execute(() -> {
178+
queue.write(0);
179+
assertEquals(1, drains.get());
180+
List<Integer> expected = IntStream.range(0, 17).boxed().collect(Collectors.toList());
181+
assertEquals(expected, output);
127182
testComplete();
128183
});
129184
await();
@@ -189,5 +244,6 @@ protected void startDraining() {
189244
testComplete();
190245
});
191246
});
192-
await(); }
247+
await();
248+
}
193249
}

0 commit comments

Comments
 (0)