Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
.classpath
.settings/

## VS Code
java-formatter.xml
settings.json

## Java
target/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public class EventHubWebhookAdapter extends HttpServlet {
public EventHubWebhookAdapter(CdsRuntime runtime) {
this.runtime = runtime;
this.messagingServices = runtime.getServiceCatalog().getServices(MessagingService.class)
.map(OutboxService::unboxed)
.filter(EventHubMessagingService.class::isInstance)
.map(EventHubMessagingService.class::cast)
.toList();
.map(OutboxService::unboxed)
.filter(EventHubMessagingService.class::isInstance)
.map(EventHubMessagingService.class::cast)
.toList();
ServiceBinding binding = EventHubBindingUtils.getServiceBinding(runtime).get();
this.clientId = EventHubBindingUtils.getClientId(binding);
this.isMultitenant = EventHubBindingUtils.isBindingMultitenant(binding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public class EventHubClient extends RestClient {

public EventHubClient(ServiceBinding binding) {
super(ServiceBindingDestinationOptions
.forService(binding)
.onBehalfOf(OnBehalfOf.TECHNICAL_USER_PROVIDER)
.build());
.forService(binding)
.onBehalfOf(OnBehalfOf.TECHNICAL_USER_PROVIDER)
.build());
}

public void sendMessage(Map<String, Object> message, Map<String, Object> headers) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ public class EventHubMessagingService extends AbstractMessagingService {

private static final Logger logger = LoggerFactory.getLogger(EventHubMessagingService.class);
public static final String CE_SOURCE = "ceSource";
public static final String SYSTEM_ID = "systemId";

private final String ceSource;
private final String systemId;
private final boolean isMultitenant;
private final MessagingBrokerQueueListener queueListener;
private final EventHubClient eventHubClient;
Expand All @@ -48,12 +50,21 @@ public EventHubMessagingService(ServiceBinding binding, MessagingServiceConfig s
this.ceSource = ((List<String>) binding.getCredentials().get(CE_SOURCE)).get(0) + '/';
} else {
this.ceSource = null;
logger.error("Missing ceSource in binding credentials, emit() will be deactivated");
}

if (binding.getCredentials().containsKey(SYSTEM_ID)) {
this.systemId = (String) binding.getCredentials().get(SYSTEM_ID);
} else {
this.systemId = null;
logger.error("Missing systemId in binding credentials, emit() will be deactivated");
}

this.isMultitenant = EventHubBindingUtils.isBindingMultitenant(binding);
this.queueListener = new MessagingBrokerQueueListener(this, toFullyQualifiedQueueName(queue), queue, runtime, true);
// emitting messages is only supported in multitenant mode
this.eventHubClient = this.isMultitenant ? new EventHubClient(binding) : null;

boolean bindingHasEndpoints = EventHubBindingUtils.bindingHasEndpoints(binding);
this.eventHubClient = bindingHasEndpoints ? new EventHubClient(binding) : null;
}

private static MessagingServiceConfig ensureMandatoryConfig(MessagingServiceConfig serviceConfig) {
Expand Down Expand Up @@ -127,17 +138,22 @@ protected void registerQueueListener(String queue, MessagingBrokerQueueListener

@Override
protected void emitTopicMessage(String topic, TopicMessageEventContext context) {
// emitting messages is only supported in multitenant mode
if (!this.isMultitenant) {
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_FAILED);
}

String tenant = getTenant(context);

try {
Map<String, Object> headers = context.getHeadersMap();
if (ceSource != null) {
headers.put(CloudEventUtils.KEY_SOURCE, ceSource + tenant);
if (isMultitenant) {
if (ceSource != null) {
headers.put(CloudEventUtils.KEY_SOURCE, ceSource + tenant);
} else {
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_MISSING_CE_SOURCE);
}
} else {
if (systemId != null) {
headers.put(CloudEventUtils.KEY_SOURCE, ceSource + systemId);
} else {
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_MISSING_SYSTEM_ID);
}
}

logger.debug("Sending message for Event Hub '{}' to type '{}'", getName(), headers.get(CloudEventUtils.KEY_TYPE));
Expand All @@ -148,7 +164,6 @@ protected void emitTopicMessage(String topic, TopicMessageEventContext context)
}

private String getTenant(EventContext context) {

String tenant = context.getUserInfo().getTenant();

if (tenant != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public void services(CdsRuntimeConfigurer configurer) {
List<MessagingServiceConfig> serviceConfigs = config.getServicesByBinding(binding.getName().get());

if (!serviceConfigs.isEmpty()) {
logger.debug("Initialization of the Event Hub based on service binding '{}'", binding.getName().get());
createDefaultService = false;
serviceConfigs.forEach(serviceConfig -> {
if (Boolean.TRUE.equals(serviceConfig.isEnabled())) {
Expand All @@ -50,13 +51,16 @@ public void services(CdsRuntimeConfigurer configurer) {
logger.debug("Initialization of the Event Hub based on service binding '{}' and kind '{}'", binding.getName().get(), KIND_LABEL);
createDefaultService = false;
serviceConfigsByKind.forEach(serviceConfig -> {
// check that the service is enabled and whether not already found by name or binding
if (Boolean.TRUE.equals(serviceConfig.isEnabled())
&& serviceConfigs.stream().noneMatch(c -> c.getName().equals(serviceConfig.getName()))) {
configureService(configurer, binding, serviceConfig);
} else {
logger.info("The messaging service '{}' is explicitly disabled via configuration", serviceConfig.getName());
// check whether the service config is not already found by name or binding
if (serviceConfigs.stream().noneMatch(c -> c.getName().equals(serviceConfig.getName()))) {
// check that the service is enabled
if (Boolean.TRUE.equals(serviceConfig.isEnabled())) {
configureService(configurer, binding, serviceConfig);
} else {
logger.info("The messaging service '{}' is explicitly disabled via configuration", serviceConfig.getName());
}
}

});
}

Expand All @@ -69,18 +73,18 @@ public void services(CdsRuntimeConfigurer configurer) {
configureService(configurer, binding, defConfig);
} else {
logger.warn(
"Could not create service for binding '{}': A configuration with the same name is already defined for another kind or binding.",
binding.getName().get());
"Could not create service for binding '{}': A configuration with the same name is already defined for another kind or binding.",
binding.getName().get());
}
}

logger.debug("Finished the initialization of the Event Hub service binding '{}'", binding.getName().get());
logger.info("Finished the initialization of the Event Hub service binding '{}'", binding.getName().get());
});
}

private void configureService(CdsRuntimeConfigurer configurer, ServiceBinding binding, MessagingServiceConfig serviceConfig) {
logger.debug("Loading config '{}' for service binding '{}'", serviceConfig.getName(), binding.getName().get());
EventHubMessagingService messagingService = new EventHubMessagingService(binding, serviceConfig, configurer.getCdsRuntime());
configurer.service(outboxed(messagingService, serviceConfig, configurer.getCdsRuntime()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,10 @@ public static boolean isBindingMultitenant(ServiceBinding binding) {
return ServiceBindingUtils.matches(binding, MT_BINDING_LABEL);
}

@SuppressWarnings("unchecked")
public static boolean bindingHasEndpoints(ServiceBinding binding) {
Map<String, Object> credentials = binding.getCredentials();
Map<String, Object> endpoints = (Map<String, Object>) credentials.getOrDefault("endpoints", Map.of());
return !endpoints.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

public enum EventHubErrorStatuses implements ErrorStatus {

EVENT_HUB_EMIT_FAILED(50007026, "Event Hub service in single tenant plan does not support to emit events.", ErrorStatuses.SERVER_ERROR),
MULTIPLE_EVENT_HUB_BINDINGS(50007027, "Multiple event-hub service bindings found: Only a single service binding for Event Hub is supported.", ErrorStatuses.SERVER_ERROR),
EVENT_HUB_TENANT_CONTEXT_MISSING(50007028, "Missing tenant context to emit a message to Event Hub.", ErrorStatuses.SERVER_ERROR);
EVENT_HUB_TENANT_CONTEXT_MISSING(50007028, "Missing tenant context to emit a message to Event Hub.", ErrorStatuses.SERVER_ERROR),
EVENT_HUB_EMIT_MISSING_CE_SOURCE(50007029, "Event Hub service failed to emit, due to ceSource missing in the service binding.", ErrorStatuses.SERVER_ERROR),
EVENT_HUB_EMIT_MISSING_SYSTEM_ID(50007030, "Event Hub service failed to emit, due to systemId missing in the service binding.", ErrorStatuses.SERVER_ERROR);

private final int code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ void testDefaultServConfiguration() {
CdsRuntimeConfigurer configurer = CdsRuntimeConfigurer.create();
configurer.environment(() -> {
return Stream.of(new DefaultServiceBindingBuilder()
.withName("eb-mt-tests-eb").withServicePlan("event-connectivity")
.withServiceName("event-broker").build());
.withName("eb-mt-tests-eb")
.withServiceName("event-broker")
.withServicePlan("event-connectivity")
.build());
});

configurer.serviceConfigurations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -15,36 +14,9 @@
import com.sap.cds.services.impl.environment.SimplePropertiesProvider;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.runtime.CdsRuntimeConfigurer;
import com.sap.cloud.environment.servicebinding.api.DefaultServiceBindingBuilder;

class EventHubMessagingServiceTest {

@Test
void testEmit_SingleTenantNotSupported() {
CdsProperties properties = new CdsProperties();
CdsProperties.Messaging.MessagingServiceConfig config = new CdsProperties.Messaging.MessagingServiceConfig("cfg");
config.setBinding("eb-mt-tests-eb");
config.getOutbox().setEnabled(false);
properties.getMessaging().getServices().put(config.getName(), config);

CdsRuntimeConfigurer configurer = CdsRuntimeConfigurer.create(new SimplePropertiesProvider(properties));
configurer.environment(() -> {
return Stream.of(new DefaultServiceBindingBuilder()
.withName("eb-mt-tests-eb").withServicePlan("event-connectivity")
.withServiceName("event-broker").build());
});

configurer.environmentConfigurations();
configurer.serviceConfigurations();
configurer.eventHandlerConfigurations();
CdsRuntime runtime = configurer.complete();


ContextualizedServiceException e = Assertions.assertThrows(ContextualizedServiceException.class, () -> emitMessage(runtime));
assertEquals(EventHubErrorStatuses.EVENT_HUB_EMIT_FAILED, e.getErrorStatus());

}

@Test
void testEmit_TenantNotSupported() {
CdsProperties properties = new CdsProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,63 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.environment.SimplePropertiesProvider;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.runtime.CdsRuntimeConfigurer;
import com.sap.cloud.environment.servicebinding.api.DefaultServiceBindingBuilder;
import com.sap.cloud.environment.servicebinding.api.ServiceBinding;

class EventHubBindingUtilsTest {

private ServiceBinding binding;
private CdsRuntime runtime;
private Map<String, ServiceBinding> bindings = new HashMap<String, ServiceBinding>();
private Map<String, CdsRuntime> runtimes = new HashMap<String, CdsRuntime>();

@BeforeEach
public void setUp() throws Exception {
loadBinding("mt-binding", "bindings-mt.json");
loadBinding("st-binding", "bindings-st.json");
}

private void loadBinding(String id, String bindingPath) {
CdsProperties properties = new CdsProperties();
properties.getEnvironment().getLocal().setDefaultEnvPath("classpath:bindings.json");
runtime = CdsRuntimeConfigurer.create(new SimplePropertiesProvider(properties)).environmentConfigurations().complete();
binding = runtime.getEnvironment().getServiceBindings().findFirst().get();
properties.getEnvironment().getLocal().setDefaultEnvPath("classpath:" + bindingPath);
CdsRuntime runtime = CdsRuntimeConfigurer.create(new SimplePropertiesProvider(properties)).environmentConfigurations().complete();
runtimes.put(id, runtime);
bindings.put(id, runtime.getEnvironment().getServiceBindings().findFirst().get());
}

@Test
void testGetClientId() {
String clientId = EventHubBindingUtils.getClientId(binding);
assertEquals("a5de02ca-a031-47b6-9bec-e15ac24c663a", clientId);
for (ServiceBinding binding : bindings.values()) {
String clientId = EventHubBindingUtils.getClientId(binding);
assertEquals("a5de02ca-a031-47b6-9bec-e15ac24c663a", clientId);
}
}

@Test
void testIsBindingMultiTenant() {
boolean isMultitenant = EventHubBindingUtils.isBindingMultitenant(binding);
assertTrue(isMultitenant);
assertTrue(EventHubBindingUtils.isBindingMultitenant(bindings.get("mt-binding")));
assertFalse(EventHubBindingUtils.isBindingMultitenant(bindings.get("st-binding")));
}

@Test
void testIsBindingSingleTenant() {
binding = new DefaultServiceBindingBuilder().withCredentials(binding.getCredentials()).withTags(List.of("event-broker")).build();
boolean isMultitenant = EventHubBindingUtils.isBindingMultitenant(binding);
assertFalse(isMultitenant);
void testGetServiceBinding() {
for (String id : runtimes.keySet()) {
CdsRuntime runtime = runtimes.get(id);
ServiceBinding retrievedBinding = EventHubBindingUtils.getServiceBinding(runtime).get();
assertEquals(bindings.get(id), retrievedBinding);
}
}

@Test
void testGetServiceBinding() {
ServiceBinding retrievedBinding = EventHubBindingUtils.getServiceBinding(runtime).get();
assertEquals(binding, retrievedBinding);
void testBindingHasEndpoints() {
for (ServiceBinding binding : bindings.values()) {
assertTrue(EventHubBindingUtils.bindingHasEndpoints(binding));
}
}
}
46 changes: 46 additions & 0 deletions cds-feature-event-hub/src/test/resources/bindings-st.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"VCAP_SERVICES": {
"event-broker": [
{
"label": "event-broker",
"provider": null,
"plan": "event-connectivity",
"name": "eb-st-tests-eb",
"tags": [],
"instance_guid": "220ae6d2-aff9-4a0e-af63-93d2ee78be9b",
"instance_name": "eb-st-tests-eb",
"binding_guid": "77e25652-159a-4ec6-adb4-a4e1e69ab14d",
"binding_name": null,
"credentials": {
"eventing": {
"http": {
"x509": {
"url": "https://http-gateway.canary.beb.em.services.cloud.sap"
}
}
},
"authentication-type": "X509_IAS",
"serviceInstanceId": "220ae6d2-aff9-4a0e-af63-93d2ee78be9b",
"ceSource": [
"/default/sap.cdsjavacpoc"
],
"systemId": "0fc7e124-2933-4a1b-a473-8ec6e70a6180",
"ias": {
"clientId": "a5de02ca-a031-47b6-9bec-e15ac24c663a"
},
"authentication-service": {
"service-label": "identity"
},
"endpoints": {
"eventing-endpoint": {
"uri": "https://http-gateway.canary.beb.em.services.cloud.sap",
"always-requires-token": false
}
}
},
"syslog_drain_url": null,
"volume_mounts": []
}
]
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</developers>

<properties>
<revision>4.0.2-SNAPSHOT</revision>
<revision>4.0.2</revision>
<java.version>17</java.version>
<maven.compiler.release>${java.version}</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Loading