Skip to content

Commit 660d582

Browse files
authored
fix(s3stream/wal): fail all IO operations once the WAL is failed (#1840)
* refactor: remove `WALChannel#writeAndFlush`

Signed-off-by: Ning Yu <[email protected]>

* refactor: introduce `WALChannel#retry`

Signed-off-by: Ning Yu <[email protected]>

* refactor: introduce `AbstractWALChannel`

Signed-off-by: Ning Yu <[email protected]>

* fix(s3stream/wal): fail all IO operations once the WAL is failed

Signed-off-by: Ning Yu <[email protected]>

* refactor: check failed before each IO operation

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
1 parent a728531 commit 660d582

File tree

9 files changed

+196
-123
lines changed

9 files changed

+196
-123
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ private synchronized void flushWALHeader() throws IOException {
172172
walHeader.setLastWriteTimestamp(System.nanoTime());
173173
long trimOffset = walHeader.getTrimOffset();
174174
ByteBuf buf = walHeader.marshal();
175-
this.walChannel.retryWriteAndFlush(buf, position);
175+
this.walChannel.retryWrite(buf, position);
176+
this.walChannel.retryFlush();
176177
buf.release();
177178
walHeader.updateFlushedTrimOffset(trimOffset);
178179
LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader);
s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package com.automq.stream.s3.wal.util;
13+
14+
import com.automq.stream.utils.Threads;
15+
import io.netty.buffer.ByteBuf;
16+
import java.io.IOException;
17+
import java.util.concurrent.TimeUnit;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
public abstract class AbstractWALChannel implements WALChannel {
22+
23+
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWALChannel.class);
24+
25+
/**
26+
* Flag to indicate if the WAL has failed.
27+
* It will be set to true if an IO operation fails continuously, and it will never be reset.
28+
* Any IO operation will fail immediately if this flag is true.
29+
*/
30+
private volatile boolean failed = false;
31+
32+
@Override
33+
public void write(ByteBuf src, long position) throws IOException {
34+
checkFailed();
35+
doWrite(src, position);
36+
}
37+
38+
@Override
39+
public void retryWrite(ByteBuf src, long position, long retryIntervalMillis,
40+
long retryTimeoutMillis) throws IOException {
41+
checkFailed();
42+
retry(() -> write(src, position), retryIntervalMillis, retryTimeoutMillis);
43+
}
44+
45+
@Override
46+
public void flush() throws IOException {
47+
checkFailed();
48+
doFlush();
49+
}
50+
51+
@Override
52+
public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
53+
checkFailed();
54+
retry(this::flush, retryIntervalMillis, retryTimeoutMillis);
55+
}
56+
57+
@Override
58+
public int read(ByteBuf dst, long position, int length) throws IOException {
59+
checkFailed();
60+
return doRead(dst, position, length);
61+
}
62+
63+
@Override
64+
public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis,
65+
long retryTimeoutMillis) throws IOException {
66+
checkFailed();
67+
return retry(() -> read(dst, position, length), retryIntervalMillis, retryTimeoutMillis);
68+
}
69+
70+
private void retry(IORunnable runnable, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
71+
retry(IOSupplier.from(runnable), retryIntervalMillis, retryTimeoutMillis);
72+
}
73+
74+
private <T> T retry(IOSupplier<T> supplier, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
75+
long start = System.nanoTime();
76+
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
77+
while (true) {
78+
try {
79+
return supplier.get();
80+
} catch (IOException e) {
81+
if (System.nanoTime() - start > retryTimeoutNanos) {
82+
failed = true;
83+
LOGGER.error("Failed to execute IO operation, retry timeout", e);
84+
throw e;
85+
}
86+
checkFailed();
87+
LOGGER.warn("Failed to execute IO operation, retrying in {}ms, error: {}", retryIntervalMillis, e.getMessage());
88+
Threads.sleep(retryIntervalMillis);
89+
}
90+
}
91+
}
92+
93+
private void checkFailed() throws IOException {
94+
if (failed) {
95+
IOException e = new IOException("Failed to execute IO operation, WAL failed");
96+
LOGGER.error("Failed to execute IO operation, WAL failed", e);
97+
throw e;
98+
}
99+
}
100+
101+
protected abstract void doWrite(ByteBuf src, long position) throws IOException;
102+
103+
protected abstract void doFlush() throws IOException;
104+
105+
protected abstract int doRead(ByteBuf dst, long position, int length) throws IOException;
106+
107+
private interface IOSupplier<T> {
108+
T get() throws IOException;
109+
110+
static IOSupplier<Void> from(IORunnable runnable) {
111+
return () -> {
112+
runnable.run();
113+
return null;
114+
};
115+
}
116+
}
117+
118+
private interface IORunnable {
119+
void run() throws IOException;
120+
}
121+
}

s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET;
2929
import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice;
3030

31-
public class WALBlockDeviceChannel implements WALChannel {
31+
public class WALBlockDeviceChannel extends AbstractWALChannel {
3232
private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class);
3333
private static final String CHECK_DIRECT_IO_AVAILABLE_FORMAT = "%s.check_direct_io_available";
3434
final String path;
@@ -237,7 +237,7 @@ private ByteBuffer getBuffer(int alignedSize) {
237237
}
238238

239239
@Override
240-
public void write(ByteBuf src, long position) throws IOException {
240+
public void doWrite(ByteBuf src, long position) throws IOException {
241241
if (unalignedWrite) {
242242
// unaligned write, just used for testing
243243
unalignedWrite(src, position);
@@ -295,11 +295,11 @@ private int write(ByteBuffer src, long position) throws IOException {
295295
}
296296

297297
@Override
298-
public void flush() {
298+
public void doFlush() {
299299
}
300300

301301
@Override
302-
public int read(ByteBuf dst, long position, int length) throws IOException {
302+
public int doRead(ByteBuf dst, long position, int length) throws IOException {
303303
long start = position;
304304
length = Math.min(length, dst.writableBytes());
305305
long end = position + length;

s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,25 @@ public static WALCachedChannel of(WALChannel channel, int cacheSize) {
4343
return new WALCachedChannel(channel, cacheSize);
4444
}
4545

46+
@Override
47+
public int read(ByteBuf dst, long position, int length) throws IOException {
48+
return read(channel::read, dst, position, length);
49+
}
50+
51+
@Override
52+
public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis,
53+
long retryTimeoutMillis) throws IOException {
54+
Reader reader = (buf, pos, len) -> channel.retryRead(buf, pos, len, retryIntervalMillis, retryTimeoutMillis);
55+
return read(reader, dst, position, length);
56+
}
57+
4658
/**
4759
* As we use a common cache for all threads, we need to synchronize the read.
4860
*/
49-
@Override
50-
public synchronized int read(ByteBuf dst, long position, int length) throws IOException {
61+
private synchronized int read(Reader reader, ByteBuf dst, long position, int length) throws IOException {
5162
if (CAPACITY_NOT_SET == channel.capacity()) {
5263
// If we don't know the capacity now, we can't cache.
53-
return channel.read(dst, position, length);
64+
return reader.read(dst, position, length);
5465
}
5566

5667
long start = position;
@@ -60,7 +71,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx
6071
ByteBuf cache = getCache();
6172
if (length > cache.capacity()) {
6273
// If the length is larger than the cache capacity, we can't cache.
63-
return channel.read(dst, position, length);
74+
return reader.read(dst, position, length);
6475
}
6576

6677
boolean fallWithinCache = cachePosition >= 0 && cachePosition <= start && end <= cachePosition + cache.readableBytes();
@@ -69,7 +80,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx
6980
cachePosition = start;
7081
// Make sure the cache is not larger than the channel capacity.
7182
int cacheLength = (int) Math.min(cache.writableBytes(), channel.capacity() - cachePosition);
72-
channel.read(cache, cachePosition, cacheLength);
83+
reader.read(cache, cachePosition, cacheLength);
7384
}
7485

7586
// Now the cache is ready.
@@ -107,6 +118,10 @@ private ByteBuf getCache() {
107118
return this.cache;
108119
}
109120

121+
private interface Reader {
122+
int read(ByteBuf dst, long position, int length) throws IOException;
123+
}
124+
110125
@Override
111126
public void open(CapacityReader reader) throws IOException {
112127
this.channel.open(reader);
@@ -127,11 +142,22 @@ public void write(ByteBuf src, long position) throws IOException {
127142
this.channel.write(src, position);
128143
}
129144

145+
@Override
146+
public void retryWrite(ByteBuf src, long position, long retryIntervalMillis,
147+
long retryTimeoutMillis) throws IOException {
148+
channel.retryWrite(src, position, retryIntervalMillis, retryTimeoutMillis);
149+
}
150+
130151
@Override
131152
public void flush() throws IOException {
132153
this.channel.flush();
133154
}
134155

156+
@Override
157+
public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
158+
channel.retryFlush(retryIntervalMillis, retryTimeoutMillis);
159+
}
160+
135161
@Override
136162
public boolean useDirectIO() {
137163
return channel.useDirectIO();

s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java

Lines changed: 7 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import com.automq.stream.s3.wal.exception.WALCapacityMismatchException;
1515
import com.automq.stream.s3.wal.exception.WALNotInitializedException;
16-
import com.automq.stream.utils.Threads;
1716
import io.netty.buffer.ByteBuf;
1817
import java.io.File;
1918
import java.io.IOException;
@@ -31,8 +30,6 @@
3130
*/
3231
public interface WALChannel {
3332

34-
Logger LOGGER = LoggerFactory.getLogger(WALChannel.class);
35-
3633
long DEFAULT_RETRY_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
3734
long DEFAULT_RETRY_TIMEOUT = TimeUnit.MINUTES.toMillis(1);
3835

@@ -77,24 +74,7 @@ default void retryWrite(ByteBuf src, long position) throws IOException {
7774
/**
7875
* Retry {@link #write(ByteBuf, long)} with the given interval until success or timeout.
7976
*/
80-
default void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
81-
long start = System.nanoTime();
82-
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
83-
while (true) {
84-
try {
85-
write(src, position);
86-
break;
87-
} catch (IOException e) {
88-
if (System.nanoTime() - start > retryTimeoutNanos) {
89-
LOGGER.error("Failed to write, retry timeout", e);
90-
throw e;
91-
} else {
92-
LOGGER.error("Failed to write, retrying in {}ms", retryIntervalMillis, e);
93-
Threads.sleep(retryIntervalMillis);
94-
}
95-
}
96-
}
97-
}
77+
void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException;
9878

9979
/**
10080
* Flush to disk.
@@ -108,57 +88,10 @@ default void retryFlush() throws IOException {
10888
/**
10989
* Retry {@link #flush()} with the given interval until success or timeout.
11090
*/
111-
default void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
112-
long start = System.nanoTime();
113-
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
114-
while (true) {
115-
try {
116-
flush();
117-
break;
118-
} catch (IOException e) {
119-
if (System.nanoTime() - start > retryTimeoutNanos) {
120-
LOGGER.error("Failed to flush, retry timeout", e);
121-
throw e;
122-
} else {
123-
LOGGER.error("Failed to flush, retrying in {}ms", retryIntervalMillis, e);
124-
Threads.sleep(retryIntervalMillis);
125-
}
126-
}
127-
}
128-
}
129-
130-
/**
131-
* Call {@link #write(ByteBuf, long)} and {@link #flush()}.
132-
*/
133-
default void writeAndFlush(ByteBuf src, long position) throws IOException {
134-
write(src, position);
135-
flush();
136-
}
137-
138-
default void retryWriteAndFlush(ByteBuf src, long position) throws IOException {
139-
retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
140-
}
91+
void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException;
14192

142-
/**
143-
* Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success or timeout.
144-
*/
145-
default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
146-
long start = System.nanoTime();
147-
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
148-
while (true) {
149-
try {
150-
writeAndFlush(src, position);
151-
break;
152-
} catch (IOException e) {
153-
if (System.nanoTime() - start > retryTimeoutNanos) {
154-
LOGGER.error("Failed to write and flush, retry timeout", e);
155-
throw e;
156-
} else {
157-
LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e);
158-
Threads.sleep(retryIntervalMillis);
159-
}
160-
}
161-
}
93+
default int read(ByteBuf dst, long position) throws IOException {
94+
return read(dst, position, dst.writableBytes());
16295
}
16396

16497
/**
@@ -171,34 +104,14 @@ default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMi
171104
*/
172105
int read(ByteBuf dst, long position, int length) throws IOException;
173106

174-
default int read(ByteBuf dst, long position) throws IOException {
175-
return read(dst, position, dst.writableBytes());
176-
}
177-
178107
default int retryRead(ByteBuf dst, long position) throws IOException {
179-
return retryRead(dst, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
108+
return retryRead(dst, position, dst.writableBytes(), DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
180109
}
181110

182111
/**
183-
* Retry {@link #read(ByteBuf, long)} with the given interval until success or timeout.
112+
* Retry {@link #read(ByteBuf, long, int)} with the given interval until success or timeout.
184113
*/
185-
default int retryRead(ByteBuf dst, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
186-
long start = System.nanoTime();
187-
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
188-
while (true) {
189-
try {
190-
return read(dst, position);
191-
} catch (IOException e) {
192-
if (System.nanoTime() - start > retryTimeoutNanos) {
193-
LOGGER.error("Failed to read, retry timeout", e);
194-
throw e;
195-
} else {
196-
LOGGER.error("Failed to read, retrying in {}ms", retryIntervalMillis, e);
197-
Threads.sleep(retryIntervalMillis);
198-
}
199-
}
200-
}
201-
}
114+
int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, long retryTimeoutMillis) throws IOException;
202115

203116
boolean useDirectIO();
204117

0 commit comments

Comments
 (0)