diff --git a/docs/changelog/135235.yaml b/docs/changelog/135235.yaml new file mode 100644 index 0000000000000..1eff0516ba455 --- /dev/null +++ b/docs/changelog/135235.yaml @@ -0,0 +1,5 @@ +pr: 135235 +summary: Fix systemd notify to use a shared arena +area: Infra/Node Lifecycle +type: bug +issues: [] diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java index 677563194704c..ae0e4ea3be41a 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java @@ -46,9 +46,15 @@ public Zstd getZstd() { } @Override - public CloseableByteBuffer newBuffer(int len) { + public CloseableByteBuffer newSharedBuffer(int len) { assert len > 0; - return javaLib.newBuffer(len); + return javaLib.newSharedBuffer(len); + } + + @Override + public CloseableByteBuffer newConfinedBuffer(int len) { + assert len > 0; + return javaLib.newConfinedBuffer(len); } @Override diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java index b174c0c531714..09fee02832b89 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java @@ -98,7 +98,7 @@ record Arch( this.systemd = null; // not running under systemd } else { logger.debug("Systemd socket path: {}", socketPath); - var buffer = newBuffer(64); + var buffer = newSharedBuffer(64); this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer); } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java index 142171c5bbfcc..d4404cb56949f 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java @@ -88,12 +88,20 @@ default WindowsFunctions getWindowsFunctions() { Optional getVectorSimilarityFunctions(); /** - * Creates a new {@link CloseableByteBuffer}. The buffer must be used within the same thread - * that it is created. + * Creates a new {@link CloseableByteBuffer} using a shared arena. The buffer can be used + * across multiple threads. * @param len the number of bytes the buffer should allocate * @return the buffer */ - CloseableByteBuffer newBuffer(int len); + CloseableByteBuffer newSharedBuffer(int len); + + /** + * Creates a new {@link CloseableByteBuffer} using a confined arena. The buffer must be + * used within the same thread that it is created. + * @param len the number of bytes the buffer should allocate + * @return the buffer + */ + CloseableByteBuffer newConfinedBuffer(int len); /** * Possible stats for execution filtering. diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java index 5eaaae6905791..677d6169cb55b 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java @@ -78,7 +78,13 @@ public Zstd getZstd() { } @Override - public CloseableByteBuffer newBuffer(int len) { + public CloseableByteBuffer newSharedBuffer(int len) { + logger.warn("cannot allocate buffer because native access is not available"); + return null; + } + + @Override + public CloseableByteBuffer newConfinedBuffer(int len) { logger.warn("cannot allocate buffer because native access is not available"); return null; } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java index 56b5b7c8beae4..50a9a1f9e522c 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java @@ -20,8 +20,16 @@ class JdkCloseableByteBuffer implements CloseableByteBuffer { final MemorySegment segment; private final ByteBuffer bufferView; - JdkCloseableByteBuffer(int len) { - this.arena = Arena.ofConfined(); + static JdkCloseableByteBuffer ofShared(int len) { + return new JdkCloseableByteBuffer(len, true); + } + + static JdkCloseableByteBuffer ofConfined(int len) { + return new JdkCloseableByteBuffer(len, false); + } + + private JdkCloseableByteBuffer(int len, boolean shared) { + this.arena = shared ? Arena.ofShared() : Arena.ofConfined(); this.segment = arena.allocate(len); this.bufferView = segment.asByteBuffer(); } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java index 0a67fc70737e6..14ff184ca02cd 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java @@ -13,8 +13,14 @@ import org.elasticsearch.nativeaccess.lib.JavaLibrary; class JdkJavaLibrary implements JavaLibrary { + + @Override + public CloseableByteBuffer newSharedBuffer(int len) { + return JdkCloseableByteBuffer.ofShared(len); + } + @Override - public CloseableByteBuffer newBuffer(int len) { - return new JdkCloseableByteBuffer(len); + public CloseableByteBuffer newConfinedBuffer(int len) { + return JdkCloseableByteBuffer.ofConfined(len); } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java index 258ae9923a40e..52b4a7aea2af8 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java @@ -12,5 +12,7 @@ import org.elasticsearch.nativeaccess.CloseableByteBuffer; public non-sealed interface JavaLibrary extends NativeLibrary { - CloseableByteBuffer newBuffer(int len); + CloseableByteBuffer newSharedBuffer(int len); + + CloseableByteBuffer newConfinedBuffer(int len); } diff --git a/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java b/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java index b6c84d02201c4..7247844af9b9d 100644 --- a/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java +++ b/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java @@ -38,7 +38,7 @@ public void testCompressBound() { } public void testCompressValidation() { - try (var src = nativeAccess.newBuffer(1000); var dst = nativeAccess.newBuffer(500)) { + try (var src = nativeAccess.newConfinedBuffer(1000); var dst = nativeAccess.newConfinedBuffer(500)) { var srcBuf = src.buffer(); var dstBuf = dst.buffer(); @@ -58,9 +58,9 @@ public void testCompressValidation() { public void testDecompressValidation() { try ( - var original = nativeAccess.newBuffer(1000); - var compressed = nativeAccess.newBuffer(500); - var restored = nativeAccess.newBuffer(500) + var original = nativeAccess.newConfinedBuffer(1000); + var compressed = nativeAccess.newConfinedBuffer(500); + var restored = nativeAccess.newConfinedBuffer(500) ) { var originalBuf = original.buffer(); var compressedBuf = compressed.buffer(); @@ -105,9 +105,9 @@ public void testCycle() { private void doTestRoundtrip(byte[] data) { try ( - var original = nativeAccess.newBuffer(data.length); - var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length)); - var restored = nativeAccess.newBuffer(data.length) + var original = nativeAccess.newConfinedBuffer(data.length); + var compressed = nativeAccess.newConfinedBuffer(zstd.compressBound(data.length)); + var restored = nativeAccess.newConfinedBuffer(data.length) ) { original.buffer().put(0, data); int compressedLength = zstd.compress(compressed, original, randomIntBetween(-3, 9)); @@ -121,9 +121,9 @@ private void doTestRoundtrip(byte[] data) { final int compressedOffset = randomIntBetween(1, 1000); final int decompressedOffset = randomIntBetween(1, 1000); try ( - var original = nativeAccess.newBuffer(decompressedOffset + data.length); - var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length)); - var restored = nativeAccess.newBuffer(decompressedOffset + data.length) + var original = nativeAccess.newConfinedBuffer(decompressedOffset + data.length); + var compressed = nativeAccess.newConfinedBuffer(compressedOffset + zstd.compressBound(data.length)); + var restored = nativeAccess.newConfinedBuffer(decompressedOffset + data.length) ) { original.buffer().put(decompressedOffset, data); original.buffer().position(decompressedOffset); diff --git a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java index 6aa77b7222696..131c8f8db3515 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java @@ -134,8 +134,8 @@ public void decompress(DataInput in, int originalLength, int offset, int length, final int compressedLength = in.readVInt(); try ( - CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength); - CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength) + CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength); + CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength) ) { while (src.buffer().position() < compressedLength) { @@ -193,8 +193,8 @@ public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws I // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into // reusing compression contexts, which are not small and would increase permanent memory usage as well. try ( - CloseableByteBuffer src = nativeAccess.newBuffer(srcLen); - CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound) + CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen); + CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound) ) { while (buffersInput.position() < buffersInput.length()) {