Skip to content

Commit 699f583

Browse files
Fix: Implement concurrency control for custom property updates in TypeRepository (#25961)
* Fix: Implement concurrency control for custom property updates in TypeRepository * Move tests to IT --------- Co-authored-by: Ram Narayan Balaji <81347100+yan-3005@users.noreply.github.com>
1 parent f81fc25 commit 699f583

File tree

2 files changed

+102
-21
lines changed

2 files changed

+102
-21
lines changed

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TypeResourceIT.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,15 @@
77
import static org.junit.jupiter.api.Assertions.assertTrue;
88

99
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import java.util.ArrayList;
1011
import java.util.List;
1112
import java.util.Map;
1213
import java.util.UUID;
14+
import java.util.concurrent.CopyOnWriteArrayList;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.TimeUnit;
1319
import org.junit.jupiter.api.BeforeAll;
1420
import org.junit.jupiter.api.Disabled;
1521
import org.junit.jupiter.api.Test;
@@ -542,6 +548,65 @@ void test_addMultipleHyperlinkCustomProperties(TestNamespace ns) throws Exceptio
542548
assertTrue(hasProp2, "Type should have second hyperlink custom property");
543549
}
544550

551+
@Test
552+
void test_concurrentCustomPropertyAdditions(TestNamespace ns) throws Exception {
553+
OpenMetadataClient client = SdkClients.adminClient();
554+
Type pipelineType = getTypeByName(client, "pipeline");
555+
556+
int threadCount = 5;
557+
List<CustomProperty> properties = new ArrayList<>();
558+
for (int i = 0; i < threadCount; i++) {
559+
CustomProperty prop = new CustomProperty();
560+
prop.setName(ns.prefix("concurrentProp" + i));
561+
prop.setDescription("Concurrent property " + i);
562+
prop.setPropertyType(STRING_TYPE.getEntityReference());
563+
properties.add(prop);
564+
}
565+
566+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
567+
CountDownLatch startLatch = new CountDownLatch(1);
568+
CountDownLatch doneLatch = new CountDownLatch(threadCount);
569+
List<Exception> errors = new CopyOnWriteArrayList<>();
570+
571+
for (CustomProperty prop : properties) {
572+
executor.submit(
573+
() -> {
574+
try {
575+
startLatch.await();
576+
addCustomProperty(client, pipelineType.getId(), prop);
577+
} catch (InterruptedException e) {
578+
Thread.currentThread().interrupt();
579+
errors.add(e);
580+
} catch (Exception e) {
581+
errors.add(e);
582+
} finally {
583+
doneLatch.countDown();
584+
}
585+
});
586+
}
587+
588+
startLatch.countDown();
589+
assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "All threads should complete within 30s");
590+
executor.shutdown();
591+
592+
assertTrue(errors.isEmpty(), "Concurrent requests should not throw: " + errors);
593+
594+
Type updatedType = getTypeByName(client, "pipeline", "customProperties");
595+
List<String> persistedNames =
596+
updatedType.getCustomProperties() != null
597+
? updatedType.getCustomProperties().stream().map(CustomProperty::getName).toList()
598+
: List.of();
599+
600+
for (CustomProperty prop : properties) {
601+
assertTrue(
602+
persistedNames.contains(prop.getName()),
603+
"Property '"
604+
+ prop.getName()
605+
+ "' was lost due to a concurrent update. Persisted: "
606+
+ persistedNames);
607+
}
608+
}
609+
545610
private static Type createType(OpenMetadataClient client, CreateType createRequest)
546611
throws Exception {
547612
return client
@@ -565,6 +630,16 @@ private static Type getTypeByName(OpenMetadataClient client, String name) throws
565630
return OBJECT_MAPPER.readValue(response, Type.class);
566631
}
567632

633+
private static Type getTypeByName(OpenMetadataClient client, String name, String fields)
634+
throws Exception {
635+
String response =
636+
client
637+
.getHttpClient()
638+
.executeForString(
639+
HttpMethod.GET, "/v1/metadata/types/name/" + name + "?fields=" + fields, null);
640+
return OBJECT_MAPPER.readValue(response, Type.class);
641+
}
642+
568643
private static TypeList listTypes(OpenMetadataClient client) throws Exception {
569644
String response =
570645
client.getHttpClient().executeForString(HttpMethod.GET, "/v1/metadata/types", null);

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TypeRepository.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.Set;
3535
import java.util.UUID;
36+
import java.util.concurrent.ConcurrentHashMap;
3637
import java.util.stream.Collectors;
3738
import lombok.extern.slf4j.Slf4j;
3839
import org.apache.commons.lang3.tuple.Triple;
@@ -64,6 +65,8 @@
6465
public class TypeRepository extends EntityRepository<Type> {
6566
private static final String UPDATE_FIELDS = "customProperties";
6667
private static final String PATCH_FIELDS = "customProperties";
68+
private static final ConcurrentHashMap<UUID, Object> TYPE_PROPERTY_LOCKS =
69+
new ConcurrentHashMap<>();
6770

6871
public TypeRepository() {
6972
super(
@@ -162,31 +165,34 @@ public void postUpdate(Type original, Type updated) {
162165

163166
public PutResponse<Type> addCustomProperty(
164167
UriInfo uriInfo, String updatedBy, UUID id, CustomProperty property) {
165-
Type type = find(id, Include.NON_DELETED);
166-
property.setPropertyType(
167-
Entity.getEntityReferenceById(
168-
Entity.TYPE, property.getPropertyType().getId(), NON_DELETED));
169-
validateProperty(property);
170-
if (type.getCategory().equals(Category.Field)) {
171-
throw new IllegalArgumentException(
172-
"Only entity types can be extended and field types can't be extended");
173-
}
174-
setFieldsInternal(type, putFields);
168+
Object lock = TYPE_PROPERTY_LOCKS.computeIfAbsent(id, k -> new Object());
169+
synchronized (lock) {
170+
Type type = find(id, Include.NON_DELETED);
171+
property.setPropertyType(
172+
Entity.getEntityReferenceById(
173+
Entity.TYPE, property.getPropertyType().getId(), NON_DELETED));
174+
validateProperty(property);
175+
if (type.getCategory().equals(Category.Field)) {
176+
throw new IllegalArgumentException(
177+
"Only entity types can be extended and field types can't be extended");
178+
}
179+
setFieldsInternal(type, putFields);
175180

176-
find(property.getPropertyType().getId(), NON_DELETED); // Validate customProperty type exists
181+
find(property.getPropertyType().getId(), NON_DELETED); // Validate customProperty type exists
177182

178-
// If property already exists, then update it. Else add the new property.
179-
List<CustomProperty> updatedProperties = new ArrayList<>(List.of(property));
180-
for (CustomProperty existing : type.getCustomProperties()) {
181-
if (!existing.getName().equals(property.getName())) {
182-
updatedProperties.add(existing);
183+
// If property already exists, then update it. Else add the new property.
184+
List<CustomProperty> updatedProperties = new ArrayList<>(List.of(property));
185+
for (CustomProperty existing : type.getCustomProperties()) {
186+
if (!existing.getName().equals(property.getName())) {
187+
updatedProperties.add(existing);
188+
}
183189
}
184-
}
185190

186-
type.setCustomProperties(updatedProperties);
187-
type.setUpdatedBy(updatedBy);
188-
type.setUpdatedAt(System.currentTimeMillis());
189-
return createOrUpdate(uriInfo, type, updatedBy);
191+
type.setCustomProperties(updatedProperties);
192+
type.setUpdatedBy(updatedBy);
193+
type.setUpdatedAt(System.currentTimeMillis());
194+
return createOrUpdate(uriInfo, type, updatedBy);
195+
}
190196
}
191197

192198
private List<CustomProperty> getCustomProperties(Type type) {

0 commit comments

Comments
 (0)