From 215ae93db76582e11f196031a658f87c323c654d Mon Sep 17 00:00:00 2001 From: Jack Conradson Date: Mon, 22 Sep 2025 14:13:40 -0700 Subject: [PATCH 1/3] Fix systemd notify to use a shared arena --- .../nativeaccess/AbstractNativeAccess.java | 4 ++-- .../nativeaccess/LinuxNativeAccess.java | 2 +- .../nativeaccess/NativeAccess.java | 2 +- .../nativeaccess/NoopNativeAccess.java | 2 +- .../jdk/JdkCloseableByteBuffer.java | 4 ++-- .../nativeaccess/jdk/JdkJavaLibrary.java | 5 +++-- .../nativeaccess/lib/JavaLibrary.java | 2 +- .../elasticsearch/nativeaccess/ZstdTests.java | 20 +++++++++---------- .../codec/zstd/Zstd814StoredFieldsFormat.java | 8 ++++---- 9 files changed, 25 insertions(+), 24 deletions(-) diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java index 677563194704c..e842897c7ba93 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java @@ -46,9 +46,9 @@ public Zstd getZstd() { } @Override - public CloseableByteBuffer newBuffer(int len) { + public CloseableByteBuffer newBuffer(int len, boolean shared) { assert len > 0; - return javaLib.newBuffer(len); + return javaLib.newBuffer(len, shared); } @Override diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java index b174c0c531714..f5e5b0c4ecc02 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java @@ -98,7 +98,7 @@ record Arch( this.systemd = null; // not running under systemd } else { logger.debug("Systemd socket path: {}", socketPath); - var buffer = newBuffer(64); + var buffer = newBuffer(64, true); this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer); } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java index 142171c5bbfcc..e4282e1dacb90 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java @@ -93,7 +93,7 @@ default WindowsFunctions getWindowsFunctions() { * @param len the number of bytes the buffer should allocate * @return the buffer */ - CloseableByteBuffer newBuffer(int len); + CloseableByteBuffer newBuffer(int len, boolean shared); /** * Possible stats for execution filtering. diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java index 5eaaae6905791..7b5e7a739919e 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java @@ -78,7 +78,7 @@ public Zstd getZstd() { } @Override - public CloseableByteBuffer newBuffer(int len) { + public CloseableByteBuffer newBuffer(int len, boolean shared) { logger.warn("cannot allocate buffer because native access is not available"); return null; } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java index 56b5b7c8beae4..eba12015ee847 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java @@ -20,8 +20,8 @@ class JdkCloseableByteBuffer implements CloseableByteBuffer { final MemorySegment segment; private final ByteBuffer bufferView; - JdkCloseableByteBuffer(int len) { - this.arena = Arena.ofConfined(); + JdkCloseableByteBuffer(int len, boolean shared) { + this.arena = shared ? Arena.ofShared() : Arena.ofConfined(); this.segment = arena.allocate(len); this.bufferView = segment.asByteBuffer(); } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java index 0a67fc70737e6..19d81c1282cd2 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java @@ -13,8 +13,9 @@ import org.elasticsearch.nativeaccess.lib.JavaLibrary; class JdkJavaLibrary implements JavaLibrary { + @Override - public CloseableByteBuffer newBuffer(int len) { - return new JdkCloseableByteBuffer(len); + public CloseableByteBuffer newBuffer(int len, boolean shared) { + return new JdkCloseableByteBuffer(len, shared); } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java index 258ae9923a40e..c4ba930566f0c 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java @@ -12,5 +12,5 @@ import org.elasticsearch.nativeaccess.CloseableByteBuffer; public non-sealed interface JavaLibrary extends NativeLibrary { - CloseableByteBuffer newBuffer(int len); + CloseableByteBuffer newBuffer(int len, boolean shared); } diff --git a/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java b/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java index b6c84d02201c4..cd2d2bf00e57a 100644 --- a/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java +++ b/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java @@ -38,7 +38,7 @@ public void testCompressBound() { } public void testCompressValidation() { - try (var src = nativeAccess.newBuffer(1000); var dst = nativeAccess.newBuffer(500)) { + try (var src = nativeAccess.newBuffer(1000, false); var dst = nativeAccess.newBuffer(500, false)) { var srcBuf = src.buffer(); var dstBuf = dst.buffer(); @@ -58,9 +58,9 @@ public void testCompressValidation() { public void testDecompressValidation() { try ( - var original = nativeAccess.newBuffer(1000); - var compressed = nativeAccess.newBuffer(500); - var restored = nativeAccess.newBuffer(500) + var original = nativeAccess.newBuffer(1000, false); + var compressed = nativeAccess.newBuffer(500, false); + var restored = nativeAccess.newBuffer(500, false) ) { var originalBuf = original.buffer(); var compressedBuf = compressed.buffer(); @@ -105,9 +105,9 @@ public void testCycle() { private void doTestRoundtrip(byte[] data) { try ( - var original = nativeAccess.newBuffer(data.length); - var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length)); - var restored = nativeAccess.newBuffer(data.length) + var original = nativeAccess.newBuffer(data.length, false); + var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length), false); + var restored = nativeAccess.newBuffer(data.length, false) ) { original.buffer().put(0, data); int compressedLength = zstd.compress(compressed, original, randomIntBetween(-3, 9)); @@ -121,9 +121,9 @@ private void doTestRoundtrip(byte[] data) { final int compressedOffset = randomIntBetween(1, 1000); final int decompressedOffset = randomIntBetween(1, 1000); try ( - var original = nativeAccess.newBuffer(decompressedOffset + data.length); - var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length)); - var restored = nativeAccess.newBuffer(decompressedOffset + data.length) + var original = nativeAccess.newBuffer(decompressedOffset + data.length, false); + var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length), false); + var restored = nativeAccess.newBuffer(decompressedOffset + data.length, false) ) { original.buffer().put(decompressedOffset, data); original.buffer().position(decompressedOffset); diff --git a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java index fcd135658e0e9..976837611977d 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java @@ -134,8 +134,8 @@ public void decompress(DataInput in, int originalLength, int offset, int length, final int compressedLength = in.readVInt(); try ( - CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength); - CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength) + CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength, false); + CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength, false) ) { while (src.buffer().position() < compressedLength) { @@ -193,8 +193,8 @@ public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws I // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into // reusing compression contexts, which are not small and would increase permanent memory usage as well. try ( - CloseableByteBuffer src = nativeAccess.newBuffer(srcLen); - CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound) + CloseableByteBuffer src = nativeAccess.newBuffer(srcLen, false); + CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound, false) ) { while (buffersInput.position() < buffersInput.length()) { From 1730f7b2a0d6ed4fde6f269b125a0a3aa2d034d3 Mon Sep 17 00:00:00 2001 From: Jack Conradson Date: Mon, 22 Sep 2025 14:35:37 -0700 Subject: [PATCH 2/3] Update docs/changelog/135235.yaml --- docs/changelog/135235.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/135235.yaml diff --git a/docs/changelog/135235.yaml b/docs/changelog/135235.yaml new file mode 100644 index 0000000000000..1eff0516ba455 --- /dev/null +++ b/docs/changelog/135235.yaml @@ -0,0 +1,5 @@ +pr: 135235 +summary: Fix systemd notify to use a shared arena +area: Infra/Node Lifecycle +type: bug +issues: [] From 647aafee870d9554287bfa78699c9f3f06d6781e Mon Sep 17 00:00:00 2001 From: Jack Conradson Date: Mon, 22 Sep 2025 16:33:29 -0700 Subject: [PATCH 3/3] response to pr comments --- .../nativeaccess/AbstractNativeAccess.java | 10 ++++++++-- .../nativeaccess/LinuxNativeAccess.java | 2 +- .../nativeaccess/NativeAccess.java | 14 ++++++++++--- .../nativeaccess/NoopNativeAccess.java | 8 +++++++- .../jdk/JdkCloseableByteBuffer.java | 10 +++++++++- .../nativeaccess/jdk/JdkJavaLibrary.java | 9 +++++++-- .../nativeaccess/lib/JavaLibrary.java | 4 +++- .../elasticsearch/nativeaccess/ZstdTests.java | 20 +++++++++---------- .../codec/zstd/Zstd814StoredFieldsFormat.java | 8 ++++---- 9 files changed, 60 insertions(+), 25 deletions(-) diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java index e842897c7ba93..ae0e4ea3be41a 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java @@ -46,9 +46,15 @@ public Zstd getZstd() { } @Override - public CloseableByteBuffer newBuffer(int len, boolean shared) { + public CloseableByteBuffer newSharedBuffer(int len) { assert len > 0; - return javaLib.newBuffer(len, shared); + return javaLib.newSharedBuffer(len); + } + + @Override + public CloseableByteBuffer newConfinedBuffer(int len) { + assert len > 0; + return javaLib.newConfinedBuffer(len); } @Override diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java index f5e5b0c4ecc02..09fee02832b89 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java @@ -98,7 +98,7 @@ record Arch( this.systemd = null; // not running under systemd } else { logger.debug("Systemd socket path: {}", socketPath); - var buffer = newBuffer(64, true); + var buffer = newSharedBuffer(64); this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer); } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java index e4282e1dacb90..d4404cb56949f 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java @@ -88,12 +88,20 @@ default WindowsFunctions getWindowsFunctions() { Optional getVectorSimilarityFunctions(); /** - * Creates a new {@link CloseableByteBuffer}. The buffer must be used within the same thread - * that it is created. + * Creates a new {@link CloseableByteBuffer} using a shared arena. The buffer can be used + * across multiple threads. * @param len the number of bytes the buffer should allocate * @return the buffer */ - CloseableByteBuffer newBuffer(int len, boolean shared); + CloseableByteBuffer newSharedBuffer(int len); + + /** + * Creates a new {@link CloseableByteBuffer} using a confined arena. The buffer must be + * used within the same thread that it is created. + * @param len the number of bytes the buffer should allocate + * @return the buffer + */ + CloseableByteBuffer newConfinedBuffer(int len); /** * Possible stats for execution filtering. diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java index 7b5e7a739919e..677d6169cb55b 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java @@ -78,7 +78,13 @@ public Zstd getZstd() { } @Override - public CloseableByteBuffer newBuffer(int len, boolean shared) { + public CloseableByteBuffer newSharedBuffer(int len) { + logger.warn("cannot allocate buffer because native access is not available"); + return null; + } + + @Override + public CloseableByteBuffer newConfinedBuffer(int len) { logger.warn("cannot allocate buffer because native access is not available"); return null; } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java index eba12015ee847..50a9a1f9e522c 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java @@ -20,7 +20,15 @@ class JdkCloseableByteBuffer implements CloseableByteBuffer { final MemorySegment segment; private final ByteBuffer bufferView; - JdkCloseableByteBuffer(int len, boolean shared) { + static JdkCloseableByteBuffer ofShared(int len) { + return new JdkCloseableByteBuffer(len, true); + } + + static JdkCloseableByteBuffer ofConfined(int len) { + return new JdkCloseableByteBuffer(len, false); + } + + private JdkCloseableByteBuffer(int len, boolean shared) { this.arena = shared ? Arena.ofShared() : Arena.ofConfined(); this.segment = arena.allocate(len); this.bufferView = segment.asByteBuffer(); diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java index 19d81c1282cd2..14ff184ca02cd 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java @@ -15,7 +15,12 @@ class JdkJavaLibrary implements JavaLibrary { @Override - public CloseableByteBuffer newBuffer(int len, boolean shared) { - return new JdkCloseableByteBuffer(len, shared); + public CloseableByteBuffer newSharedBuffer(int len) { + return JdkCloseableByteBuffer.ofShared(len); + } + + @Override + public CloseableByteBuffer newConfinedBuffer(int len) { + return JdkCloseableByteBuffer.ofConfined(len); } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java index c4ba930566f0c..52b4a7aea2af8 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java @@ -12,5 +12,7 @@ import org.elasticsearch.nativeaccess.CloseableByteBuffer; public non-sealed interface JavaLibrary extends NativeLibrary { - CloseableByteBuffer newBuffer(int len, boolean shared); + CloseableByteBuffer newSharedBuffer(int len); + + CloseableByteBuffer newConfinedBuffer(int len); } diff --git a/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java b/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java index cd2d2bf00e57a..7247844af9b9d 100644 --- a/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java +++ b/libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java @@ -38,7 +38,7 @@ public void testCompressBound() { } public void testCompressValidation() { - try (var src = nativeAccess.newBuffer(1000, false); var dst = nativeAccess.newBuffer(500, false)) { + try (var src = nativeAccess.newConfinedBuffer(1000); var dst = nativeAccess.newConfinedBuffer(500)) { var srcBuf = src.buffer(); var dstBuf = dst.buffer(); @@ -58,9 +58,9 @@ public void testCompressValidation() { public void testDecompressValidation() { try ( - var original = nativeAccess.newBuffer(1000, false); - var compressed = nativeAccess.newBuffer(500, false); - var restored = nativeAccess.newBuffer(500, false) + var original = nativeAccess.newConfinedBuffer(1000); + var compressed = nativeAccess.newConfinedBuffer(500); + var restored = nativeAccess.newConfinedBuffer(500) ) { var originalBuf = original.buffer(); var compressedBuf = compressed.buffer(); @@ -105,9 +105,9 @@ public void testCycle() { private void doTestRoundtrip(byte[] data) { try ( - var original = nativeAccess.newBuffer(data.length, false); - var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length), false); - var restored = nativeAccess.newBuffer(data.length, false) + var original = nativeAccess.newConfinedBuffer(data.length); + var compressed = nativeAccess.newConfinedBuffer(zstd.compressBound(data.length)); + var restored = nativeAccess.newConfinedBuffer(data.length) ) { original.buffer().put(0, data); int compressedLength = zstd.compress(compressed, original, randomIntBetween(-3, 9)); @@ -121,9 +121,9 @@ private void doTestRoundtrip(byte[] data) { final int compressedOffset = randomIntBetween(1, 1000); final int decompressedOffset = randomIntBetween(1, 1000); try ( - var original = nativeAccess.newBuffer(decompressedOffset + data.length, false); - var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length), false); - var restored = nativeAccess.newBuffer(decompressedOffset + data.length, false) + var original = nativeAccess.newConfinedBuffer(decompressedOffset + data.length); + var compressed = nativeAccess.newConfinedBuffer(compressedOffset + zstd.compressBound(data.length)); + var restored = nativeAccess.newConfinedBuffer(decompressedOffset + data.length) ) { original.buffer().put(decompressedOffset, data); original.buffer().position(decompressedOffset); diff --git a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java index 976837611977d..f40ed5baf74d6 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java @@ -134,8 +134,8 @@ public void decompress(DataInput in, int originalLength, int offset, int length, final int compressedLength = in.readVInt(); try ( - CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength, false); - CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength, false) + CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength); + CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength) ) { while (src.buffer().position() < compressedLength) { @@ -193,8 +193,8 @@ public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws I // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into // reusing compression contexts, which are not small and would increase permanent memory usage as well. try ( - CloseableByteBuffer src = nativeAccess.newBuffer(srcLen, false); - CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound, false) + CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen); + CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound) ) { while (buffersInput.position() < buffersInput.length()) {