Skip to content

Commit c31b3ed

Browse files
committed
WIP on updated cluster RC
1 parent adfedde commit c31b3ed

File tree

7 files changed

+93
-27
lines changed

7 files changed

+93
-27
lines changed

services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class ServiceEndpoint implements Externalizable {
3636
public ServiceEndpoint() {}
3737

3838
private ServiceEndpoint(Builder builder) {
39-
this.id = Objects.requireNonNull(builder.id);
39+
this.id = Objects.requireNonNull(builder.id, "ServiceEndpoint.id is required");
4040
this.address = builder.address;
4141
this.contentTypes = Collections.unmodifiableSet(new HashSet<>(builder.contentTypes));
4242
this.tags = Collections.unmodifiableMap(new HashMap<>(builder.tags));
@@ -96,7 +96,11 @@ public void writeExternal(ObjectOutput out) throws IOException {
9696
out.writeUTF(id);
9797

9898
// address
99-
out.writeUTF(address.toString());
99+
boolean addressExists = address != null;
100+
out.writeBoolean(addressExists);
101+
if (addressExists) {
102+
out.writeUTF(address.toString());
103+
}
100104

101105
// contentTypes
102106
out.writeInt(contentTypes.size());
@@ -124,12 +128,15 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
124128
id = in.readUTF();
125129

126130
// address
127-
address = Address.from(in.readUTF());
131+
boolean addressExists = in.readBoolean();
132+
if (addressExists) {
133+
address = Address.from(in.readUTF());
134+
}
128135

129136
// contentTypes
130-
int capacitySize = in.readInt();
131-
Set<String> contentTypes = new HashSet<>(capacitySize);
132-
for (int i = 0; i < capacitySize; i++) {
137+
int contentTypesSize = in.readInt();
138+
Set<String> contentTypes = new HashSet<>(contentTypesSize);
139+
for (int i = 0; i < contentTypesSize; i++) {
133140
contentTypes.add(in.readUTF());
134141
}
135142
this.contentTypes = Collections.unmodifiableSet(contentTypes);

services-api/src/main/java/io/scalecube/services/ServiceMethodDefinition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public ServiceMethodDefinition(String action) {
4747
* @param auth is method protected by authentication
4848
*/
4949
public ServiceMethodDefinition(String action, Map<String, String> tags, boolean auth) {
50-
this.action = Objects.requireNonNull(action);
50+
this.action = Objects.requireNonNull(action, "ServiceMethodDefinition.action is required");
5151
this.tags = Collections.unmodifiableMap(new HashMap<>(tags));
5252
this.auth = auth;
5353
}
@@ -90,7 +90,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
9090
}
9191

9292
@Override
93-
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
93+
public void readExternal(ObjectInput in) throws IOException {
9494
// namespace
9595
action = in.readUTF();
9696

services-api/src/main/java/io/scalecube/services/ServiceRegistration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public ServiceRegistration() {}
3838
*/
3939
public ServiceRegistration(
4040
String namespace, Map<String, String> tags, Collection<ServiceMethodDefinition> methods) {
41-
this.namespace = Objects.requireNonNull(namespace);
41+
this.namespace = Objects.requireNonNull(namespace, "ServiceRegistration.namespace is required");
4242
this.tags = Collections.unmodifiableMap(new HashMap<>(tags));
4343
this.methods = Collections.unmodifiableList(new ArrayList<>(methods));
4444
}

services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.services.exceptions;
22

3+
import java.util.StringJoiner;
4+
35
public abstract class ServiceException extends RuntimeException {
46

57
private final int errorCode;
@@ -25,11 +27,9 @@ public int errorCode() {
2527

2628
@Override
2729
public String toString() {
28-
return getClass().getSimpleName()
29-
+ "{errorCode="
30-
+ errorCode
31-
+ ", errorMessage="
32-
+ getMessage()
33-
+ '}';
30+
return new StringJoiner(", ", getClass().getSimpleName() + "[", "]")
31+
.add("errorCode=" + errorCode)
32+
.add("errorMessage=" + getMessage())
33+
.toString();
3434
}
3535
}

services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import javax.management.StandardMBean;
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
24+
import reactor.core.Exceptions;
2425
import reactor.core.publisher.DirectProcessor;
2526
import reactor.core.publisher.Flux;
2627
import reactor.core.publisher.FluxSink;
@@ -150,30 +151,29 @@ private ServiceDiscoveryEvent toServiceDiscoveryEvent(MembershipEvent membership
150151
ServiceDiscoveryEvent discoveryEvent = null;
151152

152153
if (membershipEvent.isAdded() && membershipEvent.newMetadata() != null) {
153-
ServiceEndpoint serviceEndpoint = decodeMetadata(membershipEvent.newMetadata());
154-
discoveryEvent = ServiceDiscoveryEvent.newEndpointAdded(serviceEndpoint);
154+
discoveryEvent =
155+
ServiceDiscoveryEvent.newEndpointAdded(decodeMetadata(membershipEvent.newMetadata()));
155156
}
156157

157158
if (membershipEvent.isRemoved() && membershipEvent.oldMetadata() != null) {
158-
ServiceEndpoint serviceEndpoint = decodeMetadata(membershipEvent.oldMetadata());
159-
discoveryEvent = ServiceDiscoveryEvent.newEndpointRemoved(serviceEndpoint);
159+
discoveryEvent =
160+
ServiceDiscoveryEvent.newEndpointRemoved(decodeMetadata(membershipEvent.oldMetadata()));
160161
}
161162

162163
if (membershipEvent.isLeaving() && membershipEvent.newMetadata() != null) {
163-
ServiceEndpoint serviceEndpoint = decodeMetadata(membershipEvent.newMetadata());
164-
discoveryEvent = ServiceDiscoveryEvent.newEndpointLeaving(serviceEndpoint);
164+
discoveryEvent =
165+
ServiceDiscoveryEvent.newEndpointLeaving(decodeMetadata(membershipEvent.newMetadata()));
165166
}
166167

167168
return discoveryEvent;
168169
}
169170

170171
private ServiceEndpoint decodeMetadata(ByteBuffer byteBuffer) {
171172
try {
172-
return (ServiceEndpoint) clusterConfig.metadataCodec().deserialize(byteBuffer);
173-
} catch (Exception ex) {
174-
// Gobble exception and don't ruin stack
175-
LOGGER.error("Failed to read metadata: " + ex);
176-
return null;
173+
return (ServiceEndpoint) clusterConfig.metadataCodec().deserialize(byteBuffer.duplicate());
174+
} catch (Exception e) {
175+
LOGGER.error("Failed to read metadata: " + e);
176+
throw Exceptions.propagate(e);
177177
}
178178
}
179179

services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,22 @@
99
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
1010
import io.scalecube.cluster.gossip.GossipConfig;
1111
import io.scalecube.cluster.membership.MembershipConfig;
12+
import io.scalecube.cluster.metadata.JdkMetadataCodec;
1213
import io.scalecube.net.Address;
1314
import io.scalecube.services.ServiceEndpoint;
15+
import io.scalecube.services.ServiceMethodDefinition;
16+
import io.scalecube.services.ServiceRegistration;
1417
import io.scalecube.services.discovery.api.ServiceDiscovery;
1518
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
19+
import java.nio.ByteBuffer;
1620
import java.time.Duration;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.UUID;
1725
import java.util.concurrent.atomic.AtomicInteger;
1826
import java.util.function.Supplier;
27+
import org.junit.jupiter.api.Assertions;
1928
import org.junit.jupiter.api.BeforeAll;
2029
import org.junit.jupiter.api.Test;
2130
import reactor.core.publisher.Flux;
@@ -32,11 +41,61 @@ class ScalecubeServiceDiscoveryTest extends BaseTest {
3241
public static final MembershipConfig MEMBERSHIP_CONFIG = MembershipConfig.defaultLocalConfig();
3342
public static final int CLUSTER_SIZE = 3 + 1; // r1 + r2 + r3 (plus 1 for be sure)
3443

44+
private JdkMetadataCodec jdkMetadataCodec = new JdkMetadataCodec();
45+
3546
@BeforeAll
3647
public static void setUp() {
3748
StepVerifier.setDefaultTimeout(TIMEOUT);
3849
}
3950

51+
@Test
52+
public void testJdkMetadataCodec() {
53+
ServiceEndpoint serviceEndpoint =
54+
ServiceEndpoint.builder()
55+
.id(UUID.randomUUID().toString())
56+
.tags(Collections.singletonMap("K", "V"))
57+
.contentTypes(Collections.singleton("json"))
58+
.appendServiceRegistrations(
59+
Collections.singletonList(
60+
new ServiceRegistration(
61+
"namespace",
62+
Collections.singletonMap("KK", "VV"),
63+
Collections.singletonList(
64+
new ServiceMethodDefinition(
65+
"action0", Collections.singletonMap("KKK0", "VVV"), true)))))
66+
.appendServiceRegistrations(
67+
Collections.singletonList(
68+
new ServiceRegistration(
69+
"namespace",
70+
Collections.singletonMap("KK", "VV"),
71+
Collections.singletonList(
72+
new ServiceMethodDefinition(
73+
"action1", Collections.singletonMap("KKK1", "VVV"), true)))))
74+
.appendServiceRegistrations(
75+
Collections.singletonList(
76+
new ServiceRegistration(
77+
"namespace",
78+
Collections.singletonMap("KK", "VV"),
79+
Collections.singletonList(
80+
new ServiceMethodDefinition(
81+
"action2", Collections.singletonMap("KKK2", "VVV"), true)))))
82+
.build();
83+
84+
ByteBuffer buffer = jdkMetadataCodec.serialize(serviceEndpoint);
85+
ServiceEndpoint serviceEndpoint1 = (ServiceEndpoint) jdkMetadataCodec.deserialize(buffer);
86+
Assertions.assertEquals(serviceEndpoint.id(), serviceEndpoint1.id());
87+
Assertions.assertEquals(1, serviceEndpoint1.tags().size());
88+
Assertions.assertEquals(1, serviceEndpoint1.contentTypes().size());
89+
90+
List<ServiceRegistration> serviceRegistrations =
91+
new ArrayList<>(serviceEndpoint1.serviceRegistrations());
92+
Assertions.assertEquals(3, serviceRegistrations.size());
93+
for (ServiceRegistration serviceRegistration : serviceRegistrations) {
94+
Assertions.assertEquals(1, serviceRegistration.methods().size());
95+
Assertions.assertEquals(1, serviceRegistration.tags().size());
96+
}
97+
}
98+
4099
@Test
41100
public void testEndpointIsAddedThenRemoved() {
42101
Address seedAddress = startSeed();

services/src/test/java/io/scalecube/services/routings/sut/WeightedRandomRouter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public Optional<ServiceReference> route(ServiceRegistry serviceRegistry, Service
1313
RandomCollection<ServiceReference> weightedRandom = new RandomCollection<>();
1414
serviceRegistry
1515
.lookupService(request)
16-
.forEach(sr -> weightedRandom.add(Double.valueOf(sr.tags().get("Weight")), sr));
16+
.forEach(sr -> weightedRandom.add(Double.parseDouble(sr.tags().get("Weight")), sr));
1717
return Optional.of(weightedRandom.next());
1818
}
1919
}

0 commit comments

Comments
 (0)