|
25 | 25 | import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; |
26 | 26 | import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; |
27 | 27 | import org.apache.pinot.segment.spi.memory.CleanerUtil; |
| 28 | +import org.slf4j.Logger; |
| 29 | +import org.slf4j.LoggerFactory; |
28 | 30 |
|
29 | 31 |
|
30 | 32 | /** |
|
39 | 41 | * </ul> |
40 | 42 | */ |
41 | 43 | public class ChunkReaderContext implements ForwardIndexReaderContext { |
| 44 | + private static final Logger LOGGER = LoggerFactory.getLogger(ChunkReaderContext.class); |
| 45 | + |
| 46 | + private static final boolean USE_HEAP_FOR_LARGE_CHUNKS; |
| 47 | + private static final long MAX_DIRECT_BUFFER_CHUNK_SIZE; |
| 48 | + // default max direct buffer threshold size: 2MB |
| 49 | + private static final long DEFAULT_MAX_DIRECT_BUFFER_CHUNK_SIZE = 2 * 1024 * 1024L; |
| 50 | + |
42 | 51 | private final ByteBuffer _chunkBuffer; |
43 | 52 |
|
44 | 53 | private int _chunkId; |
45 | 54 |
|
46 | 55 | private List<ForwardIndexReader.ByteRange> _ranges; |
47 | 56 |
|
| 57 | + static { |
| 58 | + USE_HEAP_FOR_LARGE_CHUNKS = Boolean.parseBoolean(System.getProperty("useHeapForLargeChunks", "false")); |
| 59 | + MAX_DIRECT_BUFFER_CHUNK_SIZE = Long.parseLong(System.getProperty("maxDirectBufferChunkSize", |
| 60 | + Long.toString(DEFAULT_MAX_DIRECT_BUFFER_CHUNK_SIZE))); |
| 61 | + LOGGER.info("useHeapForLargeChunks: {}, maxDirectBufferChunkSize: {}", |
| 62 | + USE_HEAP_FOR_LARGE_CHUNKS, MAX_DIRECT_BUFFER_CHUNK_SIZE); |
| 63 | + } |
| 64 | + |
48 | 65 | public ChunkReaderContext(int maxChunkSize) { |
49 | | - _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize); |
| 66 | + if (!USE_HEAP_FOR_LARGE_CHUNKS || maxChunkSize < MAX_DIRECT_BUFFER_CHUNK_SIZE) { |
| 67 | + _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize); |
| 68 | + } else { |
| 69 | + _chunkBuffer = ByteBuffer.allocate(maxChunkSize); |
| 70 | + } |
50 | 71 | _chunkId = -1; |
51 | 72 | _ranges = new ArrayList<>(); |
52 | 73 | } |
53 | 74 |
|
54 | 75 | @Override |
55 | 76 | public void close() |
56 | 77 | throws IOException { |
57 | | - if (CleanerUtil.UNMAP_SUPPORTED) { |
| 78 | + if (CleanerUtil.UNMAP_SUPPORTED && _chunkBuffer.isDirect()) { |
58 | 79 | CleanerUtil.getCleaner().freeBuffer(_chunkBuffer); |
59 | 80 | } |
60 | 81 | } |
|
0 commit comments