Skip to content
This repository was archived by the owner on Jul 16, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.alibaba.rsocket.broker;

import com.alibaba.rsocket.encoding.RSocketEncodingFacade;
import com.alibaba.rsocket.encoding.impl.RSocketEncodingFacadeImpl;
import com.vaadin.flow.component.dependency.StyleSheet;
import com.vaadin.flow.component.page.AppShellConfigurator;
import com.vaadin.flow.component.page.Push;
Expand Down Expand Up @@ -31,7 +32,7 @@ public class AlibabaRSocketBrokerServer implements AppShellConfigurator {
public static void main(String[] args) {
//checking encoder first
//noinspection ResultOfMethodCallIgnored
RSocketEncodingFacade.getInstance();
RSocketEncodingFacade.getInstance(new RSocketEncodingFacadeImpl());
SpringApplication.run(AlibabaRSocketBrokerServer.class, args);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import com.alibaba.rsocket.metadata.RSocketMimeType;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import com.alibaba.spring.boot.rsocket.broker.responder.RSocketBrokerHandlerRegistry;
import com.alibaba.spring.boot.rsocket.broker.responder.RSocketBrokerResponderHandler;
import com.alibaba.spring.boot.rsocket.broker.route.ServiceMeshInspector;
import com.alibaba.spring.boot.rsocket.broker.route.ServiceRoutingSelector;
import com.alibaba.spring.boot.rsocket.broker.security.AuthenticationService;
import com.alibaba.spring.boot.rsocket.broker.security.RSocketAppPrincipal;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.Payload;
import io.rsocket.util.DefaultPayload;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -55,31 +57,33 @@ public Mono<ResponseEntity<String>> handle(@PathVariable("serviceName") String s
@RequestHeader(name = "Authorization", required = false, defaultValue = "") String authorizationValue) {
try {
GSVRoutingMetadata routingMetadata = new GSVRoutingMetadata(group, serviceName, method, version);
Integer serviceHashCode = routingMetadata.id();
Integer targetHandlerId = routingSelector.findHandler(serviceHashCode);
if (!endpoint.isEmpty() && endpoint.startsWith("id:")) {
targetHandlerId = Integer.valueOf(endpoint.substring(3).trim());
}
return Optional.ofNullable(targetHandlerId)
.flatMap(handlerId -> Optional.ofNullable(handlerRegistry.findById(handlerId)))
.map(targetHandler -> {
if (authRequired) {
RSocketAppPrincipal principal = authAuthorizationValue(authorizationValue);
if (principal == null || !serviceMeshInspector.isRequestAllowed(principal, routingMetadata.gsv(), targetHandler.getPrincipal())) {
return Mono.just(error(RsocketErrorCode.message("RST-900401", routingMetadata.gsv())));
}
}
RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(routingMetadata, jsonMetaEncoding);
ByteBuf bodyBuf = body == null ? EMPTY_BUFFER : Unpooled.wrappedBuffer(body);
return targetHandler.requestResponse(DefaultPayload.create(bodyBuf, compositeMetadata.getContent()))
.map(payload -> {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setCacheControl(CacheControl.noCache().getHeaderValue());
return new ResponseEntity<>(payload.getDataUtf8(), headers, HttpStatus.OK);
});
})
.orElseGet(() -> Mono.just(error(RsocketErrorCode.message("RST-900404", routingMetadata.gsv()))));
RequestHandler requestHandler = new RequestHandler(routingMetadata, endpoint, authorizationValue, body);
return requestHandler.getResponseEntityMono();
// Integer serviceHashCode = routingMetadata.id();
// Integer targetHandlerId = routingSelector.findHandler(serviceHashCode);
// if (!endpoint.isEmpty() && endpoint.startsWith("id:")) {
// targetHandlerId = Integer.valueOf(endpoint.substring(3).trim());
// }
// return Optional.ofNullable(targetHandlerId)
// .flatMap(handlerId -> Optional.ofNullable(handlerRegistry.findById(handlerId)))
// .map(targetHandler -> {
// if (authRequired) {
// RSocketAppPrincipal principal = authAuthorizationValue(authorizationValue);
// if (principal == null || !serviceMeshInspector.isRequestAllowed(principal, routingMetadata.gsv(), targetHandler.getPrincipal())) {
// return Mono.just(error(RsocketErrorCode.message("RST-900401", routingMetadata.gsv())));
// }
// }
// RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(routingMetadata, jsonMetaEncoding);
// ByteBuf bodyBuf = body == null ? EMPTY_BUFFER : Unpooled.wrappedBuffer(body);
// return targetHandler.requestResponse(DefaultPayload.create(bodyBuf, compositeMetadata.getContent()))
// .map(payload -> {
// HttpHeaders headers = new HttpHeaders();
// headers.setContentType(MediaType.APPLICATION_JSON);
// headers.setCacheControl(CacheControl.noCache().getHeaderValue());
// return new ResponseEntity<>(payload.getDataUtf8(), headers, HttpStatus.OK);
// });
// })
// .orElseGet(() -> Mono.just(error(RsocketErrorCode.message("RST-900404", routingMetadata.gsv()))));
} catch (Exception e) {
return Mono.just(error(e.getMessage()));
}
Expand All @@ -104,4 +108,61 @@ public ResponseEntity<String> error(String errorText) {
return new ResponseEntity<>(errorText, headers, HttpStatus.BAD_REQUEST);
}

private class RequestHandler {
private final GSVRoutingMetadata routingMetadata;
private final String endpoint;
private final String authorizationValue;
private final byte[] body;
private Integer targetHandlerId;

RequestHandler(GSVRoutingMetadata routingMetadata, String endpoint, String authorizationValue, byte[] body) {
this.routingMetadata = routingMetadata;
this.endpoint = endpoint;
this.authorizationValue = authorizationValue;
this.body = body;
}

private Mono<ResponseEntity<String>> getResponseEntityMono() {
try {
targetHandlerId = getTargetHandlerId();
return Optional.ofNullable(targetHandlerId)
.flatMap(handlerId -> Optional.ofNullable(handlerRegistry.findById(handlerId)))
.map(this::processRequest)
.orElseGet(() -> Mono.just(error(RsocketErrorCode.message("RST-900404", routingMetadata.gsv()))));

}catch (Exception e) {
return Mono.just(error(e.getMessage()));
}
}

private Integer getTargetHandlerId() {
Integer serviceHashCode = routingMetadata.id();
if (!endpoint.isEmpty() && endpoint.startsWith("id:")) {
return Integer.valueOf(endpoint.substring(3).trim());
}
return routingSelector.findHandler(serviceHashCode);
}

private Mono<ResponseEntity<String>> processRequest(RSocketBrokerResponderHandler targetHandler) {
if (authRequired) {
RSocketAppPrincipal principal = authAuthorizationValue(authorizationValue);
if (principal == null || !serviceMeshInspector.isRequestAllowed(principal, routingMetadata.gsv(), targetHandler.getPrincipal())) {
return Mono.just(error(RsocketErrorCode.message("RST-900401", routingMetadata.gsv())));
}
}
RSocketCompositeMetadata compositeMetadata = RSocketCompositeMetadata.from(routingMetadata, jsonMetaEncoding);
ByteBuf bodyBuf = body == null ? EMPTY_BUFFER : Unpooled.wrappedBuffer(body);
return targetHandler.requestResponse(DefaultPayload.create(bodyBuf, compositeMetadata.getContent()))
.map(this::createResponseEntity);
}

private ResponseEntity<String> createResponseEntity(Payload payload) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setCacheControl(CacheControl.noCache().getHeaderValue());
return new ResponseEntity<>(payload.getDataUtf8(), headers, HttpStatus.OK);
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -132,25 +132,31 @@ public void setPublishedServices(Set<ServiceLocator> publishedServices) {
}

public void setPorts(int webPort, int managementPort, Map<Integer, String> rsocketPorts, String ip) {
if (webPort > 0 || managementPort > 0 || (rsocketPorts != null && !rsocketPorts.isEmpty())) {
boolean showPortsInfo = webPort > 0 || managementPort > 0 || (rsocketPorts != null && !rsocketPorts.isEmpty());
if (showPortsInfo) {
this.portsInfoHeader.setVisible(true);
this.portsInfo.setVisible(true);
List<String> ports = new ArrayList<>();
if (webPort > 0) {
ports.add("Web:" + webPort);
}
if (managementPort > 0) {
ports.add("Management:" + webPort);
}
if (rsocketPorts != null && !rsocketPorts.isEmpty()) {
for (Map.Entry<Integer, String> entry : rsocketPorts.entrySet()) {
ports.add("RSocket: " + entry.getValue() + "://" + ip + ":" + entry.getKey());
}
}
List<String> ports = getPortsInfo(webPort, managementPort, rsocketPorts, ip);
this.portsInfo.setText(String.join("\r\n", ports));
}
}

private List<String> getPortsInfo(int webPort, int managementPort, Map<Integer, String> rsocketPorts, String ip) {
List<String> ports = new ArrayList<>();
if (webPort > 0) {
ports.add("Web:" + webPort);
}
if (managementPort > 0) {
ports.add("Management:" + webPort);
}
if (rsocketPorts != null && !rsocketPorts.isEmpty()) {
for (Map.Entry<Integer, String> entry : rsocketPorts.entrySet()) {
ports.add("RSocket: " + entry.getValue() + "://" + ip + ":" + entry.getKey());
}
}
return ports;
}

public void setConsumedServices(Set<String> consumedServices) {
if (consumedServices != null && !consumedServices.isEmpty()) {
this.consumedServicesHeader.setVisible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,19 @@ public RSocketBroker findConsistentBroker(String clientId) {
@Override
public void start() {
final String localIp = NetworkUtil.LOCAL_IP;
final int syncInterval = 5_000;
final int numVirtualNodeReplicas = 12;
monoCluster = new ClusterImpl()
.config(clusterConfig -> clusterConfig.externalHost(localIp).externalPort(gossipListenPort))
.membership(membershipConfig -> membershipConfig.seedMembers(seedMembers()).syncInterval(5_000))
.membership(membershipConfig -> membershipConfig.seedMembers(seedMembers()).syncInterval(syncInterval))
.transportFactory(TcpTransportFactory::new)
.transport(transportConfig -> transportConfig.port(gossipListenPort))
.handler(cluster1 -> this)
.start();
//subscribe and start & join the cluster
monoCluster.subscribe();
this.localBroker = new RSocketBroker(localIp, brokerProperties.getExternalDomain());
this.consistentHash = new KetamaConsistentHash<>(12, Collections.singletonList(localIp));
this.consistentHash = new KetamaConsistentHash<>(numVirtualNodeReplicas, Collections.singletonList(localIp));
brokers.put(localIp, localBroker);
log.info(RsocketErrorCode.message("RST-300002"));
Metrics.globalRegistry.gauge("cluster.broker.count", this, (DoubleFunction<RSocketBrokerManagerGossipImpl>) brokerManagerGossip -> brokerManagerGossip.brokers.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ public interface RSocketEncodingFacade {
*
* @return encoding facade
*/
static RSocketEncodingFacade getInstance() {
return RSocketEncodingFacadeImpl.instance;
// static RSocketEncodingFacade getInstance() {
// return RSocketEncodingFacadeImpl.instance;
// }
static RSocketEncodingFacade getInstance(RSocketEncodingFacadeImpl impl) {
return impl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class RSocketEncodingFacadeImpl implements RSocketEncodingFacade {
/**
* composite metadata ByteBuf for message mime types
*/
public static final RSocketEncodingFacade instance = new RSocketEncodingFacadeImpl();
// public static final RSocketEncodingFacade instance = new RSocketEncodingFacadeImpl();

public RSocketEncodingFacadeImpl() {
String vmName = System.getProperty("java.vm.name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.alibaba.rsocket.MutableContext;
import com.alibaba.rsocket.ServiceLocator;
import com.alibaba.rsocket.encoding.RSocketEncodingFacade;
import com.alibaba.rsocket.encoding.impl.RSocketEncodingFacadeImpl;
import com.alibaba.rsocket.metadata.MessageMimeTypeMetadata;
import com.alibaba.rsocket.metadata.RSocketCompositeMetadata;
import com.alibaba.rsocket.metadata.RSocketMimeType;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class RSocketRequesterRpcProxy implements InvocationHandler {
/**
* encoding facade
*/
protected RSocketEncodingFacade encodingFacade = RSocketEncodingFacade.getInstance();
protected RSocketEncodingFacade encodingFacade = RSocketEncodingFacade.getInstance(new RSocketEncodingFacadeImpl());
/**
* java method metadata map cache for performance
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.alibaba.rsocket.AbstractRSocket;
import com.alibaba.rsocket.encoding.RSocketEncodingFacade;
import com.alibaba.rsocket.encoding.impl.RSocketEncodingFacadeImpl;
import com.alibaba.rsocket.metadata.*;
import com.alibaba.rsocket.observability.RsocketErrorCode;
import com.alibaba.rsocket.rpc.LocalReactiveServiceCaller;
Expand Down Expand Up @@ -34,7 +35,7 @@ public abstract class RSocketResponderSupport extends AbstractRSocket {
* data format as upstream/downstream + app_name + service names, such as upstream:broker:*, downstream:app-name:*
*/
protected String sourcing;
public static final RSocketEncodingFacade encodingFacade = RSocketEncodingFacade.getInstance();
public static final RSocketEncodingFacade encodingFacade = RSocketEncodingFacade.getInstance(new RSocketEncodingFacadeImpl());
private Map<String, ByteBuf> compositeMetadataForMimeTypes = new HashMap<>();

public void setSourcing(String sourcing) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,36 +93,40 @@ public void start() throws Exception {
for (Map.Entry<Integer, String> entry : schemas.entrySet()) {
String schema = entry.getValue();
int port = entry.getKey();
ServerTransport<?> transport;
if (schema.equals("local")) {
transport = LocalServerTransport.create("unittest");
} else if (schema.equals("tcp")) {
transport = TcpServerTransport.create(host, port);
} else if (schema.equals("tcps")) {
TcpServer tcpServer = TcpServer.create()
.host(host)
.port(port)
.secure(ssl -> ssl.sslContext(
SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
.protocols(protocols)
.sslProvider(getSslProvider())
));
transport = TcpServerTransport.create(tcpServer);
} else if (schema.equals("ws")) {
transport = WebsocketServerTransport.create(host, port);
} else if (schema.equals("wss")) {
HttpServer httpServer = HttpServer.create()
.host(host)
.port(port)
.secure(ssl -> ssl.sslContext(
SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
.protocols(protocols)
.sslProvider(getSslProvider())
));
transport = WebsocketServerTransport.create(httpServer);
} else {
transport = TcpServerTransport.create(host, port);
}
ServerTransportFactory factory = ServerTransportFactory.getFactory(schema);
ServerTransport<?> transport = factory.create(host, port, SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
.protocols(protocols)
.sslProvider(getSslProvider()));
// if (schema.equals("local")) {
// transport = LocalServerTransport.create("unittest");
// } else if (schema.equals("tcp")) {
// transport = TcpServerTransport.create(host, port);
// } else if (schema.equals("tcps")) {
// TcpServer tcpServer = TcpServer.create()
// .host(host)
// .port(port)
// .secure(ssl -> ssl.sslContext(
// SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
// .protocols(protocols)
// .sslProvider(getSslProvider())
// ));
// transport = TcpServerTransport.create(tcpServer);
// } else if (schema.equals("ws")) {
// transport = WebsocketServerTransport.create(host, port);
// } else if (schema.equals("wss")) {
// HttpServer httpServer = HttpServer.create()
// .host(host)
// .port(port)
// .secure(ssl -> ssl.sslContext(
// SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
// .protocols(protocols)
// .sslProvider(getSslProvider())
// ));
// transport = WebsocketServerTransport.create(httpServer);
// } else {
// transport = TcpServerTransport.create(host, port);
// }

RSocketServer rsocketServer = RSocketServer.create();
//acceptor interceptor
for (SocketAcceptorInterceptor acceptorInterceptor : acceptorInterceptors) {
Expand Down Expand Up @@ -182,4 +186,4 @@ private SslProvider getSslProvider() {
return SslProvider.JDK;
}
}
}
}
Loading