
Commit 50494ba

Extract more code from frozen to shared blob cache module (#93786)
Extracts effectively all of the read-path logic into the shared blob cache module for reuse, and encapsulates the magic around the buffer permits in a dedicated class to make the API nicer.
1 parent 2c4ef7c commit 50494ba

3 files changed: 153 additions & 95 deletions

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ByteBufferReference.java

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.blobcache.common;
+
+import org.elasticsearch.core.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Wrapper around a {@link ByteBuffer} that allows making slices of it available for writing to multiple threads concurrently in a safe
+ * manner. Used to populate a read buffer from Lucene concurrently across multiple threads.
+ */
+public final class ByteBufferReference {
+
+    private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
+
+    private ByteBuffer buffer;
+
+    /**
+     * @param buffer to guard against concurrent manipulations
+     */
+    public ByteBufferReference(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    /**
+     * Acquires a permit that must be released via {@link #release()} and returns a {@link ByteBuffer} slice of {@link #buffer} that may be
+     * written to, or {@code null} if {@link #finish(int)} has already been called on this instance and no more writing to the buffer is
+     * allowed.
+     * @param position position relative to the position of {@link #buffer} to start slice at
+     * @param length slice length
+     * @return slice of {@link #buffer} to write to or {@code null} if already closed
+     */
+    @Nullable
+    public ByteBuffer tryAcquire(int position, int length) {
+        if (permits.tryAcquire() == false) {
+            return null;
+        }
+        return buffer.slice(buffer.position() + position, length);
+    }
+
+    /**
+     * Release a permit acquired through {@link #tryAcquire(int, int)}.
+     */
+    public void release() {
+        permits.release();
+    }
+
+    /**
+     * Acquire all permits and then safely advance the position of {@link #buffer} by the given number of bytes.
+     *
+     * @param bytesRead number of bytes to advance the current position of {@link #buffer} by.
+     * @throws Exception on failure
+     */
+    public void finish(int bytesRead) throws Exception {
+        if (buffer != null) {
+            assert bytesRead == 0 || permits.availablePermits() == Integer.MAX_VALUE
+                : "Try to finish [" + bytesRead + "] but only had [" + permits.availablePermits() + "] permits available.";
+            permits.acquire(Integer.MAX_VALUE);
+            buffer.position(buffer.position() + bytesRead); // mark all bytes as accounted for
+            buffer = null;
+        }
+    }
+}
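
The intended lifecycle of the new class, as a minimal usage sketch (illustrative only, not part of the commit; the example class and method names are made up): a worker acquires a slice with tryAcquire and releases its permit when done writing, and the owning reader then calls finish to advance the buffer and block further writes.

import java.nio.ByteBuffer;

import org.elasticsearch.blobcache.common.ByteBufferReference;

// Illustrative sketch of the acquire/release/finish protocol of ByteBufferReference.
class ByteBufferReferenceExample {

    static int fillAndSeal(ByteBuffer luceneReadBuffer) throws Exception {
        final ByteBufferReference reference = new ByteBufferReference(luceneReadBuffer);
        int bytesWritten = 0;
        // A worker (possibly on another thread) takes a permit and a writable slice.
        // tryAcquire returns null once finish() has run, so late callbacks simply skip the write.
        final ByteBuffer slice = reference.tryAcquire(0, luceneReadBuffer.remaining());
        if (slice != null) {
            try {
                while (slice.hasRemaining()) {
                    slice.put((byte) 0); // stand-in for copying bytes from the cache channel
                    bytesWritten++;
                }
            } finally {
                reference.release(); // return the permit so finish() can proceed
            }
        }
        // The owner acquires all permits, advances the buffer position past the written bytes,
        // and drops the buffer reference so no further slices can be handed out.
        reference.finish(bytesWritten);
        return bytesWritten;
    }
}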

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java

Lines changed: 42 additions & 0 deletions
@@ -10,6 +10,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.blobcache.BlobCacheUtils;
+import org.elasticsearch.blobcache.common.ByteBufferReference;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.AbstractRefCounted;
@@ -164,6 +165,47 @@ private static int positionalWrite(IO fc, long start, ByteBuffer byteBuffer) thr
         return written;
     }
 
+    /**
+     * Read {@code length} bytes from given shared bytes at given {@code channelPos} into {@code byteBufferReference} at given
+     * {@code relativePos}.
+     * @param fc shared bytes channel to read from
+     * @param channelPos position in {@code fc} to read from
+     * @param relativePos position in {@code byteBufferReference}
+     * @param length number of bytes to read
+     * @param byteBufferReference buffer reference
+     * @param cacheFile cache file reference used for exception messages only
+     * @return number of bytes read
+     * @throws IOException on failure
+     */
+    public static int readCacheFile(
+        final IO fc,
+        long channelPos,
+        long relativePos,
+        long length,
+        final ByteBufferReference byteBufferReference,
+        Object cacheFile
+    ) throws IOException {
+        if (length == 0L) {
+            return 0;
+        }
+        final int bytesRead;
+        final ByteBuffer dup = byteBufferReference.tryAcquire(Math.toIntExact(relativePos), Math.toIntExact(length));
+        if (dup != null) {
+            try {
+                bytesRead = fc.read(dup, channelPos);
+                if (bytesRead == -1) {
+                    BlobCacheUtils.throwEOF(channelPos, dup.remaining(), cacheFile);
+                }
+            } finally {
+                byteBufferReference.release();
+            }
+        } else {
+            // return fake response
+            return Math.toIntExact(length);
+        }
+        return bytesRead;
+    }
+
     @Override
     protected void closeInternal() {
         try {
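
A minimal calling sketch for the new helper (illustrative only, not part of the commit; the wrapper method and the "example-cache-file" label are assumptions): SharedBytes.readCacheFile is driven through a ByteBufferReference, with finish(bytesRead) on success and finish(0) on failure so the destination buffer is never advanced past bytes that were actually read. The real call site is the FrozenIndexInput change below.

import java.nio.ByteBuffer;

import org.elasticsearch.blobcache.common.ByteBufferReference;
import org.elasticsearch.blobcache.shared.SharedBytes;

// Illustrative sketch: reads one region of a shared cache file into the caller's buffer.
class ReadCacheFileExample {

    static int readRegion(SharedBytes.IO channel, long channelPos, ByteBuffer destination, int relativePos, int len)
        throws Exception {
        final ByteBufferReference reference = new ByteBufferReference(destination);
        boolean success = false;
        try {
            // The last argument is only used to build the exception message on a premature EOF.
            final int read = SharedBytes.readCacheFile(channel, channelPos, relativePos, len, reference, "example-cache-file");
            // Advance the destination buffer past the bytes read and forbid further writes.
            reference.finish(read);
            success = true;
            return read;
        } finally {
            if (success == false) {
                reference.finish(0); // drain the permits without advancing the buffer on failure
            }
        }
    }
}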

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java

Lines changed: 41 additions & 95 deletions
@@ -10,7 +10,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.IOContext;
-import org.elasticsearch.blobcache.BlobCacheUtils;
+import org.elasticsearch.blobcache.common.ByteBufferReference;
 import org.elasticsearch.blobcache.common.ByteRange;
 import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
 import org.elasticsearch.blobcache.shared.SharedBytes;
@@ -21,10 +21,8 @@
 import org.elasticsearch.xpack.searchablesnapshots.store.IndexInputStats;
 import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.concurrent.Semaphore;
 
 public class FrozenIndexInput extends MetadataCachingIndexInput {
 
@@ -104,13 +102,10 @@ protected long getDefaultRangeSize() {
     protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
         final long position = getAbsolutePosition();
         final int length = b.remaining();
-        final int originalByteBufPosition = b.position();
-
         // Semaphore that, when all permits are acquired, ensures that async callbacks (such as those used by readCacheFile) are not
         // accessing the byte buffer anymore that was passed to readWithoutBlobCache
         // In particular, it's important to acquire all permits before adapting the ByteBuffer's offset
-        final Semaphore luceneByteBufPermits = new Semaphore(Integer.MAX_VALUE);
-        boolean bufferWriteLocked = false;
+        final ByteBufferReference byteBufferReference = new ByteBufferReference(b);
         logger.trace("readInternal: read [{}-{}] from [{}]", position, position + length, this);
         try {
             final ByteRange startRangeToWrite = computeRange(position);
@@ -122,99 +117,50 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
                 : "[" + position + "-" + (position + length) + "] vs " + rangeToWrite;
             final ByteRange rangeToRead = ByteRange.of(position, position + length);
 
-            final int bytesRead = cacheFile.populateAndRead(
-                rangeToWrite,
-                rangeToRead,
-                (channel, pos, relativePos, len) -> readCacheFile(
-                    channel,
+            final int bytesRead = cacheFile.populateAndRead(rangeToWrite, rangeToRead, (channel, pos, relativePos, len) -> {
+                logger.trace(
+                    "{}: reading logical {} channel {} pos {} length {} (details: {})",
+                    fileInfo.physicalName(),
+                    rangeToRead.start(),
                     pos,
                     relativePos,
-                    len,
-                    b,
-                    rangeToRead.start(),
-                    luceneByteBufPermits
-                ),
-                (channel, channelPos, relativePos, len, progressUpdater) -> {
-                    final long startTimeNanos = stats.currentTimeNanos();
-                    try (InputStream input = openInputStreamFromBlobStore(rangeToWrite.start() + relativePos, len)) {
-                        assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
-                        logger.trace(
-                            "{}: writing channel {} pos {} length {} (details: {})",
-                            fileInfo.physicalName(),
-                            channelPos,
-                            relativePos,
-                            len,
-                            cacheFile
-                        );
-                        SharedBytes.copyToCacheFileAligned(
-                            channel,
-                            input,
-                            channelPos,
-                            relativePos,
-                            len,
-                            progressUpdater,
-                            writeBuffer.get().clear(),
-                            cacheFile
-                        );
-                        final long endTimeNanos = stats.currentTimeNanos();
-                        stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
-                    }
-                },
-                SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
-            );
+                    length,
+                    cacheFile
+                );
+                final int read = SharedBytes.readCacheFile(channel, pos, relativePos, len, byteBufferReference, cacheFile);
+                stats.addCachedBytesRead(read);
+                return read;
+            }, (channel, channelPos, relativePos, len, progressUpdater) -> {
+                final long startTimeNanos = stats.currentTimeNanos();
+                try (InputStream input = openInputStreamFromBlobStore(rangeToWrite.start() + relativePos, len)) {
+                    assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
+                    logger.trace(
+                        "{}: writing channel {} pos {} length {} (details: {})",
+                        fileInfo.physicalName(),
+                        channelPos,
+                        relativePos,
+                        len,
+                        cacheFile
+                    );
+                    SharedBytes.copyToCacheFileAligned(
+                        channel,
+                        input,
+                        channelPos,
+                        relativePos,
+                        len,
+                        progressUpdater,
+                        writeBuffer.get().clear(),
+                        cacheFile
+                    );
+                    final long endTimeNanos = stats.currentTimeNanos();
+                    stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
+                }
+            }, SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
             assert bytesRead == length : bytesRead + " vs " + length;
-            assert luceneByteBufPermits.availablePermits() == Integer.MAX_VALUE;
-
-            luceneByteBufPermits.acquire(Integer.MAX_VALUE);
-            bufferWriteLocked = true;
-            b.position(originalByteBufPosition + bytesRead); // mark all bytes as accounted for
+            byteBufferReference.finish(bytesRead);
         } finally {
-            if (bufferWriteLocked == false) {
-                luceneByteBufPermits.acquire(Integer.MAX_VALUE);
-            }
-        }
-    }
-
-    private int readCacheFile(
-        final SharedBytes.IO fc,
-        long channelPos,
-        long relativePos,
-        long length,
-        final ByteBuffer buffer,
-        long logicalPos,
-        Semaphore luceneByteBufPermits
-    ) throws IOException {
-        logger.trace(
-            "{}: reading cached {} logical {} channel {} pos {} length {} (details: {})",
-            fileInfo.physicalName(),
-            false,
-            logicalPos,
-            channelPos,
-            relativePos,
-            length,
-            cacheFile
-        );
-        if (length == 0L) {
-            return 0;
-        }
-        final int bytesRead;
-        if (luceneByteBufPermits.tryAcquire()) {
-            try {
-                // create slice that is positioned to read the given values
-                final ByteBuffer dup = buffer.slice(buffer.position() + Math.toIntExact(relativePos), Math.toIntExact(length));
-                bytesRead = fc.read(dup, channelPos);
-                if (bytesRead == -1) {
-                    BlobCacheUtils.throwEOF(channelPos, dup.remaining(), this.cacheFile);
-                }
-            } finally {
-                luceneByteBufPermits.release();
-            }
-        } else {
-            // return fake response
-            return Math.toIntExact(length);
+            byteBufferReference.finish(0);
         }
-        stats.addCachedBytesRead(bytesRead);
-        return bytesRead;
     }
 
     @Override
