Skip to content

Commit 11f7e89

Browse files
andykruth-iiAndy Kruth
andauthored
Fix/streamp 12591/revert (#366)
* Revert "fix: STREAMP-12336: removing validation on schema key if it's changing in update request (#361)" This reverts commit de1399c. * update changelog --------- Co-authored-by: Andy Kruth <ankruth@expediagroup.com>
1 parent 945cf4d commit 11f7e89

File tree

4 files changed

+18
-66
lines changed

4 files changed

+18
-66
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
55
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
66

7-
## [3.0.1] 2024-06-21
7+
## [3.0.2] 2025-01-29
8+
### Reverted
9+
- Reverted the change in 3.0.1, this change would have adverse effects on too many existing schemas.
10+
11+
## ~~[3.0.1] 2024-06-21~~
812
### Bugfix
913
- Added validation if schemaKey has changed in stream update.
1014

core/src/main/java/com/expediagroup/streamplatform/streamregistry/core/services/StreamService.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public Optional<Stream> update(Stream stream) throws ValidationException {
8282
if (existing.isEmpty()) {
8383
throw new ValidationException("Can't update " + stream.getKey() + " because it doesn't exist");
8484
}
85-
validateSchemaKey(stream, existing.get());
85+
stream.setSchemaKey(existing.get().getSchemaKey());
8686
streamValidator.validateForUpdate(stream, existing.get());
8787
stream.setSpecification(handlerService.handleUpdate(stream, existing.get()));
8888
return saveSpecification(stream);
@@ -176,13 +176,4 @@ private java.util.stream.Stream<Process> allProcessesForStream(Stream stream) {
176176
process.getOutputs().stream().anyMatch(output -> output.getStream().equals(stream.getKey()))
177177
);
178178
}
179-
180-
private void validateSchemaKey(Stream stream, Stream existing) {
181-
if (stream.getSchemaKey() == null) {
182-
stream.setSchemaKey(existing.getSchemaKey());
183-
} else if (!existing.getSchemaKey().equals(stream.getSchemaKey())) {
184-
throw new ValidationException("Stream = " + stream.getKey() + " update failed, because existing schemaKey = " +
185-
existing.getSchemaKey() + " is not matching with given schemaKey = " + stream.getSchemaKey());
186-
}
187-
}
188179
}

core/src/test/java/com/expediagroup/streamplatform/streamregistry/core/services/StreamServiceTest.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
import static java.util.Arrays.asList;
1919
import static java.util.Collections.emptyList;
2020
import static java.util.Optional.empty;
21-
import static org.junit.Assert.assertEquals;
22-
import static org.junit.Assert.assertNotNull;
23-
import static org.junit.Assert.assertThrows;
2421
import static org.mockito.Mockito.any;
2522
import static org.mockito.Mockito.doNothing;
2623
import static org.mockito.Mockito.inOrder;
@@ -43,7 +40,6 @@
4340

4441
import com.expediagroup.streamplatform.streamregistry.core.handlers.HandlerService;
4542
import com.expediagroup.streamplatform.streamregistry.core.validators.StreamValidator;
46-
import com.expediagroup.streamplatform.streamregistry.core.validators.ValidationException;
4743
import com.expediagroup.streamplatform.streamregistry.core.views.ConsumerView;
4844
import com.expediagroup.streamplatform.streamregistry.core.views.ProcessView;
4945
import com.expediagroup.streamplatform.streamregistry.core.views.ProducerView;
@@ -186,44 +182,6 @@ public void update() {
186182
verify(streamRepository).saveSpecification(entity);
187183
}
188184

189-
@Test
190-
public void updateWithChangedSchemaKey() {
191-
StreamKey key = new StreamKey();
192-
key.setDomain("domain");
193-
key.setName("stream");
194-
key.setVersion(1);
195-
SchemaKey existingSchema = new SchemaKey("domain", "existing");
196-
SchemaKey updatedSchema = new SchemaKey("domain", "updated");
197-
Stream existingEntity = new Stream(key, new Specification(), existingSchema);
198-
Stream updatedEntity = new Stream(key, new Specification(), updatedSchema);
199-
when(streamRepository.findById(key)).thenReturn(Optional.of(existingEntity));
200-
ValidationException ex = assertThrows(ValidationException.class, () -> streamService.update(updatedEntity));
201-
assertEquals("Stream = " + key + " update failed, because existing schemaKey = " + existingSchema +
202-
" is not matching with given schemaKey = " + updatedSchema, ex.getMessage());
203-
verify(streamRepository).findById(key);
204-
}
205-
206-
@Test
207-
public void updateWithSchemaKeyNull() {
208-
StreamKey key = new StreamKey();
209-
key.setDomain("domain");
210-
key.setName("stream");
211-
key.setVersion(1);
212-
SchemaKey schemaKey = new SchemaKey("domain", "stream_v1");
213-
Stream existingEntity = new Stream(key, new Specification(), schemaKey);
214-
Stream updatedEntity = new Stream(key, new Specification(), null);
215-
when(streamRepository.findById(key)).thenReturn(Optional.of(existingEntity));
216-
streamService.update(updatedEntity);
217-
// should update the schemaKey if null in update
218-
assertNotNull(updatedEntity.getSchemaKey());
219-
assertEquals(updatedEntity.getSchemaKey(), schemaKey);
220-
221-
verify(streamRepository).findById(key);
222-
verify(streamValidator).validateForUpdate(updatedEntity, existingEntity);
223-
verify(handlerService).handleUpdate(updatedEntity, existingEntity);
224-
verify(streamRepository).saveSpecification(updatedEntity);
225-
}
226-
227185
@Test
228186
public void updateStatus() {
229187
final Status status = mock(Status.class);

it/src/test/java/com/expediagroup/streamplatform/streamregistry/it/StreamTestStage.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright (C) 2018-2024 Expedia, Inc.
2+
* Copyright (C) 2018-2021 Expedia, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -70,16 +70,15 @@ public void update() {
7070

7171
@Override
7272
public void upsert() {
73-
/**
74-
* Not throw exception, because we are over-riding schemaKey if schemaKey is null
75-
*/
76-
client.getOptionalData(factory.upsertStreamMutationBuilder()
77-
.schema(null)
78-
.build()).get();
79-
80-
/**
81-
* This should throw exception if schemaKey is matches with the existing schemaKey.
82-
*/
73+
74+
try {
75+
client.getOptionalData(factory.upsertStreamMutationBuilder()
76+
.schema(null)
77+
.build()).get();
78+
} catch (RuntimeException ex) {
79+
assertEquals("Schema does not exist", ex.getMessage());
80+
}
81+
8382
try {
8483
SchemaKeyInput nonExisting = SchemaKeyInput.builder()
8584
.domain(factory.domainName)
@@ -88,8 +87,8 @@ public void upsert() {
8887
client.getOptionalData(factory.upsertStreamMutationBuilder()
8988
.schema(nonExisting)
9089
.build()).get();
91-
} catch(RuntimeException ex ) {
92-
assertTrue(ex.getMessage().contains("update failed, because existing schemaKey"));
90+
} catch (RuntimeException ex) {
91+
assertEquals("Schema does not exist", ex.getMessage());
9392
}
9493

9594
Object data = client.getOptionalData(factory.upsertStreamMutationBuilder().build()).get();

0 commit comments

Comments
 (0)