diff --git a/build.gradle b/build.gradle index a6685bbc..e53e0ee1 100644 --- a/build.gradle +++ b/build.gradle @@ -25,7 +25,7 @@ dependencies { testCompile 'junit:junit:4.11' testCompile 'ch.qos.logback:logback-classic:1.1.2' compile 'org.slf4j:slf4j-api:1.7.8' - compile 'org.msgpack:jackson-dataformat-msgpack:0.7.0-M6' + compile 'org.msgpack:jackson-dataformat-msgpack:0.7.0' compile 'org.komamitsu:phi-accural-failure-detector:0.0.4' } diff --git a/src/main/java/org/komamitsu/fluency/buffer/PackedForwardBuffer.java b/src/main/java/org/komamitsu/fluency/buffer/PackedForwardBuffer.java index a8a148be..65e56dd5 100644 --- a/src/main/java/org/komamitsu/fluency/buffer/PackedForwardBuffer.java +++ b/src/main/java/org/komamitsu/fluency/buffer/PackedForwardBuffer.java @@ -6,9 +6,12 @@ import org.msgpack.core.MessagePacker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; @@ -24,6 +27,23 @@ public class PackedForwardBuffer private final Map retentionBuffers = new HashMap(); private final LinkedBlockingQueue flushableBuffers = new LinkedBlockingQueue(); private final BufferPool bufferPool; + private static final Unsafe unsafe; + static { + Unsafe unsafeInstance = null; + Field f = null; + try { + f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafeInstance = (Unsafe) f.get(null); + } + catch (NoSuchFieldException e) { + LOG.warn("Failed to get 'theUnsafe'", e); + } + catch (IllegalAccessException e) { + LOG.warn("Failed to get 'theUnsafe'", e); + } + unsafe = unsafeInstance; + } private PackedForwardBuffer(PackedForwardBuffer.Config bufferConfig) { @@ -58,7 +78,7 @@ private RetentionBuffer prepareBuffer(String tag, int writeSize) RetentionBuffer newBuffer = new RetentionBuffer(acquiredBuffer); if (retentionBuffer != null) { retentionBuffer.getByteBuffer().flip(); - newBuffer.getByteBuffer().put(retentionBuffer.getByteBuffer()); + copyBuffer(retentionBuffer.getByteBuffer(), newBuffer.getByteBuffer()); bufferPool.returnBuffer(retentionBuffer.getByteBuffer()); } LOG.trace("prepareBuffer(): allocate a new buffer. tag={}, buffer={}", tag, newBuffer); @@ -67,6 +87,21 @@ private RetentionBuffer prepareBuffer(String tag, int writeSize) return newBuffer; } + private void copyBuffer(ByteBuffer src, ByteBuffer dst) + { + if (unsafe == null) { + dst.put(src); + } + else { + long srcAddress = ((DirectBuffer) src).address() + src.position(); + long dstAddress = ((DirectBuffer) dst).address() + dst.position(); + int len = Math.min(src.remaining(), dst.remaining()); + unsafe.copyMemory(srcAddress, dstAddress, len); + src.position(src.position() + len); + dst.position(dst.position() + len); + } + } + @Override public void append(String tag, long timestamp, Map data) throws IOException