Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
aa7e39b
Add Kafka repository event integration
geso02 Nov 11, 2024
73eed34
Merge branch 'eclipse-basyx:main' into kafka-event
geso02 Nov 11, 2024
617d4d4
Merge the main branch and implement missing method
geso02 Nov 11, 2024
44d54d3
Add test properties again and fix wrong beans construction
geso02 Nov 11, 2024
a616e12
Fixed automatic upload handling and improved test case path management
geso02 Nov 14, 2024
e64923e
Fix incorrect bean registration in test cases
geso02 Nov 14, 2024
48a3cc3
Fix missing deserializer bean test
geso02 Nov 14, 2024
8284424
Add @Value annotation back again for the member variable
geso02 Nov 15, 2024
4ed56e6
Merge remote-tracking branch 'upstream/main' into kafka-event
geso02 Feb 18, 2025
eda9480
Trigger build
geso02 Feb 18, 2025
524678f
assign another consumer group
geso02 Feb 18, 2025
99a892d
Add newest kafka version and use single broker setup with kRaft and w…
geso02 Feb 18, 2025
9c1a747
Update docker-compose.yml
geso02 Mar 19, 2025
a7fc955
Merge remote-tracking branch 'upstream/main' into kafka-event
geso02 Apr 2, 2025
9926cf4
Add Kafka Example Project (#522)
geso02 Apr 3, 2025
c5349f6
Cleanup events before teststart
geso02 Apr 3, 2025
37f5844
Cleanup topics on teststart and add uncommented test
geso02 Apr 3, 2025
73c2de7
Revert changes and fix README file naming
geso02 Apr 4, 2025
96e356c
Add newline
geso02 Apr 4, 2025
be24f9a
Add newline
geso02 Apr 4, 2025
05a31cc
Add newlines
geso02 Apr 4, 2025
48cb0f1
Add newlines
geso02 Apr 4, 2025
ebfae92
Add cleanup methods for tests
geso02 Apr 4, 2025
353ef39
Fix imports and use cleanup methods
geso02 Apr 4, 2025
4db66d8
Cleanup registries before test run
geso02 Apr 4, 2025
9229275
Update topic names and wait intervals
geso02 Apr 7, 2025
c7a5bcb
always use seekToEnd for kafka tests
geso02 Apr 7, 2025
179e8cb
Remove seek-to-end method and use the same group-id for the tests instead
geso02 Apr 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions basyx.aasenvironment/basyx.aasenvironment.component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,32 @@
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.aasenvironment-feature-authorization</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.aasrepository-feature-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.submodelrepository-feature-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.aasrepository-feature-kafka</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.submodelservice-feature-kafka</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.submodelrepository-feature-kafka</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.http</artifactId>
Expand All @@ -100,8 +126,14 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.digitaltwin.basyx</groupId>
<artifactId>basyx.mongodbcore</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*******************************************************************************
* Copyright (C) 2024 DFKI GmbH (https://www.dfki.de/en/web)
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*
******************************************************************************/
package org.eclipse.digitaltwin.basyx.aasenvironment.component;

import java.util.concurrent.TimeUnit;

import org.eclipse.digitaltwin.aas4j.v3.dataformat.json.JsonSerializer;
import org.eclipse.digitaltwin.aas4j.v3.model.AssetAdministrationShell;
import org.eclipse.digitaltwin.aas4j.v3.model.Submodel;
import org.eclipse.digitaltwin.basyx.aasrepository.AasRepository;
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.AasEventKafkaListener;
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.KafkaAasRepositoryFeature;
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.TestShells;
import org.eclipse.digitaltwin.basyx.aasrepository.feature.kafka.events.model.AasEvent;
import org.eclipse.digitaltwin.basyx.core.pagination.PaginationInfo;
import org.eclipse.digitaltwin.basyx.submodelrepository.SubmodelRepository;
import org.eclipse.digitaltwin.basyx.submodelrepository.feature.kafka.KafkaSubmodelRepositoryFeature;
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.SubmodelEventKafkaListener;
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.TestSubmodels;
import org.eclipse.digitaltwin.basyx.submodelservice.feature.kafka.events.model.SubmodelEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import org.springframework.http.MediaType;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;

/**
* @author sonnenberg (DFKI GmbH)
*/
/**
 * Integration test verifying that CRUD operations performed through the AAS
 * environment component's HTTP API publish the expected events to Kafka when
 * the kafka feature is enabled and the in-memory backend is used.
 *
 * <p>Requires a Kafka broker reachable at localhost:9092 (see the
 * docker-compose setup of this project).</p>
 *
 * @author sonnenberg (DFKI GmbH)
 */
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@ComponentScan(basePackages = { "org.eclipse.digitaltwin.basyx" })
@RunWith(SpringRunner.class)
@TestPropertySource(properties = {
		"basyx.environment=",
		"basyx.feature.kafka.enabled=true",
		"spring.kafka.bootstrap-servers=PLAINTEXT_HOST://localhost:9092"
})
@AutoConfigureMockMvc
@Import({ SubmodelEventKafkaListener.class, AasEventKafkaListener.class })
public class KafkaEventsInMemoryStorageIntegrationTest {

	// How long to wait for each straggling event while draining queues between
	// tests. Milliseconds, not microseconds: a sub-millisecond poll cannot
	// reliably observe in-flight Kafka events.
	private static final long DRAIN_TIMEOUT_MILLIS = 300;

	@Autowired
	private AasEventKafkaListener aasEventListener;

	@Autowired
	private SubmodelEventKafkaListener submodelEventListener;

	@Autowired
	private KafkaAasRepositoryFeature aasFeature;

	@Autowired
	private KafkaSubmodelRepositoryFeature submodelFeature;

	@Autowired
	private MockMvc mvc;

	@Autowired
	private JsonSerializer serializer;

	@Autowired
	private SubmodelRepository smRepo;

	@Autowired
	private AasRepository aasRepo;

	/**
	 * Blocks until both listeners have been assigned their Kafka topics, then
	 * removes any repository state and pending events left over from previous
	 * runs so each test starts from a clean slate.
	 *
	 * @throws InterruptedException if interrupted while waiting for assignment
	 */
	@Before
	public void awaitAssignment() throws InterruptedException {
		aasEventListener.awaitTopicAssignment();
		submodelEventListener.awaitTopicAssignment();

		cleanup();
	}

	/**
	 * Creates a shell and a submodel via the HTTP API and asserts that exactly
	 * the corresponding creation events (and no unrelated payload fields) are
	 * received from Kafka.
	 */
	@Test
	public void testCreateAas() throws Exception {
		AssetAdministrationShell shell = TestShells.shell();
		String body = serializer.write(shell);

		mvc.perform(MockMvcRequestBuilders.post("/shells").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON))
				.andExpect(MockMvcResultMatchers.status().isCreated())
				.andExpect(MockMvcResultMatchers.content().json(body));
		AasEvent aasEvt = aasEventListener.next();
		Assert.assertEquals(shell, aasEvt.getAas());
		Assert.assertEquals(shell.getId(), aasEvt.getId());
		// a pure AAS-creation event must not carry submodel/asset/reference data
		Assert.assertNull(aasEvt.getSubmodelId());
		Assert.assertNull(aasEvt.getAssetInformation());
		Assert.assertNull(aasEvt.getReference());

		Submodel sm = TestSubmodels.createSubmodel("http://submodels/123", "123", "hello");
		body = serializer.write(sm);
		mvc.perform(MockMvcRequestBuilders.post("/submodels").contentType(MediaType.APPLICATION_JSON).content(body).accept(MediaType.APPLICATION_JSON))
				.andExpect(MockMvcResultMatchers.status().isCreated());
		SubmodelEvent smEvt = submodelEventListener.next();
		Assert.assertEquals(sm, smEvt.getSubmodel());
		Assert.assertEquals(sm.getId(), smEvt.getId());
		// a submodel-creation event must not carry element-level data
		Assert.assertNull(smEvt.getSmElement());
		Assert.assertNull(smEvt.getSmElementPath());
	}

	/**
	 * Sanity check that the property-based feature toggles were picked up.
	 */
	@Test
	public void testFeatureIsEnabled() {
		Assert.assertTrue(aasFeature.isEnabled());
		Assert.assertTrue(submodelFeature.isEnabled());
	}

	/**
	 * Deletes all shells and submodels, then drains the events those deletions
	 * produce so subsequent tests do not observe stale messages.
	 *
	 * @throws InterruptedException if interrupted while draining the queue
	 */
	@After
	public void cleanup() throws InterruptedException {
		for (AssetAdministrationShell aas : aasRepo.getAllAas(new PaginationInfo(null, null)).getResult()) {
			aasRepo.deleteAas(aas.getId());
		}

		for (Submodel sm : smRepo.getAllSubmodels(new PaginationInfo(null, null)).getResult()) {
			smRepo.deleteSubmodel(sm.getId());
		}
		// FIX: original used TimeUnit.MICROSECONDS here (300 us), which is too
		// short to reliably drain in-flight Kafka events; the EventQueue helper
		// elsewhere in this change set polls with MILLISECONDS as well.
		while (submodelEventListener.next(DRAIN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) != null) {
			// discard leftover submodel events
		}
		// NOTE(review): the deleteAas() calls above also publish AAS events, but
		// only the submodel queue is drained here — presumably the AAS listener
		// should be drained too; verify AasEventKafkaListener offers a timed next().
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,23 @@ public abstract class BaseIntegrationTest {
protected RegistryAndDiscoveryInterfaceApi api;

@Before
public void initClient() throws ApiException {
public void setUp() throws Exception {
initClient();
cleanup();
}

protected void initClient() throws Exception {
api = new RegistryAndDiscoveryInterfaceApi("http", "127.0.0.1", port);
api.deleteAllShellDescriptors();
queue().assertNoAdditionalMessage();
}

@After
public void cleanup() throws ApiException {
queue().assertNoAdditionalMessage();
protected void cleanup() throws ApiException, InterruptedException {
queue().pullAdditionalMessages();
GetAssetAdministrationShellDescriptorsResult result = api.getAllAssetAdministrationShellDescriptors(null, null, null, null);
for (AssetAdministrationShellDescriptor eachDescriptor : result.getResult()) {
api.deleteAssetAdministrationShellDescriptorById(eachDescriptor.getId());
assertThatEventWasSend(RegistryEvent.builder().id(eachDescriptor.getId()).type(EventType.AAS_UNREGISTERED).build());
}
queue().pullAdditionalMessages();
}

@Test
Expand Down Expand Up @@ -230,7 +233,6 @@ public void whenDeleteAll_thenAllDescriptorsAreRemoved() throws ApiException {
assertThat(events.remove(RegistryEvent.builder().id("id_" + i).type(EventType.AAS_UNREGISTERED).build())).isTrue();
}
assertThat(events.isEmpty());
queue().assertNoAdditionalMessage();
}

@Test
Expand All @@ -246,7 +248,7 @@ public void whenCreateAndDeleteDescriptors_thenAllDescriptorsAreRemoved() throws
all = api.getAllAssetAdministrationShellDescriptors(null, null, null, null).getResult();
assertThat(all).isEmpty();

queue().assertNoAdditionalMessage();
queue().pullAdditionalMessages();
}

@Test
Expand Down Expand Up @@ -279,7 +281,7 @@ public void whenRegisterAndUnregisterSubmodel_thenSubmodelIsCreatedAndDeleted()
aasDescriptor = api.getAssetAdministrationShellDescriptorById(aasId);
assertThat(aasDescriptor.getSubmodelDescriptors()).doesNotContain(toRegister);

queue().assertNoAdditionalMessage();
queue().pullAdditionalMessages();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,8 @@ public void reset() {
}
}

public void assertNoAdditionalMessage() {
try {
String message = messageQueue.poll(1, TimeUnit.SECONDS);
if (message != null) {
throw new EventListenerException("Got additional message: " + message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new EventListenerException(e);
}
public void pullAdditionalMessages() throws InterruptedException {
while(messageQueue.poll(100, TimeUnit.MILLISECONDS) != null);
}

public RegistryEvent poll() {
Expand All @@ -81,11 +73,11 @@ public RegistryEvent poll() {
throw new EventListenerException(e);
}
}

public static final class EventListenerException extends RuntimeException {

private static final long serialVersionUID = 1L;

public EventListenerException(Throwable e) {
super(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@
******************************************************************************/
package org.eclipse.digitaltwin.basyx.aasregistry.service.storage.memory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.TopicPartition;
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.BaseIntegrationTest;
import org.eclipse.digitaltwin.basyx.aasregistry.service.tests.integration.EventQueue;
import org.junit.Before;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
Expand All @@ -51,10 +48,12 @@ public class KafkaEventsInMemoryStorageIntegrationTest extends BaseIntegrationTe
@Autowired
private RegistrationEventKafkaListener listener;

@Before
public void awaitAssignment() throws InterruptedException {
@Override
public void setUp() throws Exception {
listener.awaitTopicAssignment();
super.setUp();
}


@Override
public EventQueue queue() {
Expand All @@ -68,9 +67,6 @@ private static class RegistrationEventKafkaListener implements ConsumerSeekAware
private final EventQueue queue;
private final CountDownLatch latch = new CountDownLatch(1);

@Value("${spring.kafka.template.default-topic}")
private String topicName;

@SuppressWarnings("unused")
public RegistrationEventKafkaListener(ObjectMapper mapper) {
this.queue = new EventQueue(mapper);
Expand All @@ -85,8 +81,7 @@ public void receiveMessage(String content) {
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
for (TopicPartition eachPartition : assignments.keySet()) {
if (topicName.equals(eachPartition.topic())) {
callback.seekToEnd(List.of(eachPartition));
if ("aas-registry".equals(eachPartition.topic())) {
latch.countDown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,23 @@ public class AuthorizedClientTest extends BaseIntegrationTest {
@Value("${local.server.port}")
private int port;

@Before
public void awaitAssignment() throws InterruptedException {
@Override
public void setUp() throws Exception {
listener.awaitTopicAssignment();
super.setUp();
}

@Override
public EventQueue queue() {
return listener.getQueue();
}

@Before
@Override
public void initClient() throws ApiException {
public void initClient() throws Exception {
api = new AuthorizedConnectedAasRegistry("http://127.0.0.1:" + port, new TokenManager("http://localhost:9096/realms/BaSyx/protocol/openid-connect/token", new ClientCredentialAccessTokenProvider(new ClientCredential("workstation-1", "nY0mjyECF60DGzNmQUjL81XurSl8etom"))));

api.deleteAllShellDescriptors();
queue().assertNoAdditionalMessage();
queue().pullAdditionalMessages();
}

@Test
Expand Down Expand Up @@ -110,4 +110,4 @@ public void whenPostShellDescriptor_LocationIsReturned() throws ApiException, IO
// TODO: It uses normal GET unauthorized request, need to override and refactor
}

}
}
Loading