Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/135235.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135235
summary: Fix systemd notify to use a shared arena
area: Infra/Node Lifecycle
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,15 @@ public Zstd getZstd() {
}

@Override
public CloseableByteBuffer newBuffer(int len) {
public CloseableByteBuffer newSharedBuffer(int len) {
assert len > 0;
return javaLib.newBuffer(len);
return javaLib.newSharedBuffer(len);
}

@Override
public CloseableByteBuffer newConfinedBuffer(int len) {
assert len > 0;
return javaLib.newConfinedBuffer(len);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ record Arch(
this.systemd = null; // not running under systemd
} else {
logger.debug("Systemd socket path: {}", socketPath);
var buffer = newBuffer(64);
var buffer = newSharedBuffer(64);
this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,20 @@ default WindowsFunctions getWindowsFunctions() {
Optional<VectorSimilarityFunctions> getVectorSimilarityFunctions();

/**
* Creates a new {@link CloseableByteBuffer}. The buffer must be used within the same thread
* that it is created.
* Creates a new {@link CloseableByteBuffer} using a shared arena. The buffer can be used
* across multiple threads.
* @param len the number of bytes the buffer should allocate
* @return the buffer
*/
CloseableByteBuffer newBuffer(int len);
CloseableByteBuffer newSharedBuffer(int len);

/**
* Creates a new {@link CloseableByteBuffer} using a confined arena. The buffer must be
* used within the same thread that it is created.
* @param len the number of bytes the buffer should allocate
* @return the buffer
*/
CloseableByteBuffer newConfinedBuffer(int len);

/**
* Possible stats for execution filtering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ public Zstd getZstd() {
}

@Override
public CloseableByteBuffer newBuffer(int len) {
public CloseableByteBuffer newSharedBuffer(int len) {
logger.warn("cannot allocate buffer because native access is not available");
return null;
}

@Override
public CloseableByteBuffer newConfinedBuffer(int len) {
logger.warn("cannot allocate buffer because native access is not available");
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@ class JdkCloseableByteBuffer implements CloseableByteBuffer {
final MemorySegment segment;
private final ByteBuffer bufferView;

JdkCloseableByteBuffer(int len) {
this.arena = Arena.ofConfined();
static JdkCloseableByteBuffer ofShared(int len) {
return new JdkCloseableByteBuffer(len, true);
}

static JdkCloseableByteBuffer ofConfined(int len) {
return new JdkCloseableByteBuffer(len, false);
}

private JdkCloseableByteBuffer(int len, boolean shared) {
this.arena = shared ? Arena.ofShared() : Arena.ofConfined();
this.segment = arena.allocate(len);
this.bufferView = segment.asByteBuffer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
import org.elasticsearch.nativeaccess.lib.JavaLibrary;

class JdkJavaLibrary implements JavaLibrary {

@Override
public CloseableByteBuffer newSharedBuffer(int len) {
return JdkCloseableByteBuffer.ofShared(len);
}

@Override
public CloseableByteBuffer newBuffer(int len) {
return new JdkCloseableByteBuffer(len);
public CloseableByteBuffer newConfinedBuffer(int len) {
return JdkCloseableByteBuffer.ofConfined(len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@
import org.elasticsearch.nativeaccess.CloseableByteBuffer;

public non-sealed interface JavaLibrary extends NativeLibrary {
CloseableByteBuffer newBuffer(int len);
CloseableByteBuffer newSharedBuffer(int len);

CloseableByteBuffer newConfinedBuffer(int len);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testCompressBound() {
}

public void testCompressValidation() {
try (var src = nativeAccess.newBuffer(1000); var dst = nativeAccess.newBuffer(500)) {
try (var src = nativeAccess.newConfinedBuffer(1000); var dst = nativeAccess.newConfinedBuffer(500)) {
var srcBuf = src.buffer();
var dstBuf = dst.buffer();

Expand All @@ -58,9 +58,9 @@ public void testCompressValidation() {

public void testDecompressValidation() {
try (
var original = nativeAccess.newBuffer(1000);
var compressed = nativeAccess.newBuffer(500);
var restored = nativeAccess.newBuffer(500)
var original = nativeAccess.newConfinedBuffer(1000);
var compressed = nativeAccess.newConfinedBuffer(500);
var restored = nativeAccess.newConfinedBuffer(500)
) {
var originalBuf = original.buffer();
var compressedBuf = compressed.buffer();
Expand Down Expand Up @@ -105,9 +105,9 @@ public void testCycle() {

private void doTestRoundtrip(byte[] data) {
try (
var original = nativeAccess.newBuffer(data.length);
var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length));
var restored = nativeAccess.newBuffer(data.length)
var original = nativeAccess.newConfinedBuffer(data.length);
var compressed = nativeAccess.newConfinedBuffer(zstd.compressBound(data.length));
var restored = nativeAccess.newConfinedBuffer(data.length)
) {
original.buffer().put(0, data);
int compressedLength = zstd.compress(compressed, original, randomIntBetween(-3, 9));
Expand All @@ -121,9 +121,9 @@ private void doTestRoundtrip(byte[] data) {
final int compressedOffset = randomIntBetween(1, 1000);
final int decompressedOffset = randomIntBetween(1, 1000);
try (
var original = nativeAccess.newBuffer(decompressedOffset + data.length);
var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length));
var restored = nativeAccess.newBuffer(decompressedOffset + data.length)
var original = nativeAccess.newConfinedBuffer(decompressedOffset + data.length);
var compressed = nativeAccess.newConfinedBuffer(compressedOffset + zstd.compressBound(data.length));
var restored = nativeAccess.newConfinedBuffer(decompressedOffset + data.length)
) {
original.buffer().put(decompressedOffset, data);
original.buffer().position(decompressedOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ public void decompress(DataInput in, int originalLength, int offset, int length,
final int compressedLength = in.readVInt();

try (
CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength);
CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength)
CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength);
CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength)
) {

while (src.buffer().position() < compressedLength) {
Expand Down Expand Up @@ -193,8 +193,8 @@ public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws I
// identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into
// reusing compression contexts, which are not small and would increase permanent memory usage as well.
try (
CloseableByteBuffer src = nativeAccess.newBuffer(srcLen);
CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound)
CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen);
CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound)
) {

while (buffersInput.position() < buffersInput.length()) {
Expand Down