Skip to content

Commit 5468936

Browse files
authored
Refactor OnDemandBlockIndexInput to AbstractBlockIndexInput for extensibility (#19613)
* Renaming OnDemandBlockIndexInput to AbstractBlockIndexInput. AbstractBlockIndexInput is an abstract class containing all the common operations for block index inputs. Also added static utility methods for common block operations. Signed-off-by: Abhinav Gupta <[email protected]> * Added missing documentation for the utility methods. Removed duplicate utility methods. Corrected test cases. Signed-off-by: Abhinav Gupta <[email protected]> --------- Signed-off-by: Abhinav Gupta <[email protected]>
1 parent cb530cf commit 5468936

File tree

5 files changed

+426
-72
lines changed

5 files changed

+426
-72
lines changed
Lines changed: 148 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,28 @@
1818
import java.io.EOFException;
1919
import java.io.IOException;
2020
import java.lang.ref.Cleaner;
21+
import java.util.List;
2122
import java.util.Objects;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.IntStream;
2225

2326
/**
2427
* Class acts as a virtual file mechanism for the accessed files and only fetches the required blocks of the actual file.
25-
* Original/Main IndexInput file will be split using {@link OnDemandBlockIndexInput.Builder#DEFAULT_BLOCK_SIZE_SHIFT}. This class has all the
26-
* logic of how and when to fetch specific block of the main file. Each block is identified by {@link OnDemandBlockIndexInput#currentBlockId}.
28+
* Original/Main IndexInput file will be split using {@link AbstractBlockIndexInput.Builder#DEFAULT_BLOCK_SIZE_SHIFT}. This class has all the
29+
* logic of how and when to fetch specific block of the main file. Each block is identified by {@link AbstractBlockIndexInput#currentBlockId}.
2730
* <br>
2831
* This class delegate the responsibility of actually fetching the block when demanded to its subclasses using
29-
* {@link OnDemandBlockIndexInput#fetchBlock(int)}.
32+
* {@link AbstractBlockIndexInput#fetchBlock(int)}.
3033
* <p>
3134
* Like {@link IndexInput}, this class may only be used from one thread as it is not thread safe.
3235
* However, a cleaning action may run from another thread triggered by the {@link Cleaner}, but
33-
* this is okay because at that point the {@link OnDemandBlockIndexInput} instance is phantom
36+
* this is okay because at that point the {@link AbstractBlockIndexInput} instance is phantom
3437
* reachable and therefore not able to be accessed by any other thread.
3538
*
3639
* @opensearch.internal
3740
*/
38-
public abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput {
39-
private static final Logger logger = LogManager.getLogger(OnDemandBlockIndexInput.class);
41+
public abstract class AbstractBlockIndexInput extends IndexInput implements RandomAccessInput {
42+
private static final Logger logger = LogManager.getLogger(AbstractBlockIndexInput.class);
4043

4144
public static final String CLEANER_THREAD_NAME_PREFIX = "index-input-cleaner";
4245

@@ -47,7 +50,7 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand
4750
* the cleaning action is a no-op. For an open IndexInput, the close action
4851
* will decrement a reference count.
4952
*/
50-
private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX));
53+
protected static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX));
5154

5255
/**
5356
* Start offset of the virtual file : non-zero in the slice case
@@ -75,25 +78,26 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand
7578
/**
7679
* ID of the current block
7780
*/
78-
private int currentBlockId;
81+
protected int currentBlockId;
7982

8083
private final BlockHolder blockHolder = new BlockHolder();
84+
protected final Cleaner.Cleanable cleanable;
8185

82-
OnDemandBlockIndexInput(Builder builder) {
86+
protected AbstractBlockIndexInput(Builder builder) {
8387
super(builder.resourceDescription);
8488
this.isClone = builder.isClone;
8589
this.offset = builder.offset;
8690
this.length = builder.length;
8791
this.blockSizeShift = builder.blockSizeShift;
8892
this.blockSize = builder.blockSize;
8993
this.blockMask = builder.blockMask;
90-
CLEANER.register(this, blockHolder);
94+
this.cleanable = CLEANER.register(this, blockHolder);
9195
}
9296

9397
/**
9498
* Builds the actual sliced IndexInput (may apply extra offset in subclasses).
9599
**/
96-
protected abstract OnDemandBlockIndexInput buildSlice(String sliceDescription, long offset, long length);
100+
protected abstract AbstractBlockIndexInput buildSlice(String sliceDescription, long offset, long length);
97101

98102
/**
99103
* Given a blockId, fetch it's IndexInput which might be partial/split/cloned one
@@ -103,7 +107,7 @@ public abstract class OnDemandBlockIndexInput extends IndexInput implements Rand
103107
protected abstract IndexInput fetchBlock(int blockId) throws IOException;
104108

105109
@Override
106-
public abstract OnDemandBlockIndexInput clone();
110+
public abstract AbstractBlockIndexInput clone();
107111

108112
@Override
109113
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
@@ -302,6 +306,107 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException {
302306

303307
}
304308

309+
/**
310+
* Utility method to get the blockSize given blockSizeShift.
311+
* @param blockSizeShift blockSizeShift used to calculate blockSize.
312+
* @return returns blockSize
313+
*/
314+
public static int getBlockSize(int blockSizeShift) {
315+
return 1 << blockSizeShift;
316+
}
317+
318+
/**
319+
* Utility method to get the blockId corresponding to the file offset passed.
320+
* @param pos file offset whose blockId is requested.
321+
* @param blockSizeShift blockSizeShift used to calculate blockSize.
322+
* @return blockId for the given pos.
323+
*/
324+
public static int getBlock(long pos, int blockSizeShift) {
325+
return (int) (pos >>> blockSizeShift);
326+
}
327+
328+
/**
329+
* Utility method to convert file offset to block level offset.
330+
* @param pos fileOffset whose block offset is requested.
331+
* @param blockSizeShift blockSizeShift used to calculate blockSize.
332+
* @return returns block offset for the given pos.
333+
*/
334+
public static long getBlockOffset(long pos, int blockSizeShift) {
335+
return (long) (pos & (getBlockSize(blockSizeShift) - 1));
336+
}
337+
338+
/**
339+
* Utility method to get the starting file offset of the given block.
340+
* @param blockId blockId whose start offset is requested.
341+
* @param blockSizeShift blockSizeShift used to calculate blockSize.
342+
* @return returns the file offset corresponding to the start of the block.
343+
*/
344+
public static long getBlockStart(int blockId, int blockSizeShift) {
345+
return (long) blockId << blockSizeShift;
346+
}
347+
348+
/**
349+
* Utility method to get the number of blocks in a file.
350+
* @param fileSize fileSize of the original file.
351+
* @param blockSizeShift blockSizeShift used to calculate blockSize.
352+
* @return returns the number of blocks in the file.
353+
*/
354+
public static int getNumberOfBlocks(long fileSize, int blockSizeShift) {
355+
return (int) getBlock(fileSize - 1, blockSizeShift) + 1;
356+
}
357+
358+
/**
359+
* Utility method get the size of the given blockId.
360+
* @param blockId blockId whose size is requested
361+
* @param blockSizeShift blockSizeShift used to calculate blockSize.
362+
* @param fileSize fileSize of the original file.
363+
* @return returns the size of the block whose blockId is passed.
364+
*/
365+
public static long getActualBlockSize(int blockId, int blockSizeShift, long fileSize) {
366+
assert blockId >= 0 : "blockId cannot be negative";
367+
return (blockId != getBlock(fileSize - 1, blockSizeShift))
368+
? getBlockSize(blockSizeShift)
369+
: getBlockOffset(fileSize - 1, blockSizeShift) + 1;
370+
}
371+
372+
/**
373+
* Utility method to a list of blockIds for a given fileSize.
374+
* @param fileSize size of the file for which blockIds are requested.
375+
* @param blockSizeShift blockSizeShift (used to calculate blockSize) used to create blocks.
376+
* @return returns a list of integers representing blockIds.
377+
*/
378+
public static List<Integer> getAllBlockIdsForFile(long fileSize, int blockSizeShift) {
379+
return IntStream.rangeClosed(0, getNumberOfBlocks(fileSize, blockSizeShift) - 1).boxed().collect(Collectors.toList());
380+
}
381+
382+
/**
383+
* Utility method to validate if a given fileName is a blockFileName.
384+
* @param fileName fileName to check
385+
* @return returns true if the passed fileName is a valid block file name.
386+
*/
387+
public static boolean isBlockFilename(String fileName) {
388+
return fileName.contains("_block_");
389+
}
390+
391+
/**
392+
* Utility method to generate block file name for a given fileName and blockId as per naming convention.
393+
* @param fileName fileName whose block file name is required
394+
* @param blockId blockId of the file whose block file name is required
395+
* @return returns the blockFileName
396+
*/
397+
public static String getBlockFileName(String fileName, int blockId) {
398+
return fileName + "_block_" + blockId;
399+
}
400+
401+
/**
402+
* Utility method to get the original file name given the block file name. .
403+
* @param blockFileName name of the block file whose original file name is required.
404+
* @return returns the original file name, No op if blockFileName is not a valid name for a block file.
405+
*/
406+
public static String getFileNameFromBlockFileName(String blockFileName) {
407+
return blockFileName.contains("_block_") ? blockFileName.substring(0, blockFileName.indexOf("_block_")) : blockFileName;
408+
}
409+
305410
/**
306411
* Seek to a block position, download the block if it's necessary
307412
* NOTE: the pos should be an adjusted position for slices
@@ -341,7 +446,7 @@ private void demandBlock(int blockId) throws IOException {
341446
currentBlockId = blockId;
342447
}
343448

344-
protected void cloneBlock(OnDemandBlockIndexInput other) {
449+
protected void cloneBlock(AbstractBlockIndexInput other) {
345450
if (other.blockHolder.block != null) {
346451
this.blockHolder.set(other.blockHolder.block.clone());
347452
this.currentBlockId = other.currentBlockId;
@@ -373,64 +478,69 @@ public static Builder builder() {
373478
}
374479

375480
/**
376-
* Builder for {@link OnDemandBlockIndexInput}. The default block size is 8MiB
481+
* Builder for {@link AbstractBlockIndexInput}. The default block size is 8MiB
377482
* (see {@link Builder#DEFAULT_BLOCK_SIZE_SHIFT}).
378483
*/
379-
public static class Builder {
484+
public static class Builder<T extends Builder<T>> {
380485
// Block size shift (default value is 23 == 2^23 == 8MiB)
381486
public static final int DEFAULT_BLOCK_SIZE_SHIFT = 23;
382487
public static final int DEFAULT_BLOCK_SIZE = 1 << DEFAULT_BLOCK_SIZE_SHIFT;;
383488

384-
private String resourceDescription;
385-
private boolean isClone;
386-
private long offset;
387-
private long length;
388-
private int blockSizeShift = DEFAULT_BLOCK_SIZE_SHIFT;
389-
private int blockSize = 1 << blockSizeShift;
390-
private int blockMask = blockSize - 1;
489+
protected String resourceDescription;
490+
protected boolean isClone;
491+
protected long offset;
492+
protected long length;
493+
protected int blockSizeShift = DEFAULT_BLOCK_SIZE_SHIFT;
494+
protected int blockSize = 1 << blockSizeShift;
495+
protected int blockMask = blockSize - 1;
391496

392-
private Builder() {}
497+
protected Builder() {}
498+
499+
@SuppressWarnings("unchecked")
500+
protected final T self() {
501+
return (T) this;
502+
}
393503

394-
public Builder resourceDescription(String resourceDescription) {
395-
this.resourceDescription = resourceDescription;
396-
return this;
504+
public T resourceDescription(String resourceDescription) {
505+
this.resourceDescription = Objects.requireNonNull(resourceDescription, "Resource description cannot be null");
506+
return self();
397507
}
398508

399-
public Builder isClone(boolean clone) {
400-
isClone = clone;
401-
return this;
509+
public T isClone(boolean clone) {
510+
this.isClone = clone;
511+
return self();
402512
}
403513

404-
public Builder offset(long offset) {
514+
public T offset(long offset) {
405515
this.offset = offset;
406-
return this;
516+
return self();
407517
}
408518

409-
public Builder length(long length) {
519+
public T length(long length) {
410520
this.length = length;
411-
return this;
521+
return self();
412522
}
413523

414-
public Builder blockSizeShift(int blockSizeShift) {
524+
public T blockSizeShift(int blockSizeShift) {
415525
assert blockSizeShift < 31 : "blockSizeShift must be < 31";
416526
this.blockSizeShift = blockSizeShift;
417527
this.blockSize = 1 << blockSizeShift;
418528
this.blockMask = blockSize - 1;
419-
return this;
529+
return self();
420530
}
421531
}
422532

423533
/**
424534
* Simple class to hold the currently open IndexInput backing an instance
425-
* of an {@link OnDemandBlockIndexInput}. Lucene may clone one of these
535+
* of an {@link AbstractBlockIndexInput}. Lucene may clone one of these
426536
* instances, and per the contract[1], the clones will never be closed.
427537
* However, closing the instances is critical for our reference counting.
428538
* Therefore, we are using the {@link Cleaner} mechanism from the JDK to
429539
* close these clones when they become phantom reachable. The clean action
430-
* must not hold a reference to the {@link OnDemandBlockIndexInput} itself
540+
* must not hold a reference to the {@link AbstractBlockIndexInput} itself
431541
* (otherwise it would never become phantom reachable!) so we need a wrapper
432542
* instance to hold the current underlying IndexInput, while allowing it to
433-
* be changed out with different instances as {@link OnDemandBlockIndexInput}
543+
* be changed out with different instances as {@link AbstractBlockIndexInput}
434544
* reads through the data.
435545
* <p>
436546
* This class implements {@link Runnable} so that it can be passed directly

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import java.util.List;
2222

2323
/**
24-
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
24+
* This is an implementation of {@link AbstractBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
2525
* <br>
2626
* This class rely on {@link TransferManager} to really fetch the snapshot files from the remote blob store and maybe cache them
2727
*
2828
* @opensearch.internal
2929
*/
30-
public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
30+
public class OnDemandBlockSnapshotIndexInput extends AbstractBlockIndexInput {
3131
private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class);
3232
/**
3333
* Where this class fetches IndexInput parts from
@@ -89,15 +89,15 @@ public OnDemandBlockSnapshotIndexInput(
8989
TransferManager transferManager
9090
) {
9191
this(
92-
OnDemandBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length),
92+
AbstractBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length),
9393
fileInfo,
9494
directory,
9595
transferManager
9696
);
9797
}
9898

9999
protected OnDemandBlockSnapshotIndexInput(
100-
OnDemandBlockIndexInput.Builder builder,
100+
AbstractBlockIndexInput.Builder builder,
101101
FileInfo fileInfo,
102102
FSDirectory directory,
103103
TransferManager transferManager
@@ -122,7 +122,7 @@ protected OnDemandBlockSnapshotIndexInput(
122122
@Override
123123
protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, long offset, long length) {
124124
return new OnDemandBlockSnapshotIndexInput(
125-
OnDemandBlockIndexInput.builder()
125+
AbstractBlockIndexInput.builder()
126126
.blockSizeShift(blockSizeShift)
127127
.isClone(true)
128128
.offset(this.offset + offset)
@@ -137,10 +137,10 @@ protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, lo
137137
@Override
138138
protected IndexInput fetchBlock(int blockId) throws IOException {
139139
logger.trace("fetchBlock called with blockId -> {}", blockId);
140-
final String blockFileName = fileName + "_block_" + blockId;
140+
final String blockFileName = getBlockFileName(fileName, blockId);
141141

142142
final long blockStart = getBlockStart(blockId);
143-
final long blockEnd = blockStart + getActualBlockSize(blockId);
143+
final long blockEnd = blockStart + getActualBlockSize(blockId, blockSizeShift, originalFileSize);
144144
logger.trace(
145145
"File: {} , Block File: {} , BlockStart: {} , BlockEnd: {} , OriginalFileSize: {}",
146146
fileName,
@@ -196,8 +196,4 @@ public OnDemandBlockSnapshotIndexInput clone() {
196196
clone.cloneBlock(this);
197197
return clone;
198198
}
199-
200-
protected long getActualBlockSize(int blockId) {
201-
return (blockId != getBlock(originalFileSize - 1)) ? blockSize : getBlockOffset(originalFileSize - 1) + 1;
202-
}
203199
}

0 commit comments

Comments
 (0)