Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/135235.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135235
summary: Fix systemd notify to use a shared arena
area: Infra/Node Lifecycle
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public Zstd getZstd() {
}

@Override
public CloseableByteBuffer newBuffer(int len) {
public CloseableByteBuffer newBuffer(int len, boolean shared) {
assert len > 0;
return javaLib.newBuffer(len);
return javaLib.newBuffer(len, shared);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ record Arch(
this.systemd = null; // not running under systemd
} else {
logger.debug("Systemd socket path: {}", socketPath);
var buffer = newBuffer(64);
var buffer = newBuffer(64, true);
this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ default WindowsFunctions getWindowsFunctions() {
* @param len the number of bytes the buffer should allocate
* @return the buffer
*/
CloseableByteBuffer newBuffer(int len);
CloseableByteBuffer newBuffer(int len, boolean shared);

/**
* Possible stats for execution filtering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Zstd getZstd() {
}

@Override
public CloseableByteBuffer newBuffer(int len) {
public CloseableByteBuffer newBuffer(int len, boolean shared) {
logger.warn("cannot allocate buffer because native access is not available");
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class JdkCloseableByteBuffer implements CloseableByteBuffer {
final MemorySegment segment;
private final ByteBuffer bufferView;

JdkCloseableByteBuffer(int len) {
this.arena = Arena.ofConfined();
JdkCloseableByteBuffer(int len, boolean shared) {
this.arena = shared ? Arena.ofShared() : Arena.ofConfined();
this.segment = arena.allocate(len);
this.bufferView = segment.asByteBuffer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import org.elasticsearch.nativeaccess.lib.JavaLibrary;

class JdkJavaLibrary implements JavaLibrary {

@Override
public CloseableByteBuffer newBuffer(int len) {
return new JdkCloseableByteBuffer(len);
public CloseableByteBuffer newBuffer(int len, boolean shared) {
return new JdkCloseableByteBuffer(len, shared);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
import org.elasticsearch.nativeaccess.CloseableByteBuffer;

public non-sealed interface JavaLibrary extends NativeLibrary {
CloseableByteBuffer newBuffer(int len);
CloseableByteBuffer newBuffer(int len, boolean shared);
Copy link
Member

@rjernst rjernst Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of passing in a boolean, could we separate these two cases into two different methods? That is, have newSharedBuffer and newConfinedBuffer? I think this better matches the model of the jdk with ofShared() and ofConfined. The internal implementation can still use a flag, but the api surface area would be clearly distinct and easily understandable without remember what that flag means.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated this.

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testCompressBound() {
}

public void testCompressValidation() {
try (var src = nativeAccess.newBuffer(1000); var dst = nativeAccess.newBuffer(500)) {
try (var src = nativeAccess.newBuffer(1000, false); var dst = nativeAccess.newBuffer(500, false)) {
var srcBuf = src.buffer();
var dstBuf = dst.buffer();

Expand All @@ -58,9 +58,9 @@ public void testCompressValidation() {

public void testDecompressValidation() {
try (
var original = nativeAccess.newBuffer(1000);
var compressed = nativeAccess.newBuffer(500);
var restored = nativeAccess.newBuffer(500)
var original = nativeAccess.newBuffer(1000, false);
var compressed = nativeAccess.newBuffer(500, false);
var restored = nativeAccess.newBuffer(500, false)
) {
var originalBuf = original.buffer();
var compressedBuf = compressed.buffer();
Expand Down Expand Up @@ -105,9 +105,9 @@ public void testCycle() {

private void doTestRoundtrip(byte[] data) {
try (
var original = nativeAccess.newBuffer(data.length);
var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length));
var restored = nativeAccess.newBuffer(data.length)
var original = nativeAccess.newBuffer(data.length, false);
var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length), false);
var restored = nativeAccess.newBuffer(data.length, false)
) {
original.buffer().put(0, data);
int compressedLength = zstd.compress(compressed, original, randomIntBetween(-3, 9));
Expand All @@ -121,9 +121,9 @@ private void doTestRoundtrip(byte[] data) {
final int compressedOffset = randomIntBetween(1, 1000);
final int decompressedOffset = randomIntBetween(1, 1000);
try (
var original = nativeAccess.newBuffer(decompressedOffset + data.length);
var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length));
var restored = nativeAccess.newBuffer(decompressedOffset + data.length)
var original = nativeAccess.newBuffer(decompressedOffset + data.length, false);
var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length), false);
var restored = nativeAccess.newBuffer(decompressedOffset + data.length, false)
) {
original.buffer().put(decompressedOffset, data);
original.buffer().position(decompressedOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ public void decompress(DataInput in, int originalLength, int offset, int length,
final int compressedLength = in.readVInt();

try (
CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength);
CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength)
CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength, false);
CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength, false)
) {

while (src.buffer().position() < compressedLength) {
Expand Down Expand Up @@ -193,8 +193,8 @@ public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws I
// identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into
// reusing compression contexts, which are not small and would increase permanent memory usage as well.
try (
CloseableByteBuffer src = nativeAccess.newBuffer(srcLen);
CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound)
CloseableByteBuffer src = nativeAccess.newBuffer(srcLen, false);
CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound, false)
) {

while (buffersInput.position() < buffersInput.length()) {
Expand Down