Skip to content

Commit b76f985

Browse files
okg-cxftishun
andauthored
Fix: make sure FIFO order between write and notify channel active (#2597)
* fix: make sure FIFO order for write() when notifyChannelActive(), also make sure channel access thread-safe and avoid potential NPE * Formatting issues --------- Co-authored-by: Tihomir Mateev <[email protected]>
1 parent 230a03a commit b76f985

File tree

2 files changed

+58
-34
lines changed

2 files changed

+58
-34
lines changed

src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,9 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
195195
}
196196

197197
if (autoFlushCommands) {
198-
199-
if (isConnected()) {
200-
writeToChannelAndFlush(command);
198+
Channel channel = this.channel;
199+
if (isConnected(channel)) {
200+
writeToChannelAndFlush(channel, command);
201201
} else {
202202
writeToDisconnectedBuffer(command);
203203
}
@@ -236,9 +236,9 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
236236
}
237237

238238
if (autoFlushCommands) {
239-
240-
if (isConnected()) {
241-
writeToChannelAndFlush(commands);
239+
Channel channel = this.channel;
240+
if (isConnected(channel)) {
241+
writeToChannelAndFlush(channel, commands);
242242
} else {
243243
writeToDisconnectedBuffer(commands);
244244
}
@@ -288,10 +288,9 @@ private RedisException validateWrite(int commands) {
288288
return new RedisException("Connection is closed");
289289
}
290290

291+
final boolean connected = isConnected(this.channel);
291292
if (usesBoundedQueues()) {
292293

293-
boolean connected = isConnected();
294-
295294
if (QUEUE_SIZE.get(this) + commands > clientOptions.getRequestQueueSize()) {
296295
return new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
297296
+ ". Commands are not accepted until the queue size drops.");
@@ -308,7 +307,7 @@ private RedisException validateWrite(int commands) {
308307
}
309308
}
310309

311-
if (!isConnected() && rejectCommandsWhileDisconnected) {
310+
if (!connected && rejectCommandsWhileDisconnected) {
312311
return new RedisException("Currently not connected. Commands are rejected.");
313312
}
314313

@@ -370,11 +369,11 @@ private void writeToDisconnectedBuffer(RedisCommand<?, ?, ?> command) {
370369
commandBuffer.add(command);
371370
}
372371

373-
private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
372+
private void writeToChannelAndFlush(Channel channel, RedisCommand<?, ?, ?> command) {
374373

375374
QUEUE_SIZE.incrementAndGet(this);
376375

377-
ChannelFuture channelFuture = channelWriteAndFlush(command);
376+
ChannelFuture channelFuture = channelWriteAndFlush(channel, command);
378377

379378
if (reliability == Reliability.AT_MOST_ONCE) {
380379
// cancel on exceptions and remove from queue, because there is no housekeeping
@@ -387,30 +386,30 @@ private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
387386
}
388387
}
389388

390-
private void writeToChannelAndFlush(Collection<? extends RedisCommand<?, ?, ?>> commands) {
389+
private void writeToChannelAndFlush(Channel channel, Collection<? extends RedisCommand<?, ?, ?>> commands) {
391390

392391
QUEUE_SIZE.addAndGet(this, commands.size());
393392

394393
if (reliability == Reliability.AT_MOST_ONCE) {
395394

396395
// cancel on exceptions and remove from queue, because there is no housekeeping
397396
for (RedisCommand<?, ?, ?> command : commands) {
398-
channelWrite(command).addListener(AtMostOnceWriteListener.newInstance(this, command));
397+
channelWrite(channel, command).addListener(AtMostOnceWriteListener.newInstance(this, command));
399398
}
400399
}
401400

402401
if (reliability == Reliability.AT_LEAST_ONCE) {
403402

404403
// commands are ok to stay within the queue, reconnect will retrigger them
405404
for (RedisCommand<?, ?, ?> command : commands) {
406-
channelWrite(command).addListener(RetryListener.newInstance(this, command));
405+
channelWrite(channel, command).addListener(RetryListener.newInstance(this, command));
407406
}
408407
}
409408

410-
channelFlush();
409+
channelFlush(channel);
411410
}
412411

413-
private void channelFlush() {
412+
private void channelFlush(Channel channel) {
414413

415414
if (debugEnabled) {
416415
logger.debug("{} write() channelFlush", logPrefix());
@@ -419,7 +418,7 @@ private void channelFlush() {
419418
channel.flush();
420419
}
421420

422-
private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
421+
private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> command) {
423422

424423
if (debugEnabled) {
425424
logger.debug("{} write() channelWrite command {}", logPrefix(), command);
@@ -428,7 +427,7 @@ private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
428427
return channel.write(command);
429428
}
430429

431-
private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
430+
private ChannelFuture channelWriteAndFlush(Channel channel, RedisCommand<?, ?, ?> command) {
432431

433432
if (debugEnabled) {
434433
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
@@ -441,7 +440,6 @@ private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
441440
public void notifyChannelActive(Channel channel) {
442441

443442
this.logPrefix = null;
444-
this.channel = channel;
445443
this.connectionError = null;
446444

447445
if (isClosed()) {
@@ -456,6 +454,7 @@ public void notifyChannelActive(Channel channel) {
456454
}
457455

458456
sharedLock.doExclusive(() -> {
457+
this.channel = channel;
459458

460459
try {
461460
// Move queued commands to buffer before issuing any commands because of connection activation.
@@ -478,7 +477,7 @@ public void notifyChannelActive(Channel channel) {
478477
inActivation = false;
479478
}
480479

481-
flushCommands(disconnectedBuffer);
480+
flushCommands(channel, disconnectedBuffer);
482481
} catch (Exception e) {
483482

484483
if (debugEnabled) {
@@ -525,7 +524,7 @@ public void notifyException(Throwable t) {
525524
doExclusive(this::drainCommands).forEach(cmd -> cmd.completeExceptionally(t));
526525
}
527526

528-
if (!isConnected()) {
527+
if (!isConnected(this.channel)) {
529528
connectionError = t;
530529
}
531530
}
@@ -538,16 +537,16 @@ public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) {
538537
@Override
539538
@SuppressWarnings({ "rawtypes", "unchecked" })
540539
public void flushCommands() {
541-
flushCommands(commandBuffer);
540+
flushCommands(this.channel, commandBuffer);
542541
}
543542

544-
private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {
543+
private void flushCommands(Channel channel, Queue<RedisCommand<?, ?, ?>> queue) {
545544

546545
if (debugEnabled) {
547546
logger.debug("{} flushCommands()", logPrefix());
548547
}
549548

550-
if (isConnected()) {
549+
if (isConnected(channel)) {
551550

552551
List<RedisCommand<?, ?, ?>> commands = sharedLock.doExclusive(() -> {
553552

@@ -563,7 +562,7 @@ private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {
563562
}
564563

565564
if (!commands.isEmpty()) {
566-
writeToChannelAndFlush(commands);
565+
writeToChannelAndFlush(channel, commands);
567566
}
568567
}
569568
}
@@ -626,10 +625,10 @@ public void disconnect() {
626625

627626
private Channel getOpenChannel() {
628627

629-
Channel currentChannel = this.channel;
628+
Channel channel = this.channel;
630629

631-
if (currentChannel != null) {
632-
return currentChannel;
630+
if (channel != null /* && channel.isOpen() is this deliberately omitted? */) {
631+
return channel;
633632
}
634633

635634
return null;
@@ -646,6 +645,7 @@ public void reset() {
646645
logger.debug("{} reset()", logPrefix());
647646
}
648647

648+
Channel channel = this.channel;
649649
if (channel != null) {
650650
channel.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset());
651651
}
@@ -718,9 +718,7 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {
718718
}
719719
}
720720

721-
if (isConnected()) {
722-
flushCommands(disconnectedBuffer);
723-
}
721+
flushCommands(this.channel, disconnectedBuffer);
724722
});
725723
}
726724

@@ -802,9 +800,7 @@ private void cancelCommands(String message, Iterable<? extends RedisCommand<?, ?
802800
}
803801
}
804802

805-
private boolean isConnected() {
806-
807-
Channel channel = this.channel;
803+
private boolean isConnected(Channel channel) {
808804
return channel != null && channel.isActive();
809805
}
810806

src/test/java/io/lettuce/core/protocol/DefaultEndpointUnitTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,34 @@ void before() {
124124
sut.setConnectionFacade(connectionFacade);
125125
}
126126

127+
@Test
128+
void writeShouldGuaranteeFIFOOrder() {
129+
sut.write(Collections.singletonList(new Command<>(CommandType.SELECT, new StatusOutput<>(StringCodec.UTF8))));
130+
131+
sut.registerConnectionWatchdog(connectionWatchdog);
132+
doAnswer(i -> sut.write(new Command<>(CommandType.AUTH, new StatusOutput<>(StringCodec.UTF8)))).when(connectionWatchdog)
133+
.arm();
134+
when(channel.isActive()).thenReturn(true);
135+
136+
sut.notifyChannelActive(channel);
137+
138+
DefaultChannelPromise promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
139+
140+
when(channel.writeAndFlush(any())).thenAnswer(invocation -> {
141+
if (invocation.getArguments()[0] instanceof RedisCommand) {
142+
queue.add((RedisCommand) invocation.getArguments()[0]);
143+
}
144+
145+
if (invocation.getArguments()[0] instanceof Collection) {
146+
queue.addAll((Collection) invocation.getArguments()[0]);
147+
}
148+
return promise;
149+
});
150+
151+
assertThat(queue).hasSize(2).first().hasFieldOrPropertyWithValue("type", CommandType.SELECT);
152+
assertThat(queue).hasSize(2).last().hasFieldOrPropertyWithValue("type", CommandType.AUTH);
153+
}
154+
127155
@Test
128156
void writeConnectedShouldWriteCommandToChannel() {
129157

0 commit comments

Comments
 (0)