Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [3.0.1] 2024-06-21
## [3.0.2] 2025-01-29
### Reverted
- Reverted the change in 3.0.1, this change would have adverse effects on too many existing schemas.

## ~~[3.0.1] 2024-06-21~~
### Bugfix
- Added validation if schemaKey has changed in stream update.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Optional<Stream> update(Stream stream) throws ValidationException {
if (existing.isEmpty()) {
throw new ValidationException("Can't update " + stream.getKey() + " because it doesn't exist");
}
validateSchemaKey(stream, existing.get());
stream.setSchemaKey(existing.get().getSchemaKey());
streamValidator.validateForUpdate(stream, existing.get());
stream.setSpecification(handlerService.handleUpdate(stream, existing.get()));
return saveSpecification(stream);
Expand Down Expand Up @@ -176,13 +176,4 @@ private java.util.stream.Stream<Process> allProcessesForStream(Stream stream) {
process.getOutputs().stream().anyMatch(output -> output.getStream().equals(stream.getKey()))
);
}

private void validateSchemaKey(Stream stream, Stream existing) {
if (stream.getSchemaKey() == null) {
stream.setSchemaKey(existing.getSchemaKey());
} else if (!existing.getSchemaKey().equals(stream.getSchemaKey())) {
throw new ValidationException("Stream = " + stream.getKey() + " update failed, because existing schemaKey = " +
existing.getSchemaKey() + " is not matching with given schemaKey = " + stream.getSchemaKey());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Optional.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
Expand All @@ -43,7 +40,6 @@

import com.expediagroup.streamplatform.streamregistry.core.handlers.HandlerService;
import com.expediagroup.streamplatform.streamregistry.core.validators.StreamValidator;
import com.expediagroup.streamplatform.streamregistry.core.validators.ValidationException;
import com.expediagroup.streamplatform.streamregistry.core.views.ConsumerView;
import com.expediagroup.streamplatform.streamregistry.core.views.ProcessView;
import com.expediagroup.streamplatform.streamregistry.core.views.ProducerView;
Expand Down Expand Up @@ -186,44 +182,6 @@ public void update() {
verify(streamRepository).saveSpecification(entity);
}

@Test
public void updateWithChangedSchemaKey() {
StreamKey key = new StreamKey();
key.setDomain("domain");
key.setName("stream");
key.setVersion(1);
SchemaKey existingSchema = new SchemaKey("domain", "existing");
SchemaKey updatedSchema = new SchemaKey("domain", "updated");
Stream existingEntity = new Stream(key, new Specification(), existingSchema);
Stream updatedEntity = new Stream(key, new Specification(), updatedSchema);
when(streamRepository.findById(key)).thenReturn(Optional.of(existingEntity));
ValidationException ex = assertThrows(ValidationException.class, () -> streamService.update(updatedEntity));
assertEquals("Stream = " + key + " update failed, because existing schemaKey = " + existingSchema +
" is not matching with given schemaKey = " + updatedSchema, ex.getMessage());
verify(streamRepository).findById(key);
}

@Test
public void updateWithSchemaKeyNull() {
StreamKey key = new StreamKey();
key.setDomain("domain");
key.setName("stream");
key.setVersion(1);
SchemaKey schemaKey = new SchemaKey("domain", "stream_v1");
Stream existingEntity = new Stream(key, new Specification(), schemaKey);
Stream updatedEntity = new Stream(key, new Specification(), null);
when(streamRepository.findById(key)).thenReturn(Optional.of(existingEntity));
streamService.update(updatedEntity);
// should update the schemaKey if null in update
assertNotNull(updatedEntity.getSchemaKey());
assertEquals(updatedEntity.getSchemaKey(), schemaKey);

verify(streamRepository).findById(key);
verify(streamValidator).validateForUpdate(updatedEntity, existingEntity);
verify(handlerService).handleUpdate(updatedEntity, existingEntity);
verify(streamRepository).saveSpecification(updatedEntity);
}

@Test
public void updateStatus() {
final Status status = mock(Status.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2018-2024 Expedia, Inc.
* Copyright (C) 2018-2021 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,16 +70,15 @@ public void update() {

@Override
public void upsert() {
/**
* Not throw exception, because we are over-riding schemaKey if schemaKey is null
*/
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(null)
.build()).get();

/**
* This should throw exception if schemaKey is matches with the existing schemaKey.
*/

try {
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(null)
.build()).get();
} catch (RuntimeException ex) {
assertEquals("Schema does not exist", ex.getMessage());
}

try {
SchemaKeyInput nonExisting = SchemaKeyInput.builder()
.domain(factory.domainName)
Expand All @@ -88,8 +87,8 @@ public void upsert() {
client.getOptionalData(factory.upsertStreamMutationBuilder()
.schema(nonExisting)
.build()).get();
} catch(RuntimeException ex ) {
assertTrue(ex.getMessage().contains("update failed, because existing schemaKey"));
} catch (RuntimeException ex) {
assertEquals("Schema does not exist", ex.getMessage());
}

Object data = client.getOptionalData(factory.upsertStreamMutationBuilder().build()).get();
Expand Down
Loading