Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions libs/core/src/main/java/org/elasticsearch/core/ObjectPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.core;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* ObjectPool to avoid repeated allocations of expensive objects such as buffers.
*
* When using virtual threads, the use of {@link ThreadLocal}s is a bad option.
* Virtual threads are single-use and not meant to be pooled, so re-use via a {@link ThreadLocal} does not work.
*/
public interface ObjectPool<T> {

static <T> ObjectPool<T> withInitial(Supplier<T> supplier, Duration timeout) {
return new HybridPool<>(supplier, timeout.toNanos());
}

static <T> ObjectPool<T> withInitial(Supplier<T> supplier) {
return withInitial(supplier, UnboundedObjectPool.DEFAULT_TIMEOUT);
}

/**
* A pooled object that can be acquired from an {@link ObjectPool}.
*
* {@link PooledObject}s are returned to the pool when closed and are expected to be used in a try-with-resources block.
* Otherwise, special attention is needed to ensure the object is returned to the pool.
*
* This interface extends {@link AutoCloseable} to allow for automatic resource management.
* @param <T>
*/
interface PooledObject<T> extends AutoCloseable {
T get();

void close();
}

PooledObject<T> acquire();

/**
* Hybrid approach of an {@link ObjectPool}
*
* If the current thread is a virtual thread, this pool will acquire a {@link PooledObject} from an unbounded pool.
* Otherwise, a {@link ThreadLocal} will be used, assuming the {@link PooledObject} is acquired on a pooled thread.
*/
final class HybridPool<T> implements ObjectPool<T> {
private static final class EsThreadLocal<T> extends ThreadLocal<T> implements PooledObject<T> {
private final Supplier<T> supplier;

EsThreadLocal(Supplier<T> supplier) {
this.supplier = supplier;
}

@Override
protected T initialValue() {
return supplier.get();
}

@Override
public void close() {
// noop
}
}

private final PooledObject<T> threadLocal;
private final UnboundedObjectPool<T> unboundedPool;

private HybridPool(Supplier<T> supplier, long timeoutNanos) {
this.threadLocal = new EsThreadLocal<>(supplier);
this.unboundedPool = new UnboundedObjectPool<>(supplier, timeoutNanos);
}

@Override
public PooledObject<T> acquire() {
if (Thread.currentThread().isVirtual()) {
return unboundedPool.acquire();
} else {
unboundedPool.checkTimeouts();
return threadLocal;
}
}
}

final class UnboundedObjectPool<T> implements ObjectPool<T> {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
private static final long TIMEOUT_CHECK_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(5);

private static final ThreadFactory VTHREAD_FACTORY = Thread.ofVirtual().name("object-pool-timeout-", 0).factory();
private static final VarHandle VAR_HANDLE;

static {
try {
VAR_HANDLE = MethodHandles.lookup().findVarHandle(UnboundedObjectPool.class, "lastTimeoutCheckNanos", long.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}

// FIXME ArrayBlockingQueue and dynamically resize when necessary? That could minimize allocations when releasing objects
private final Queue<PooledObjectImpl> pool = new ConcurrentLinkedQueue<>();
private final Runnable timeoutChecker = this::runTimeoutCheck;

private final Supplier<T> supplier;
private final long timeoutNanos;

private volatile long lastTimeoutCheckNanos = 0;

private UnboundedObjectPool(Supplier<T> supplier, long timeoutNanos) {
this.supplier = supplier;
this.timeoutNanos = timeoutNanos;
}

public PooledObject<T> acquire() {
PooledObjectImpl obj = pool.poll();
if (obj != null) {
obj.acquire();
return obj;
}
return new PooledObjectImpl(supplier.get(), System.nanoTime());
}

private boolean hasNextTimedOut() {
var next = pool.peek();
return next != null && next.hasTimedOut();
}

private void checkTimeouts() {
long nowNanos = System.nanoTime();
long lastNanos = lastTimeoutCheckNanos;
if (nowNanos - lastNanos < TIMEOUT_CHECK_INTERVAL_NANOS) {
return; // no need to check timeouts
}

if (VAR_HANDLE.compareAndSet(this, lastNanos, nowNanos) && hasNextTimedOut()) {
// fork timeout check onto a virtual thread
VTHREAD_FACTORY.newThread(timeoutChecker).start();
}
}

private void runTimeoutCheck() {
do {
PooledObjectImpl obj = pool.poll();
if (obj == null) {
return; // no objects to check
} else if (obj.hasTimedOut() == false) {
obj.release(); // return to pool and reset release time to keep it in order
return;
}
// otherwise just drop
lastTimeoutCheckNanos = System.nanoTime();
} while (hasNextTimedOut());
}

private class PooledObjectImpl implements PooledObject<T> {
private final T object;
// if negative: release time, if positive: acquire time
private volatile long nanoTime;

PooledObjectImpl(T object, long nanoTime) {
this.object = object;
this.nanoTime = nanoTime;
}

@Override
public T get() {
assert nanoTime > 0 : "Object must be acquired before use";
return object;
}

boolean hasTimedOut() {
var current = nanoTime;
return current < 0 && System.nanoTime() + current > timeoutNanos;
}

void acquire() {
this.nanoTime = System.nanoTime();
}

void release() {
this.nanoTime = -System.nanoTime();
pool.offer(PooledObjectImpl.this);
}

@Override
public void close() {
release();
checkTimeouts();
}
}
}
}
30 changes: 18 additions & 12 deletions libs/core/src/main/java/org/elasticsearch/core/Streams.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
public class Streams {

private static final ThreadLocal<byte[]> LOCAL_BUFFER = ThreadLocal.withInitial(() -> new byte[8 * 1024]);
private static final ObjectPool<byte[]> LOCAL_BUFFER = ObjectPool.withInitial(() -> new byte[8 * 1024]);

private Streams() {

Expand Down Expand Up @@ -63,7 +63,9 @@ public static long copy(final InputStream in, final OutputStream out, byte[] buf
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException {
return copy(in, out, LOCAL_BUFFER.get(), close);
try (var pooledBuffer = LOCAL_BUFFER.acquire()) {
return copy(in, out, pooledBuffer.get(), close);
}
}

/**
Expand All @@ -77,7 +79,9 @@ public static long copy(final InputStream in, final OutputStream out, byte[] buf
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out) throws IOException {
return copy(in, out, LOCAL_BUFFER.get(), true);
try (var pooledBuffer = LOCAL_BUFFER.acquire()) {
return copy(in, out, pooledBuffer.get(), true);
}
}

/**
Expand Down Expand Up @@ -107,17 +111,19 @@ private static int readToHeapBuffer(InputStream input, ByteBuffer buffer, int co

private static int readToDirectBuffer(InputStream input, ByteBuffer b, int count) throws IOException {
int totalRead = 0;
final byte[] buffer = LOCAL_BUFFER.get();
while (totalRead < count) {
final int len = Math.min(count - totalRead, buffer.length);
final int read = input.read(buffer, 0, len);
if (read == -1) {
break;
try (var pooledBuffer = LOCAL_BUFFER.acquire()) {
final byte[] buffer = pooledBuffer.get();
while (totalRead < count) {
final int len = Math.min(count - totalRead, buffer.length);
final int read = input.read(buffer, 0, len);
if (read == -1) {
break;
}
b.put(buffer, 0, read);
totalRead += read;
}
b.put(buffer, 0, read);
totalRead += read;
return totalRead;
}
return totalRead;
}

public static int readFully(InputStream reader, byte[] dest) throws IOException {
Expand Down
1 change: 1 addition & 0 deletions libs/lz4/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

module org.elasticsearch.lz4 {
requires org.lz4.java;
requires org.elasticsearch.base;

exports org.elasticsearch.lz4;
}
Loading