Skip to content

Commit 75388d9

Browse files
Default to use netty bytebuf in network layer and remove getAndRelease method (#1375)
This pr includes two changes: 1. Default to use netty bytebuf in network layer, which will remove lots of code dealing with java bytebuffer 2. Remove getAndRelease and replace it with content method and implements ByteBufHolder interface.
1 parent ab69ef2 commit 75388d9

File tree

51 files changed

+461
-705
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+461
-705
lines changed

ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ public class NetworkConfig {
3232
public static final String SELECTOR_EXECUTOR_POOL_SIZE = "selector.executor.pool.size";
3333
public static final String SELECTOR_MAX_KEY_TO_PROCESS = "selector.max.key.to.process";
3434
public static final String SELECTOR_USE_DIRECT_BUFFERS = "selector.use.direct.buffers";
35-
public static final String NETWORK_USE_NETTY_BYTE_BUF = "network.use.netty.byte.buf";
36-
public static final String NETWORK_PUT_REQUEST_SHARE_MEMORY = "network.put.request.share.memory";
3735

3836
/**
3937
* The number of io threads that the server uses for carrying out network requests
@@ -121,14 +119,6 @@ public class NetworkConfig {
121119
@Default("false")
122120
public final boolean selectorUseDirectBuffers;
123121

124-
@Config(NETWORK_USE_NETTY_BYTE_BUF)
125-
@Default("false")
126-
public final boolean networkUseNettyByteBuf;
127-
128-
@Config(NETWORK_PUT_REQUEST_SHARE_MEMORY)
129-
@Default("false")
130-
public final boolean networkPutRequestShareMemory;
131-
132122
public NetworkConfig(VerifiableProperties verifiableProperties) {
133123
numIoThreads = verifiableProperties.getIntInRange(NUM_IO_THREADS, 8, 1, Integer.MAX_VALUE);
134124
queuedMaxRequests = verifiableProperties.getIntInRange(QUEUED_MAX_REQUESTS, 500, 1, Integer.MAX_VALUE);
@@ -147,7 +137,5 @@ public NetworkConfig(VerifiableProperties verifiableProperties) {
147137
selectorMaxKeyToProcess =
148138
verifiableProperties.getIntInRange(SELECTOR_MAX_KEY_TO_PROCESS, -1, -1, Integer.MAX_VALUE);
149139
selectorUseDirectBuffers = verifiableProperties.getBoolean(SELECTOR_USE_DIRECT_BUFFERS, false);
150-
networkUseNettyByteBuf = verifiableProperties.getBoolean(NETWORK_USE_NETTY_BYTE_BUF, false);
151-
networkPutRequestShareMemory = verifiableProperties.getBoolean(NETWORK_PUT_REQUEST_SHARE_MEMORY, false);
152140
}
153141
}

ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public class RouterConfig {
9494
"router.operation.tracker.histogram.cache.timeout.ms";
9595
public static final String ROUTER_MAX_IN_MEM_PUT_CHUNKS = "router.max.in.mem.put.chunks";
9696
public static final String ROUTER_MAX_IN_MEM_GET_CHUNKS = "router.max.in.mem.get.chunks";
97-
public static final String ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY = "router.get.blob.operation.share.memory";
9897
public static final String ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED =
9998
"router.get.eligible.replicas.by.state.enabled";
10099
public static final String ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET = "router.put.use.dynamic.success.target";
@@ -440,13 +439,6 @@ public class RouterConfig {
440439
@Default("4")
441440
public final int routerMaxInMemGetChunks;
442441

443-
/**
444-
* If {@code true}, the blob data shares memory with networking buffer in GetBlobOperation
445-
*/
446-
@Config(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY)
447-
@Default("false")
448-
public final boolean routerGetBlobOperationShareMemory;
449-
450442
/**
451443
* if {@code true}, operation tracker will get replicas in required states based on the type of operation. This helps
452444
* dynamically manage replicas in cluster (i.e. add/remove/move replicas) without restarting frontends.
@@ -558,7 +550,6 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
558550
Integer.MAX_VALUE / routerMaxPutChunkSizeBytes);
559551
routerMaxInMemGetChunks = verifiableProperties.getIntInRange(ROUTER_MAX_IN_MEM_GET_CHUNKS, 4, 1,
560552
Integer.MAX_VALUE / routerMaxPutChunkSizeBytes);
561-
routerGetBlobOperationShareMemory = verifiableProperties.getBoolean(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY, false);
562553
routerGetEligibleReplicasByStateEnabled =
563554
verifiableProperties.getBoolean(ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED, false);
564555
routerPutUseDynamicSuccessTarget = verifiableProperties.getBoolean(ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET, false);

ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java

Lines changed: 0 additions & 95 deletions
This file was deleted.

ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,38 @@
1313
*/
1414
package com.github.ambry.network;
1515

16+
import com.github.ambry.utils.AbstractByteBufHolder;
1617
import io.netty.buffer.ByteBuf;
1718
import io.netty.buffer.ByteBufAllocator;
1819
import java.io.EOFException;
1920
import java.io.IOException;
20-
import java.nio.ByteBuffer;
2121
import java.nio.channels.ReadableByteChannel;
22-
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.Objects;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

2626

2727
/**
2828
* A netty {@link ByteBuf} version of Receive to buffer the incoming request or response.
2929
*/
30-
public class BoundedNettyByteBufReceive implements BoundedReceive<ByteBuf> {
30+
public class BoundedNettyByteBufReceive extends AbstractByteBufHolder<BoundedNettyByteBufReceive> {
3131

3232
private ByteBuf buffer = null;
3333
private ByteBuf sizeBuffer = null;
3434
private long sizeToRead = 0;
3535
private long sizeRead = 0;
3636
private final static Logger logger = LoggerFactory.getLogger(BoundedNettyByteBufReceive.class);
3737

38-
@Override
38+
public BoundedNettyByteBufReceive() {
39+
}
40+
41+
BoundedNettyByteBufReceive(ByteBuf buffer, long sizeToRead) {
42+
this.buffer = Objects.requireNonNull(buffer);
43+
this.sizeToRead = sizeToRead;
44+
}
45+
3946
public boolean isReadComplete() {
40-
return buffer != null && sizeRead >= sizeToRead;
47+
return buffer != null && sizeRead >= sizeToRead;
4148
}
4249

4350
/**
@@ -56,7 +63,6 @@ private int readBytesFromReadableByteChannel(ReadableByteChannel channel, ByteBu
5663
return n;
5764
}
5865

59-
@Override
6066
public long readFrom(ReadableByteChannel channel) throws IOException {
6167
long bytesRead = 0;
6268
if (buffer == null) {
@@ -99,32 +105,22 @@ public long readFrom(ReadableByteChannel channel) throws IOException {
99105
return bytesRead;
100106
}
101107

102-
/**
103-
* Returns the payload as {@link ByteBuf}, at the same time release the current reference to this payload.
104-
* It's not safe to call this function multiple times.
105-
* @return
106-
*/
107-
@Override
108-
public ByteBuf getAndRelease() {
109-
if (buffer == null) {
110-
return null;
111-
} else {
112-
try {
113-
return buffer.retainedDuplicate();
114-
} finally {
115-
buffer.release();
116-
buffer = null;
117-
}
118-
}
119-
}
120-
121108
/**
122109
* The total size in bytes that needs to receive from the channel
123110
* It will be initialized only after header is read.
124111
* @return the size of the data in bytes to receive after reading header, otherwise return 0
125112
*/
126-
@Override
127113
public long sizeRead() {
128114
return sizeRead;
129115
}
116+
117+
@Override
118+
public ByteBuf content() {
119+
return buffer;
120+
}
121+
122+
@Override
123+
public BoundedNettyByteBufReceive replace(ByteBuf content) {
124+
return new BoundedNettyByteBufReceive(content, sizeToRead);
125+
}
130126
}

ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ public class NetworkReceive {
2727
/**
2828
* The bytes received from the destination
2929
*/
30-
private final BoundedReceive receivedBytes;
30+
private final BoundedNettyByteBufReceive receivedBytes;
3131

3232
/**
3333
* The start time of when the receive started
3434
*/
3535
private final long receiveStartTimeInMs;
3636

37-
public NetworkReceive(String connectionId, BoundedReceive receivedBytes, Time time) {
37+
public NetworkReceive(String connectionId, BoundedNettyByteBufReceive receivedBytes, Time time) {
3838
this.connectionId = connectionId;
3939
this.receivedBytes = receivedBytes;
4040
this.receiveStartTimeInMs = time.milliseconds();
@@ -44,7 +44,7 @@ public String getConnectionId() {
4444
return connectionId;
4545
}
4646

47-
public BoundedReceive getReceivedBytes() {
47+
public BoundedNettyByteBufReceive getReceivedBytes() {
4848
return receivedBytes;
4949
}
5050

ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.github.ambry.network;
1515

16+
import io.netty.buffer.ByteBuf;
1617
import java.io.InputStream;
1718

1819

@@ -33,7 +34,11 @@ public interface NetworkRequest {
3334
long getStartTimeInMs();
3435

3536
/**
36-
* Release any resource this request is holding.
37+
* Release any resource this request is holding. By default it returns false so this method can be compatible
38+
* with {@link ByteBuf#release()}
39+
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
3740
*/
38-
default void release() {};
41+
default boolean release() {
42+
return false;
43+
}
3944
}

0 commit comments

Comments
 (0)