@@ -20,28 +20,30 @@ public class OutboundMessageChannel<M> implements Predicate<M> {
2020 private volatile boolean eventuallyClosed ;
2121
2222 // State accessed exclusively by the event loop thread
23- private boolean overflow ;
23+ private boolean overflow ; // Indicates channel ownership
24+ private int draining = 0 ; // Indicates drain in progress
2425 private boolean closed ;
25- private int reentrant = 0 ;
2626
2727 /**
2828 * Create a channel.
2929 *
3030 * @param eventLoop the channel event-loop
3131 */
32- public OutboundMessageChannel (EventLoop eventLoop , Predicate < M > predicate ) {
32+ public OutboundMessageChannel (EventLoop eventLoop ) {
3333 this .eventLoop = eventLoop ;
34- this .messageChannel = new MessageChannel .MpSc <>(predicate );
34+ this .messageChannel = new MessageChannel .MpSc <>(this );
3535 }
3636
3737 /**
3838 * Create a channel.
3939 *
4040 * @param eventLoop the channel event-loop
41+ * @param lowWaterMark the low-water mark, must be positive
42+ * @param highWaterMark the high-water mark, must be greater than the low-water mark
4143 */
42- public OutboundMessageChannel (EventLoop eventLoop ) {
44+ public OutboundMessageChannel (EventLoop eventLoop , int lowWaterMark , int highWaterMark ) {
4345 this .eventLoop = eventLoop ;
44- this .messageChannel = new MessageChannel .MpSc <>(this );
46+ this .messageChannel = new MessageChannel .MpSc <>(this , lowWaterMark , highWaterMark );
4547 }
4648
4749 @ Override
@@ -68,68 +70,76 @@ public final boolean write(M message) {
6870 int flags ;
6971 if (inEventLoop ) {
7072 if (closed ) {
71- disposeMessage (message );
73+ handleDispose (message );
7274 return true ;
7375 }
74- reentrant ++;
75- try {
76- flags = messageChannel .add (message );
77- if ((flags & MessageChannel .DRAIN_REQUIRED_MASK ) != 0 ) {
78- flags = messageChannel .drain ();
79- overflow |= (flags & MessageChannel .DRAIN_REQUIRED_MASK ) != 0 ;
80- if ((flags & MessageChannel .WRITABLE_MASK ) != 0 ) {
81- handleDrained (numberOfUnwritableSignals (flags ));
82- }
83- }
84- } finally {
85- reentrant --;
86- }
87- if (reentrant == 0 && closed ) {
88- releaseMessages ();
76+ flags = messageChannel .add (message );
77+ if (draining == 0 && (flags & MessageChannel .DRAIN_REQUIRED_MASK ) != 0 ) {
78+ flags = drainMessageQueue ();
8979 }
9080 } else {
9181 if (eventuallyClosed ) {
92- disposeMessage (message );
82+ handleDispose (message );
9383 return true ;
9484 }
9585 flags = messageChannel .add (message );
9686 if ((flags & MessageChannel .DRAIN_REQUIRED_MASK ) != 0 ) {
97- eventLoop .execute (this ::drainMessageChannel );
87+ eventLoop .execute (this ::drain );
9888 }
9989 }
90+ int val ;
10091 if ((flags & MessageChannel .UNWRITABLE_MASK ) != 0 ) {
101- int val = numberOfUnwritableSignals .incrementAndGet ();
102- return val <= 0 ;
92+ val = numberOfUnwritableSignals .incrementAndGet ();
10393 } else {
104- return numberOfUnwritableSignals .get () <= 0 ;
94+ val = numberOfUnwritableSignals .get ();
10595 }
96+ return val <= 0 ;
10697 }
10798
10899 /**
109- * Attempt to drain the queue Drain the queue .
100+ * Synchronous message queue drain .
110101 */
111- public void drain () {
112- assert (eventLoop .inEventLoop ());
113- if (overflow ) {
114- startDraining ();
115- reentrant ++;
116- int flags ;
117- try {
118- flags = messageChannel .drain ();
119- overflow = (flags & MessageChannel .DRAIN_REQUIRED_MASK ) != 0 ;
120- if ((flags & MessageChannel .WRITABLE_MASK ) != 0 ) {
121- handleDrained (numberOfUnwritableSignals (flags ));
122- }
123- } finally {
124- reentrant --;
102+ private int drainMessageQueue () {
103+ draining ++;
104+ try {
105+ int flags = messageChannel .drain ();
106+ overflow |= (flags & MessageChannel .DRAIN_REQUIRED_MASK ) != 0 ;
107+ if ((flags & MessageChannel .WRITABLE_MASK ) != 0 ) {
108+ handleDrained (numberOfUnwritableSignals (flags ));
125109 }
126- stopDraining ();
127- if (reentrant == 0 && closed ) {
110+ return flags ;
111+ } finally {
112+ draining --;
113+ if (draining == 0 && closed ) {
128114 releaseMessages ();
129115 }
130116 }
131117 }
132118
119+ private void drain () {
120+ if (closed ) {
121+ return ;
122+ }
123+ assert (draining == 0 );
124+ startDraining ();
125+ drainMessageQueue ();
126+ stopDraining ();
127+ }
128+
129+ /**
130+ * Attempts to drain the queue.
131+ */
132+ public final boolean tryDrain () {
133+ assert (eventLoop .inEventLoop ());
134+ if (overflow ) {
135+ overflow = false ;
136+ drain ();
137+ return true ;
138+ } else {
139+ return false ;
140+ }
141+ }
142+
133143 /**
134144 * Close the queue.
135145 */
@@ -140,52 +150,30 @@ public final void close() {
140150 }
141151 closed = true ;
142152 eventuallyClosed = true ;
143- if (reentrant > 0 ) {
153+ if (draining > 0 ) {
144154 return ;
145155 }
146156 releaseMessages ();
147157 }
148158
149- private void drainMessageChannel () {
150- if (closed ) {
151- return ;
152- }
153- startDraining ();
154- reentrant ++;
155- int flags ;
156- try {
157- flags = messageChannel .drain ();
158- overflow = (flags & MessageChannel .DRAIN_REQUIRED_MASK ) != 0 ;
159- if ((flags & MessageChannel .WRITABLE_MASK ) != 0 ) {
160- handleDrained (numberOfUnwritableSignals (flags ));
161- }
162- } finally {
163- reentrant --;
164- }
165- stopDraining ();
166- if (reentrant == 0 && closed ) {
167- releaseMessages ();
168- }
169- }
170-
171159 private void handleDrained (int numberOfSignals ) {
172160 int val = numberOfUnwritableSignals .addAndGet (-numberOfSignals );
173161 if ((val + numberOfSignals ) > 0 && val <= 0 ) {
174- afterDrain ( );
162+ eventLoop . execute ( this :: handleDrained );
175163 }
176164 }
177165
178166 private void releaseMessages () {
179167 List <M > messages = messageChannel .clear ();
180168 for (M elt : messages ) {
181- disposeMessage (elt );
169+ handleDispose (elt );
182170 }
183171 }
184172
185173 /**
186174 * Called when the channel becomes writable again.
187175 */
188- protected void afterDrain () {
176+ protected void handleDrained () {
189177 }
190178
191179 protected void startDraining () {
@@ -199,6 +187,6 @@ protected void stopDraining() {
199187 *
200188 * @param msg the message
201189 */
202- protected void disposeMessage (M msg ) {
190+ protected void handleDispose (M msg ) {
203191 }
204192}
0 commit comments