Skip to content

Commit c0d7586

Browse files
[Internal]Adding API to force specific RNTBD header ordering as preparation for ThinClient (Azure#44233)
* Adding RNTBD preparations for ThinClient * Adding unit tests * Adding Copyright headers * Addressed code review feedback
1 parent e69e7cc commit c0d7586

File tree

16 files changed

+338
-50
lines changed

16 files changed

+338
-50
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.cosmos.implementation.directconnectivity.rntbd;
4+
5+
import com.azure.cosmos.implementation.HttpConstants;
6+
import com.azure.cosmos.implementation.OperationType;
7+
import com.azure.cosmos.implementation.ResourceType;
8+
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
9+
import com.azure.cosmos.implementation.directconnectivity.Uri;
10+
import io.netty.buffer.ByteBuf;
11+
import io.netty.buffer.Unpooled;
12+
import org.testng.annotations.Test;
13+
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
import java.util.Random;
17+
import java.util.UUID;
18+
19+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
20+
21+
public class RntbdTokenStreamTests {
22+
private static final Random rnd = new Random();
23+
@Test(groups = { "unit" })
24+
public void noReorderingInDirectMode() {
25+
List<Short> headerIds = createAndEncodeRequestHeaders(false);
26+
assertThat(headerIds).isNotNull();
27+
assertThat(headerIds.size()).isEqualTo(6);
28+
assertThat(headerIds.get(0)).isEqualTo(RntbdConstants.RntbdRequestHeader.PayloadPresent.id());
29+
assertThat(headerIds.get(1)).isEqualTo(RntbdConstants.RntbdRequestHeader.ReplicaPath.id());
30+
assertThat(headerIds.get(2)).isEqualTo(RntbdConstants.RntbdRequestHeader.TransportRequestID.id());
31+
assertThat(headerIds.get(3)).isEqualTo(RntbdConstants.RntbdRequestHeader.EffectivePartitionKey.id());
32+
assertThat(headerIds.get(4)).isEqualTo(RntbdConstants.RntbdRequestHeader.CorrelatedActivityId.id());
33+
assertThat(headerIds.get(5)).isEqualTo(RntbdConstants.RntbdRequestHeader.GlobalDatabaseAccountName.id());
34+
}
35+
36+
@Test(groups = { "unit" })
37+
public void withReorderingForThinClient() {
38+
List<Short> headerIds = createAndEncodeRequestHeaders(true);
39+
assertThat(headerIds).isNotNull();
40+
assertThat(headerIds.size()).isEqualTo(4);
41+
assertThat(headerIds.get(0)).isEqualTo(RntbdConstants.RntbdRequestHeader.EffectivePartitionKey.id());
42+
assertThat(headerIds.get(1)).isEqualTo(RntbdConstants.RntbdRequestHeader.GlobalDatabaseAccountName.id());
43+
assertThat(headerIds.get(2)).isEqualTo(RntbdConstants.RntbdRequestHeader.PayloadPresent.id());
44+
assertThat(headerIds.get(3)).isEqualTo(RntbdConstants.RntbdRequestHeader.CorrelatedActivityId.id());
45+
}
46+
47+
private static List<Short> createAndEncodeRequestHeaders(boolean forThinClient) {
48+
UUID activityId = UUID.randomUUID();
49+
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
50+
null,
51+
OperationType.Create,
52+
ResourceType.Document);
53+
54+
String correlatedActivityId = UUID.randomUUID().toString();
55+
request.getHeaders().put(
56+
HttpConstants.HttpHeaders.CORRELATED_ACTIVITY_ID,
57+
correlatedActivityId
58+
);
59+
60+
request.getHeaders().put(
61+
HttpConstants.HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME,
62+
"SomeAccount"
63+
);
64+
65+
RntbdRequestFrame frame = new RntbdRequestFrame(
66+
activityId,
67+
RntbdConstants.RntbdOperationType.Create,
68+
RntbdConstants.RntbdResourceType.Document);
69+
70+
RntbdRequestArgs args = new RntbdRequestArgs(request, new Uri("prefix://someUri"));
71+
72+
byte[] hashValue = new byte[16];
73+
rnd.nextBytes(hashValue);
74+
RntbdTokenStream<RntbdConstants.RntbdRequestHeader> input = new RntbdRequestHeaders(args, frame);
75+
input
76+
.get(RntbdConstants.RntbdRequestHeader.EffectivePartitionKey)
77+
.setValue(hashValue);
78+
79+
ByteBuf out = Unpooled.buffer(1024);
80+
input.encode(out, forThinClient);
81+
82+
out.readerIndex(0);
83+
84+
return getHeaderIdsInOrder(out);
85+
}
86+
87+
private static List<Short> getHeaderIdsInOrder(ByteBuf in) {
88+
List<Short> headerIds = new ArrayList<>();
89+
while (in.readableBytes() > 0) {
90+
91+
final short id = in.readShortLE();
92+
headerIds.add(id);
93+
final RntbdTokenType type = RntbdTokenType.fromId(in.readByte());
94+
95+
RntbdToken token = RntbdToken.create(RntbdConstants.RntbdRequestHeader.map.get(id));
96+
token.decode(in);
97+
}
98+
99+
return headerIds;
100+
}
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.cosmos.implementation.directconnectivity.rntbd;
4+
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.buffer.Unpooled;
7+
import org.testng.annotations.Test;
8+
9+
import java.util.Base64;
10+
import java.util.Random;
11+
12+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
13+
14+
public class RntbdTokenTests {
15+
private static final Random rnd = new Random();
16+
@Test(groups = { "unit" })
17+
public void getValueIsIdempotent() {
18+
RntbdToken token = RntbdToken.create(RntbdConstants.RntbdRequestHeader.EffectivePartitionKey);
19+
byte[] blob = new byte[10];
20+
rnd.nextBytes(blob);
21+
token.setValue(blob);
22+
23+
String expectedJson = "{\"id\":90,\"name\":\"EffectivePartitionKey\",\"present\":true,"
24+
+ "\"required\":false,\"value\":\""
25+
+ Base64.getEncoder().encodeToString(blob)
26+
+ "\",\"tokenType\":\"Bytes\"}";
27+
assertThat(RntbdObjectMapper.toJson(token)).isEqualTo(expectedJson);
28+
29+
Object value1 = token.getValue();
30+
Object value2 = token.getValue();
31+
assertThat(value1).isSameAs(value2);
32+
assertThat(value1).isSameAs(blob);
33+
34+
assertThat(RntbdObjectMapper.toJson(token)).isEqualTo(expectedJson);
35+
36+
ByteBuf buffer = Unpooled.buffer(1024);
37+
token.encode(buffer);
38+
39+
RntbdToken decodedToken = RntbdToken.create(RntbdConstants.RntbdRequestHeader.EffectivePartitionKey);
40+
// skipping 3 bytes (2 bytes for header id + 1 byte for token type)
41+
buffer.readerIndex(3);
42+
// when decoding the RntbdToken.value is a ByteBuffer - not a byte[] - testing this path for idempotency as well
43+
decodedToken.decode(buffer);
44+
assertThat(RntbdObjectMapper.toJson(decodedToken)).isEqualTo(expectedJson);
45+
46+
value1 = decodedToken.getValue();
47+
assertThat(value1).isInstanceOf(ByteBuf.class);
48+
ByteBuf byteBufValue1 = (ByteBuf)value1;
49+
assertThat(byteBufValue1.readableBytes()).isEqualTo(10);
50+
byte[] byteArray1 = new byte[10];
51+
byteBufValue1.getBytes(byteBufValue1.readerIndex(), byteArray1);
52+
assertThat(byteArray1).isEqualTo(blob);
53+
54+
value2 = decodedToken.getValue();
55+
assertThat(value1).isSameAs(value2);
56+
ByteBuf byteBufValue2 = (ByteBuf)value2;
57+
assertThat(byteBufValue2.readableBytes()).isEqualTo(10);
58+
byte[] byteArray2 = new byte[10];
59+
byteBufValue2.getBytes(byteBufValue2.readerIndex(), byteArray2);
60+
assertThat(byteArray2).isEqualTo(blob);
61+
62+
assertThat(RntbdObjectMapper.toJson(decodedToken)).isEqualTo(expectedJson);
63+
}
64+
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@ public static class HttpHeaders {
286286
// Thinclient headers
287287
public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type";
288288
public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type";
289+
290+
public static final String GLOBAL_DATABASE_ACCOUNT_NAME = "GlobalDatabaseAccountName";
289291
}
290292

291293
public static class A_IMHeaderValues {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque
9898
// todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format)
9999
// todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy
100100
// todo: need to conditionally add some headers (userAgent, replicaId/endpoint, etc)
101-
rntbdRequest.encode(byteBuf);
101+
rntbdRequest.encode(byteBuf, true);
102102

103103
return new HttpRequest(
104104
HttpMethod.POST,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77
import com.azure.cosmos.implementation.ResourceType;
88
import io.netty.handler.codec.DecoderException;
99

10+
import java.util.Arrays;
1011
import java.util.EnumSet;
12+
import java.util.List;
1113
import java.util.Map;
14+
import java.util.Set;
1215
import java.util.stream.Collector;
1316
import java.util.stream.Collectors;
17+
import java.util.stream.Stream;
1418

1519
import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat;
1620

@@ -495,7 +499,7 @@ public enum RntbdRequestHeader implements RntbdHeader {
495499
ConsistencyLevel((short) 0x0010, RntbdTokenType.Byte, false),
496500
EntityId((short) 0x0011, RntbdTokenType.String, false),
497501
ResourceSchemaName((short) 0x0012, RntbdTokenType.SmallString, false),
498-
ReplicaPath((short) 0x0013, RntbdTokenType.String, true),
502+
ReplicaPath((short) 0x0013, RntbdTokenType.String, false), // true in direct, but not for thin client
499503
ResourceTokenExpiry((short) 0x0014, RntbdTokenType.ULong, false),
500504
DatabaseName((short) 0x0015, RntbdTokenType.String, false),
501505
CollectionName((short) 0x0016, RntbdTokenType.String, false),
@@ -591,13 +595,46 @@ public enum RntbdRequestHeader implements RntbdHeader {
591595
CorrelatedActivityId((short) 0x00B0, RntbdTokenType.Guid, false),
592596
SDKSupportedCapabilities((short) 0x00A2, RntbdTokenType.ULong, false),
593597
ChangeFeedWireFormatVersion((short) 0x00B2, RntbdTokenType.String, false),
594-
PriorityLevel((short) 0x00BF, RntbdTokenType.Byte, false);
598+
PriorityLevel((short) 0x00BF, RntbdTokenType.Byte, false),
599+
GlobalDatabaseAccountName((short) 0x00CE, RntbdTokenType.String, false);
600+
601+
public static final List<RntbdRequestHeader> thinClientHeadersInOrderList = Arrays.asList(
602+
EffectivePartitionKey,
603+
GlobalDatabaseAccountName,
604+
DatabaseName,
605+
CollectionName,
606+
CollectionRid,
607+
// ResourceId,
608+
PayloadPresent,
609+
DocumentName,
610+
AuthorizationToken,
611+
Date);
612+
613+
private static final List<RntbdRequestHeader> thinClientExclusionList = Arrays.asList(
614+
RntbdConstants.RntbdRequestHeader.ResourceId,
615+
RntbdConstants.RntbdRequestHeader.TransportRequestID,
616+
RntbdRequestHeader.IntendedCollectionRid,
617+
RntbdConstants.RntbdRequestHeader.ReplicaPath);
618+
619+
public static final Set<Short> thinClientProxyExcludedSet;
620+
public static final Set<Short> thinClientProxyOrderedOrExcludedSet;
595621

596622
public static final Map<Short, RntbdRequestHeader> map;
597623
public static final EnumSet<RntbdRequestHeader> set = EnumSet.allOf(RntbdRequestHeader.class);
598624

599625
static {
600626
final Collector<RntbdRequestHeader, ?, Map<Short, RntbdRequestHeader>> collector = Collectors.toMap(RntbdRequestHeader::id, h -> h);
627+
thinClientProxyOrderedOrExcludedSet =
628+
Stream.concat(
629+
thinClientExclusionList.stream(),
630+
thinClientHeadersInOrderList.stream()
631+
)
632+
.map(RntbdRequestHeader::id)
633+
.collect(Collectors.toSet());
634+
thinClientProxyExcludedSet =
635+
thinClientExclusionList.stream()
636+
.map(RntbdRequestHeader::id)
637+
.collect(Collectors.toSet());
601638
map = set.stream().collect(collector);
602639
}
603640

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,13 @@ public static RntbdContext decode(final ByteBuf in) {
136136
public void encode(final ByteBuf out) {
137137

138138
final Headers headers = new Headers(this);
139-
final int length = RntbdResponseStatus.LENGTH + headers.computeLength();
139+
final int length = RntbdResponseStatus.LENGTH + headers.computeLength(false);
140140
final RntbdResponseStatus responseStatus = new RntbdResponseStatus(length, this.status(), this.activityId());
141141

142142
final int start = out.writerIndex();
143143

144144
responseStatus.encode(out);
145-
headers.encode(out);
145+
headers.encode(out, false);
146146
headers.release();
147147

148148
final int end = out.writerIndex();
@@ -166,7 +166,7 @@ public static RntbdContext from(final RntbdContextRequest request, final ServerP
166166
headers.serverVersion.setValue(properties.getVersion());
167167
headers.unauthenticatedTimeoutInSeconds.setValue(0);
168168

169-
final int length = RntbdResponseStatus.LENGTH + headers.computeLength();
169+
final int length = RntbdResponseStatus.LENGTH + headers.computeLength(false);
170170
final UUID activityId = request.getActivityId();
171171

172172
final RntbdResponseStatus responseStatus = new RntbdResponseStatus(length, status, activityId);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdContextRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ public static RntbdContextRequest decode(final ByteBuf in) {
7474

7575
public void encode(final ByteBuf out) {
7676

77-
final int expectedLength = RntbdRequestFrame.LENGTH + this.headers.computeLength();
77+
final int expectedLength = RntbdRequestFrame.LENGTH + this.headers.computeLength(false);
7878
final int start = out.writerIndex();
7979

8080
out.writeIntLE(expectedLength);
8181

8282
final RntbdRequestFrame header = new RntbdRequestFrame(this.getActivityId(), RntbdOperationType.Connection, RntbdResourceType.Connection);
8383
header.encode(out);
84-
this.headers.encode(out);
84+
this.headers.encode(out, false);
8585

8686
final int observedLength = out.writerIndex() - start;
8787

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdObjectMapper.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.InputStream;
2626
import java.time.Duration;
2727
import java.time.Instant;
28+
import java.util.Base64;
2829
import java.util.concurrent.ConcurrentHashMap;
2930

3031
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
@@ -69,6 +70,23 @@ public static <T> T readValue(String string, Class<T> type) throws IOException {
6970

7071
public static String toJson(final Object value) {
7172
try {
73+
if (value instanceof RntbdToken) {
74+
RntbdToken token = (RntbdToken)value;
75+
if (token.isPresent() && token.getTokenType() == RntbdTokenType.Bytes) {
76+
Object tokenValue = token.getValue();
77+
if (tokenValue instanceof ByteBuf) {
78+
ByteBuf buf = (ByteBuf) tokenValue;
79+
80+
byte[] blob = new byte[buf.readableBytes()];
81+
buf.getBytes(buf.readerIndex(), blob);
82+
String base64String = Base64.getEncoder().encodeToString(blob);
83+
String json = objectWriter.writeValueAsString(value);
84+
ObjectNode parsed = (ObjectNode)objectMapper.readTree(json);
85+
parsed.put("value", base64String);
86+
return parsed.toString();
87+
}
88+
}
89+
}
7290
return objectWriter.writeValueAsString(value);
7391
} catch (final JsonProcessingException error) {
7492
logger.debug("could not convert {} value to JSON due to:", value.getClass(), error);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkState;
1616

1717
public final class RntbdRequest {
18-
1918
private static final byte[] EMPTY_BYTE_ARRAY = {};
2019

2120
private final RntbdRequestFrame frame;
@@ -42,6 +41,19 @@ public <T> T getHeader(final RntbdRequestHeader header) {
4241
return (T) this.headers.get(header).getValue();
4342
}
4443

44+
@JsonIgnore
45+
@SuppressWarnings("unchecked")
46+
// Returns true if set successfully - false if the header does not exist
47+
public boolean setHeaderValue(final RntbdRequestHeader header, Object value) {
48+
RntbdToken token = this.headers.get(header);
49+
if (token == null) {
50+
return false;
51+
}
52+
53+
token.setValue(value);
54+
return true;
55+
}
56+
4557
public Long getTransportRequestId() {
4658
return this.getHeader(RntbdRequestHeader.TransportRequestID);
4759
}
@@ -76,16 +88,18 @@ public static RntbdRequest decode(final ByteBuf in) {
7688
return new RntbdRequest(header, metadata, payload);
7789
}
7890

79-
public void encode(final ByteBuf out) {
91+
public void encode(final ByteBuf out, boolean forThinClient) {
8092

81-
final int expectedLength = RntbdRequestFrame.LENGTH + this.headers.computeLength();
82-
final int start = out.writerIndex();
93+
// If payload exists it is encoded as prefix length (32-bit) + the raw payload
94+
final int effectivePayloadSize = this.payload != null && this.payload.length > 0 ? this.payload.length + 4 : 0;
95+
final int expectedLength = RntbdRequestFrame.LENGTH + this.headers.computeLength(forThinClient);
8396

97+
final int start = out.writerIndex();
8498
out.writeIntLE(expectedLength);
8599
this.frame.encode(out);
86-
this.headers.encode(out);
100+
this.headers.encode(out, forThinClient);
87101

88-
final int observedLength = out.writerIndex() - start;
102+
int observedLength = out.writerIndex() - start;
89103

90104
checkState(observedLength == expectedLength,
91105
"encoding error: {\"expectedLength\": %s, \"observedLength\": %s}",
@@ -95,6 +109,13 @@ public void encode(final ByteBuf out) {
95109
if (this.payload.length > 0) {
96110
out.writeIntLE(this.payload.length);
97111
out.writeBytes(this.payload);
112+
113+
observedLength = out.writerIndex() - start;
114+
115+
checkState(observedLength == expectedLength + effectivePayloadSize,
116+
"payload encoding error: {\"expectedLength\": %s, \"observedLength\": %s}",
117+
expectedLength + effectivePayloadSize,
118+
observedLength);
98119
}
99120
}
100121

0 commit comments

Comments
 (0)